第8章: readr

データ読み込みとパースの完全ガイド

📂 ファイル読み込み 🔧 データパース ⚡ 高速処理

📂 readrの基本: データ読み込みの革命

readrパッケージは、Rにおけるデータ読み込みを高速かつ確実に行うための革新的なツールです。従来のread.csv()と比較して10倍以上高速で、データ型の自動推測やエラーハンドリングが格段に向上しています。

readrの主要な利点

readrパッケージの読み込み
library(readr) library(dplyr) library(tibble) # readrの主要な利点を確認 print("readrの特徴:") cat("1. 高速読み込み (10x faster than base R)\n") cat("2. 自動データ型推測\n") cat("3. 一貫したエラーハンドリング\n") cat("4. Progress barによる進捗表示\n") cat("5. UTF-8エンコーディングのデフォルト対応\n") # サンプルデータの作成 sample_data <- tibble( id = 1:5, name = c( "田中太郎", "佐藤花子", "鈴木一郎", "高橋美咲", "山田健太" ), age = c(25, 32, 28, 24, 35), salary = c(400000, 550000, 480000, 420000, 600000), department = c( "営業", "開発", "マーケティング", "人事", "開発" ), start_date = c( "2020-04-01", "2019-08-15", "2021-01-10", "2022-03-01", "2018-11-20" ) ) print(sample_data)
readr基本情報と サンプルデータ
readrの特徴: 1. 高速読み込み (10x faster than base R) 2. 自動データ型推測 3. 一貫したエラーハンドリング 4. Progress barによる進捗表示 5. UTF-8エンコーディングのデフォルト対応 # A tibble: 5 × 6 id name age salary department start_date <int> <chr> <dbl> <dbl> <chr> <chr> 1 1 田中太郎 25 400000 営業 2020-04-01 2 2 佐藤花子 32 550000 開発 2019-08-15 3 3 鈴木一郎 28 480000 マーケティング 2021-01-10 4 4 高橋美咲 24 420000 人事 2022-03-01 5 5 山田健太 35 600000 開発 2018-11-20

CSVファイルの作成と読み込み

CSVファイルの書き出しと読み込み
# CSVファイルへの書き出し write_csv( sample_data, "employee_data.csv" ) # CSVファイルの読み込み loaded_data <- read_csv("employee_data.csv") # データ型の確認 cat("読み込まれたデータの構造:\n") str(loaded_data) # 列のデータ型を指定した読み込み typed_data <- read_csv( "employee_data.csv", col_types = cols( id = col_integer(), name = col_character(), age = col_integer(), salary = col_double(), department = col_factor(), start_date = col_date(format = "%Y-%m-%d") ) ) cat("\n型指定後のデータ構造:\n") str(typed_data) # データの確認 print(head(typed_data))
CSV読み込み結果
読み込まれたデータの構造: tibble [5 × 6] (S3: spec_tbl_df/tbl_df/tbl/data.frame) $ id : num [1:5] 1 2 3 4 5 $ name : chr [1:5] "田中太郎" "佐藤花子" "鈴木一郎" "高橋美咲" ... $ age : num [1:5] 25 32 28 24 35 $ salary : num [1:5] 400000 550000 480000 420000 600000 $ department: chr [1:5] "営業" "開発" "マーケティング" "人事" ... $ start_date: chr [1:5] "2020-04-01" "2019-08-15" "2021-01-10" "2022-03-01" ... 型指定後のデータ構造: tibble [5 × 6] (S3: spec_tbl_df/tbl_df/tbl/data.frame) $ id : int [1:5] 1 2 3 4 5 $ name : chr [1:5] "田中太郎" "佐藤花子" "鈴木一郎" "高橋美咲" ... $ age : int [1:5] 25 32 28 24 35 $ salary : num [1:5] 400000 550000 480000 420000 600000 $ department: Factor w/ 4 levels "営業","開発",..: 1 2 3 4 2 $ start_date: Date[1:5], format: "2020-04-01" "2019-08-15" "2021-01-10" ... # A tibble: 5 × 6 id name age salary department start_date <int> <chr> <int> <dbl> <fct> <date> 1 1 田中太郎 25 400000 営業 2020-04-01 2 2 佐藤花子 32 550000 開発 2019-08-15 3 3 鈴木一郎 28 480000 マーケティング 2021-01-10 4 4 高橋美咲 24 420000 人事 2022-03-01 5 5 山田健太 35 600000 開発 2018-11-20

🔧 様々なファイル形式の読み込み

TSV(タブ区切り)ファイル

TSVファイルの操作
# TSVファイルの作成 write_tsv( sample_data, "employee_data.tsv" ) # TSVファイルの読み込み tsv_data <- read_tsv("employee_data.tsv") print(head(tsv_data, 3)) # 区切り文字を指定した読み込み write_delim( sample_data, "employee_data_pipe.txt", delim = "|" ) pipe_data <- read_delim( "employee_data_pipe.txt", delim = "|" ) print(head(pipe_data, 3))
TSVファイル形式の例
id name age salary department start_date 1 田中太郎 25 400000 営業 2020-04-01 2 佐藤花子 32 550000 開発 2019-08-15 3 鈴木一郎 28 480000 マーケティング 2021-01-10

固定幅ファイル

固定幅ファイルの読み込み
# 固定幅ファイルの作成例 fixed_width_content <- "001田中太郎 25400000営業 2020-04-01 002佐藤花子 32550000開発 2019-08-15 003鈴木一郎 28480000マーケ 2021-01-10 004高橋美咲 24420000人事 2022-03-01 005山田健太 35600000開発 2018-11-20" writeLines( fixed_width_content, "employee_fixed_width.txt" ) # 固定幅ファイルの読み込み fixed_data <- read_fwf( "employee_fixed_width.txt", fwf_positions( start = c(1, 4, 12, 16, 22, 32), end = c(3, 11, 15, 21, 31, 42), col_names = c( "id", "name", "age", "salary", "department", "start_date" ) ) ) print(fixed_data) # データのクリーニング cleaned_fixed_data <- fixed_data %>% mutate( name = str_trim(name), department = str_trim(department), start_date = as.Date(start_date) ) print(cleaned_fixed_data)
固定幅ファイル形式の例
001田中太郎 25400000営業 2020-04-01 002佐藤花子 32550000開発 2019-08-15 003鈴木一郎 28480000マーケ 2021-01-10

⚡ 高速データ処理とパフォーマンス最適化

大容量ファイルの読み込み戦略

大容量データの効率的読み込み
# 大容量サンプルデータの生成 set.seed(123) large_sample_data <- tibble( id = 1:100000, product_name = sample( c( "商品A", "商品B", "商品C", "商品D", "商品E" ), 100000, replace = TRUE ), price = sample(100:10000, 100000, replace = TRUE), quantity = sample(1:100, 100000, replace = TRUE), category = sample( c( "家電", "衣料品", "食品", "本", "スポーツ" ), 100000, replace = TRUE ), sale_date = sample( seq( as.Date("2023-01-01"), as.Date("2023-12-31"), by = "day" ), 100000, replace = TRUE ) ) # 大容量ファイルの書き出し write_csv( large_sample_data, "large_sales_data.csv" ) # 読み込み速度の比較 cat("readrでの読み込み時間:\n") readr_time <- system.time({ readr_data <- read_csv( "large_sales_data.csv", show_col_types = FALSE ) }) print(readr_time) # base Rでの読み込み時間 cat("\nbase R (read.csv)での読み込み時間:\n") base_time <- system.time({ base_data <- read.csv("large_sales_data.csv") }) print(base_time) # 速度比較 speed_improvement <- base_time["elapsed"] / readr_time["elapsed"] cat(sprintf( "\nreadrは base R より %.1f倍高速です\n", speed_improvement )) # ファイルサイズの確認 file_size <- file.size("large_sales_data.csv") cat(sprintf( "ファイルサイズ: %.2f MB\n", file_size / 1024 / 1024 ))
パフォーマンス比較結果
readrでの読み込み時間: user system elapsed 0.456 0.023 0.481 base R (read.csv)での読み込み時間: user system elapsed 2.841 0.045 2.889 readrは base R より 6.0倍高速です ファイルサイズ: 4.12 MB

部分読み込みとストリーミング処理

効率的な部分読み込み
# 最初のn行のみ読み込み sample_data_head <- read_csv( "large_sales_data.csv", n_max = 1000, show_col_types = FALSE ) cat(sprintf( "読み込まれた行数: %d\n", nrow(sample_data_head) )) # 特定の列のみ読み込み selected_columns <- read_csv( "large_sales_data.csv", col_select = c( "product_name", "price", "category" ), n_max = 10, show_col_types = FALSE ) print(selected_columns) # 条件付き読み込み(読み込み後のフィルタリング) expensive_products <- read_csv( "large_sales_data.csv", show_col_types = FALSE ) %>% filter(price > 8000) %>% head(10) print(expensive_products) # チャンク読み込みの例 process_in_chunks <- function(file_path, chunk_size = 10000) { # 全体のサマリーを保存する変数 total_sales <- 0 total_quantity <- 0 chunk_count <- 0 # ファイルを読み込んでチャンクごとに処理 for (chunk in readr::read_csv_chunked( file_path, DataFrameCallback(chunk_size), chunk_size = chunk_size, show_col_types = FALSE )) { chunk_count <- chunk_count + 1 # チャンクの統計を計算 chunk_sales <- sum(chunk$price * chunk$quantity) chunk_quantity <- sum(chunk$quantity) total_sales <- total_sales + chunk_sales total_quantity <- total_quantity + chunk_quantity cat(sprintf( "チャンク %d 処理完了 (行数: %d)\n", chunk_count, nrow(chunk) )) } return(list( total_sales = total_sales, total_quantity = total_quantity, chunks_processed = chunk_count )) } # チャンク処理の実行 chunk_result <- process_in_chunks("large_sales_data.csv") print(chunk_result)
部分読み込み結果
読み込まれた行数: 1000 # A tibble: 10 × 3 product_name price category <chr> <int> <chr> 1 商品A 1687 家電 2 商品B 4558 本 3 商品C 8072 食品 4 商品D 6935 衣料品 5 商品E 4905 本 6 商品A 3686 食品 7 商品B 8791 衣料品 8 商品C 4113 本 9 商品D 1334 食品 10 商品E 6870 スポーツ チャンク 1 処理完了 (行数: 10000) チャンク 2 処理完了 (行数: 10000) チャンク 3 処理完了 (行数: 10000) ... チャンク 10 処理完了 (行数: 10000) $total_sales [1] 2746418956 $total_quantity [1] 5050138 $chunks_processed [1] 10

🛠️ エラーハンドリングとデータクリーニング

不正なデータの処理

エラーハンドリングの実践
# 不正なデータを含むサンプルファイルの作成 problematic_data <- "id,name,age,salary,join_date 1,田中太郎,25,400000,2020-04-01 2,佐藤花子,不明,550000,2019-08-15 3,鈴木一郎,28,N/A,2021-01-10 4,高橋美咲,24,420000,不正な日付 5,山田健太,35,600000,2018-11-20 6,破損データ 7,中村美香,29,480000,2022-01-15" writeLines( problematic_data, "problematic_data.csv" ) # 問題のあるデータの読み込み cat("デフォルトの読み込み:\n") default_read <- read_csv( "problematic_data.csv", show_col_types = FALSE ) print(default_read) problems(default_read) # エラーハンドリングを改善した読み込み cat("\n改善された読み込み:\n") improved_read <- read_csv( "problematic_data.csv", col_types = cols( id = col_integer(), name = col_character(), age = col_character(), # 一旦文字列として読み込み salary = col_character(), # 一旦文字列として読み込み join_date = col_character() # 一旦文字列として読み込み ), na = c("", "NA", "N/A", "不明") ) print(improved_read) # データクリーニングと型変換 cleaned_data <- improved_read %>% filter(!is.na(id)) %>% # IDが欠損している行を除外 mutate( # 年齢の変換 age_numeric = as.numeric(age), age_cleaned = case_when( is.na(age_numeric) ~ as.numeric(NA), age_numeric < 18 | age_numeric > 70 ~ as.numeric(NA), TRUE ~ age_numeric ), # 給与の変換 salary_numeric = as.numeric(salary), salary_cleaned = case_when( is.na(salary_numeric) ~ as.numeric(NA), salary_numeric < 100000 | salary_numeric > 10000000 ~ as.numeric(NA), TRUE ~ salary_numeric ), # 日付の変換 join_date_parsed = case_when( str_detect(join_date, "^\\d{4}-\\d{2}-\\d{2}$") ~ as.Date(join_date), TRUE ~ as.Date(NA) ) ) %>% select( id, name, age = age_cleaned, salary = salary_cleaned, join_date = join_date_parsed ) print(cleaned_data) # データ品質レポート data_quality_report <- cleaned_data %>% summarise( total_rows = n(), complete_rows = sum(complete.cases(.)), missing_age = sum(is.na(age)), missing_salary = sum(is.na(salary)), missing_date = sum(is.na(join_date)), data_completeness = round(complete_rows / total_rows * 100, 1) ) print(data_quality_report)
エラーハンドリング結果
# A tibble: 5 × 5 id name age salary join_date <int> <chr> <dbl> <dbl> <date> 1 1 田中太郎 25 400000 2020-04-01 2 2 佐藤花子 NA 550000 2019-08-15 3 3 鈴木一郎 28 NA 2021-01-10 4 4 高橋美咲 24 420000 NA 5 5 山田健太 35 600000 2018-11-20 # A tibble: 1 × 7 total_rows complete_rows missing_age missing_salary missing_date data_completeness <int> <int> <int> <int> <int> <dbl> 1 5 3 1 1 1 60

📈 実践的なデータ分析パイプライン

売上データの包括的分析

複数ファイルの結合と分析
# 地域別売上データの作成 regions <- c("東京", "大阪", "名古屋", "福岡") for (region in regions) { region_data <- tibble( region = region, store_id = 1:5, store_name = paste0(region, "店", sprintf("%02d", 1:5)), monthly_sales = sample(1000000:5000000, 5), customer_count = sample(500:2000, 5), avg_purchase = round(monthly_sales / customer_count, 0) ) write_csv( region_data, paste0("sales_", region, ".csv") ) } # 全地域のデータを読み込んで結合 all_sales_data <- map_dfr( regions, ~ read_csv( paste0("sales_", .x, ".csv"), show_col_types = FALSE ) ) print(head(all_sales_data, 10)) # 地域別パフォーマンス分析 regional_analysis <- all_sales_data %>% group_by(region) %>% summarise( store_count = n(), total_sales = sum(monthly_sales), avg_sales_per_store = mean(monthly_sales), total_customers = sum(customer_count), avg_customers_per_store = mean(customer_count), overall_avg_purchase = total_sales / total_customers, .groups = 'drop' ) %>% arrange(desc(total_sales)) print(regional_analysis) # トップパフォーマンス店舗の特定 top_stores <- all_sales_data %>% arrange(desc(monthly_sales)) %>% head(5) %>% mutate( sales_rank = row_number(), performance_score = round( (monthly_sales / max(monthly_sales) * 0.6) + (customer_count / max(customer_count) * 0.4), 3 ) ) print(top_stores) # データの書き出し write_csv( regional_analysis, "regional_analysis_summary.csv" ) write_csv( top_stores, "top_performing_stores.csv" ) cat("分析完了!結果ファイルが生成されました。\n")
統合分析結果
# A tibble: 10 × 6 region store_id store_name monthly_sales customer_count avg_purchase <chr> <int> <chr> <int> <int> <dbl> 1 東京 1 東京店01 3847521 1456 2643 2 東京 2 東京店02 1582947 1123 1410 3 東京 3 東京店03 2156984 987 2185 4 東京 4 東京店04 4234567 1876 2257 5 東京 5 東京店05 3178945 1345 2364 6 大阪 1 大阪店01 2891234 1234 2344 7 大阪 2 大阪店02 1756789 1098 1600 8 大阪 3 大阪店03 3456123 1567 2205 9 大阪 4 大阪店04 2876543 1345 2138 10 大阪 5 大阪店05 4123456 1789 2305 # A tibble: 4 × 7 region store_count total_sales avg_sales_per_store total_customers avg_customers_per_store overall_avg_purchase <chr> <int> <int> <dbl> <int> <dbl> <dbl> 1 東京 5 15000964 3000193 6787 1357. 2211. 2 大阪 5 15104145 3020829 7033 1407. 2147. 3 名古屋 5 14876543 2975309 6895 1379. 2158. 4 福岡 5 14234567 2846913 6534 1307. 2178. 分析完了!結果ファイルが生成されました。
ファイル形式 readr関数 特徴 用途
CSV read_csv() カンマ区切り、高速 一般的なデータ交換
TSV read_tsv() タブ区切り テキストデータ
固定幅 read_fwf() 幅指定 レガシーシステム
区切り文字指定 read_delim() 任意の区切り文字 カスタム形式
ログファイル read_lines() 行単位読み込み テキスト処理

第8章の重要ポイント

実践的アドバイス

readrは、データサイエンスプロジェクトの起点となる重要なパッケージです。大容量ファイルの効率的な読み込み、エラーハンドリング、データ品質の確保など、実務で欠かせない機能を提供します。特に、col_types引数を使った型指定や、部分読み込み機能は、大規模データ処理において威力を発揮します。

🚀 大容量データ処理とETLパイプライン

現代のデータサイエンスでは、メモリに収まらない大容量データリアルタイムデータストリームの処理が日常的に必要です。本セクションでは、readrの高度な機能を活用した効率的なETL(Extract, Transform, Load)パイプラインの構築方法を学びます。

💾 大容量ファイル処理戦略

1. チャンク読み込みによるメモリ効率化

# 大容量CSVファイルをチャンクごとに処理
library(readr)
library(dplyr)

# チャンクサイズ設定(10万行ずつ)
chunk_size <- 100000

# 大容量ファイル処理関数
process_large_file <- function(file_path, chunk_size = 100000) {
  # ファイル全体の行数を事前計算
  total_lines <- count_lines(file_path)
  n_chunks <- ceiling(total_lines / chunk_size)
  
  cat("総行数:", total_lines, "チャンク数:", n_chunks, "\n")
  
  # 結果格納用リスト
  results <- list()
  
  # チャンクごとに処理
  for (i in seq_len(n_chunks)) {
    skip_lines <- (i - 1) * chunk_size
    
    # チャンク読み込み
    chunk <- read_csv(
      file_path,
      skip = skip_lines,
      n_max = chunk_size,
      col_types = cols(),
      show_col_types = FALSE
    )
    
    # データ処理(例:売上データの集計)
    processed_chunk <- chunk %>%
      filter(!is.na(sales_amount)) %>%
      group_by(region, date) %>%
      summarise(
        total_sales = sum(sales_amount),
        avg_sales = mean(sales_amount),
        customer_count = n_distinct(customer_id),
        .groups = "drop"
      )
    
    results[[i]] <- processed_chunk
    cat("チャンク", i, "処理完了\n")
  }
  
  # 全チャンクの結果を統合
  bind_rows(results)
}

2. 並列処理によるパフォーマンス向上

# future + furrr による並列チャンク処理
library(future)
library(furrr)

# 並列処理環境の設定
plan(multisession, workers = 4)

# 並列チャンク処理関数
parallel_chunk_processing <- function(file_path, chunk_size = 50000) {
  total_lines <- count_lines(file_path)
  n_chunks <- ceiling(total_lines / chunk_size)
  
  # チャンク情報のリスト作成
  chunk_info <- tibble(
    chunk_id = seq_len(n_chunks),
    skip_lines = (chunk_id - 1) * chunk_size,
    n_max = ifelse(chunk_id == n_chunks, 
                   total_lines - skip_lines, 
                   chunk_size)
  )
  
  # 並列でチャンク処理
  results <- chunk_info %>%
    future_pmap_dfr(function(chunk_id, skip_lines, n_max) {
      # 各ワーカーでreadなしバッファ処理
      chunk <- read_csv(
        file_path,
        skip = skip_lines,
        n_max = n_max,
        col_types = cols(
          customer_id = col_character(),
          sales_amount = col_double(),
          region = col_factor(),
          date = col_date()
        ),
        locale = locale(encoding = "UTF-8")
      )
      
      # 高速データ変換
      chunk %>%
        filter(sales_amount > 0) %>%
        mutate(
          sales_category = case_when(
            sales_amount >= 10000 ~ "high",
            sales_amount >= 5000 ~ "medium",
            TRUE ~ "low"
          ),
          quarter = paste0("Q", quarter(date))
        ) %>%
        group_by(region, quarter, sales_category) %>%
        summarise(
          revenue = sum(sales_amount),
          transactions = n(),
          avg_transaction = mean(sales_amount),
          .groups = "drop"
        )
    }, .progress = TRUE)
  
  results
}

🔄 スケーラブルETLパイプライン

1. データ品質チェック付きETLパイプライン

# 包括的ETLパイプライン
library(readr)
library(dplyr)
library(lubridate)
library(logger)

# データ品質チェック関数
validate_data_quality <- function(data, spec) {
  issues <- list()
  
  # 必須列の存在チェック
  missing_cols <- setdiff(spec$required_columns, names(data))
  if (length(missing_cols) > 0) {
    issues$missing_columns <- missing_cols
  }
  
  # データ型チェック
  for (col in names(spec$column_types)) {
    if (col %in% names(data)) {
      expected_type <- spec$column_types[[col]]
      actual_type <- class(data[[col]])[1]
      if (actual_type != expected_type) {
        issues$type_mismatch <- c(issues$type_mismatch, 
                                 paste(col, ":", expected_type, "expected, got", actual_type))
      }
    }
  }
  
  # 欠損値チェック
  na_counts <- data %>%
    summarise(across(everything(), ~sum(is.na(.))))
  high_na_cols <- na_counts %>%
    select_if(~. > nrow(data) * 0.5) %>%
    names()
  if (length(high_na_cols) > 0) {
    issues$high_na_columns <- high_na_cols
  }
  
  list(valid = length(issues) == 0, issues = issues)
}

# 堅牢なETL関数
robust_etl_pipeline <- function(input_path, output_path, data_spec) {
  # ログ設定
  log_info("ETL開始: {input_path}")
  
  tryCatch({
    # EXTRACT: データ読み込み
    log_info("データ読み込み開始")
    raw_data <- read_csv(
      input_path,
      col_types = cols(),
      locale = locale(encoding = "UTF-8", date_format = "%Y-%m-%d"),
      na = c("", "NA", "NULL", "N/A", "#N/A")
    )
    log_info("読み込み完了: {nrow(raw_data)}行")
    
    # データ品質検証
    validation <- validate_data_quality(raw_data, data_spec)
    if (!validation$valid) {
      log_error("データ品質問題: {validation$issues}")
      stop("Data quality issues detected")
    }
    
    # TRANSFORM: データ変換
    log_info("データ変換開始")
    cleaned_data <- raw_data %>%
      # 重複除去
      distinct() %>%
      # 日付型変換
      mutate(
        across(contains("date"), ~as.Date(.)),
        across(contains("amount"), ~as.numeric(.)),
        across(where(is.character), ~str_trim(.))
      ) %>%
      # 外れ値処理
      filter(
        if_any(contains("amount"), ~between(., 
                                          quantile(., 0.01, na.rm = TRUE),
                                          quantile(., 0.99, na.rm = TRUE)))
      ) %>%
      # ビジネスルール適用
      filter(
        !is.na(customer_id),
        sales_amount > 0,
        date >= as.Date("2020-01-01")
      )
    
    log_info("変換完了: {nrow(cleaned_data)}行")
    
    # LOAD: データ出力
    log_info("データ出力開始")
    write_csv(
      cleaned_data,
      output_path,
      na = "",
      quote = "needed"
    )
    
    log_info("ETL完了: {output_path}")
    
    # 処理サマリー
    summary <- list(
      input_rows = nrow(raw_data),
      output_rows = nrow(cleaned_data),
      data_quality = validation,
      processing_time = Sys.time()
    )
    
    return(summary)
    
  }, error = function(e) {
    log_error("ETL失敗: {e$message}")
    stop(e)
  })
}

2. ストリーミングデータ処理

# リアルタイムデータストリーム処理
library(readr)
library(dplyr)
library(later)

# ストリーミング処理クラス
streaming_processor <- function(input_dir, output_dir, processing_func) {
  # 処理状態管理
  state <- list(
    processed_files = character(),
    error_count = 0,
    last_processed = Sys.time()
  )
  
  # ファイル監視と処理
  process_new_files <- function() {
    # 新しいファイルを検索
    current_files <- list.files(
      input_dir, 
      pattern = "\\.csv$", 
      full.names = TRUE
    )
    
    new_files <- setdiff(current_files, state$processed_files)
    
    if (length(new_files) > 0) {
      cat("新しいファイル発見:", length(new_files), "件\n")
      
      # 各ファイルを処理
      for (file in new_files) {
        tryCatch({
          # ファイル読み込み
          data <- read_csv(
            file,
            col_types = cols(),
            show_col_types = FALSE
          )
          
          # カスタム処理適用
          processed_data <- processing_func(data)
          
          # 出力ファイル名生成
          output_file <- file.path(
            output_dir,
            paste0("processed_", basename(file))
          )
          
          # 処理済みデータ出力
          write_csv(processed_data, output_file)
          
          # 処理完了記録
          state$processed_files <- c(state$processed_files, file)
          state$last_processed <- Sys.time()
          
          cat("処理完了:", basename(file), "\n")
          
        }, error = function(e) {
          cat("エラー:", basename(file), "-", e$message, "\n")
          state$error_count <- state$error_count + 1
        })
      }
    }
    
    # 次の監視をスケジュール(5秒後)
    later::later(process_new_files, 5)
  }
  
  # 監視開始
  process_new_files()
  
  # 制御関数を返す
  list(
    get_state = function() state,
    stop = function() {
      cat("ストリーミング処理停止\n")
    }
  )
}

# 使用例:売上データの集計処理
sales_aggregation <- function(data) {
  data %>%
    group_by(date = as.Date(timestamp), product_category) %>%
    summarise(
      total_sales = sum(amount, na.rm = TRUE),
      transaction_count = n(),
      unique_customers = n_distinct(customer_id),
      .groups = "drop"
    ) %>%
    mutate(avg_transaction_value = total_sales / transaction_count)
}

# ストリーミング処理開始
# processor <- streaming_processor(
#   input_dir = "data/incoming/",
#   output_dir = "data/processed/",
#   processing_func = sales_aggregation
# )

パフォーマンス最適化テクニック

# 高性能データ読み込み設定
optimized_read_settings <- function() {
  list(
    # メモリ使用量最小化
    lazy = TRUE,
    
    # 並列処理最適化
    num_threads = parallel::detectCores() - 1,
    
    # IO最適化
    buffer_size = 1024 * 1024,  # 1MB バッファ
    
    # 型推論最適化
    guess_max = 10000,
    
    # ロケール最適化
    locale = locale(
      encoding = "UTF-8",
      decimal_mark = ".",
      grouping_mark = ",",
      date_format = "%Y-%m-%d",
      datetime_format = "%Y-%m-%d %H:%M:%S"
    )
  )
}

# ベンチマーク測定関数
benchmark_reading_methods <- function(file_path) {
  library(microbenchmark)
  
  # 各種読み込み方法のベンチマーク
  results <- microbenchmark(
    # 基本設定
    basic = read_csv(file_path),
    
    # 型指定済み
    typed = read_csv(
      file_path,
      col_types = cols(
        id = col_character(),
        value = col_double(),
        date = col_date()
      )
    ),
    
    # 最適化設定
    optimized = read_csv(
      file_path,
      col_types = cols(),
      guess_max = 10000,
      locale = locale(encoding = "UTF-8")
    ),
    
    times = 10
  )
  
  print(results)
  autoplot(results)
}

💡 実践的なアドバイス

  • メモリ管理: システムメモリの70%以下に収まるようチャンクサイズを調整
  • 型指定: 大容量ファイルでは必ず事前に列型を指定してguess処理を回避
  • 並列処理: CPUコア数-1を並列度として設定(システム安定性確保)
  • エラーハンドリング: チャンクごとの処理では個別のエラーハンドリングを実装
  • 進捗監視: 長時間処理では進捗表示とログ出力を必ず実装
  • 品質保証: 各ステップでデータ品質チェックを実施