データ読み込みとパースの完全ガイド
readrパッケージは、Rにおけるデータ読み込みを高速かつ確実に行うための革新的なツールです。従来のread.csv()と比較して10倍以上高速で、データ型の自動推測やエラーハンドリングが格段に向上しています。
| ファイル形式 | readr関数 | 特徴 | 用途 | 
|---|---|---|---|
| CSV | read_csv() | カンマ区切り、高速 | 一般的なデータ交換 | 
| TSV | read_tsv() | タブ区切り | テキストデータ | 
| 固定幅 | read_fwf() | 幅指定 | レガシーシステム | 
| 区切り文字指定 | read_delim() | 任意の区切り文字 | カスタム形式 | 
| ログファイル | read_lines() | 行単位読み込み | テキスト処理 | 
readrは、データサイエンスプロジェクトの起点となる重要なパッケージです。大容量ファイルの効率的な読み込み、エラーハンドリング、データ品質の確保など、実務で欠かせない機能を提供します。特に、col_types引数を使った型指定や、部分読み込み機能は、大規模データ処理において威力を発揮します。
現代のデータサイエンスでは、メモリに収まらない大容量データやリアルタイムデータストリームの処理が日常的に必要です。本セクションでは、readrの高度な機能を活用した効率的なETL(Extract, Transform, Load)パイプラインの構築方法を学びます。
# 大容量CSVファイルをチャンクごとに処理
library(readr)
library(dplyr)
# チャンクサイズ設定(10万行ずつ)
chunk_size <- 100000
# 大容量ファイル処理関数
process_large_file <- function(file_path, chunk_size = 100000) {
  # ファイル全体の行数を事前計算
  total_lines <- count_lines(file_path)
  n_chunks <- ceiling(total_lines / chunk_size)
  
  cat("総行数:", total_lines, "チャンク数:", n_chunks, "\n")
  
  # 結果格納用リスト
  results <- list()
  
  # チャンクごとに処理
  for (i in seq_len(n_chunks)) {
    skip_lines <- (i - 1) * chunk_size
    
    # チャンク読み込み
    chunk <- read_csv(
      file_path,
      skip = skip_lines,
      n_max = chunk_size,
      col_types = cols(),
      show_col_types = FALSE
    )
    
    # データ処理(例:売上データの集計)
    processed_chunk <- chunk %>%
      filter(!is.na(sales_amount)) %>%
      group_by(region, date) %>%
      summarise(
        total_sales = sum(sales_amount),
        avg_sales = mean(sales_amount),
        customer_count = n_distinct(customer_id),
        .groups = "drop"
      )
    
    results[[i]] <- processed_chunk
    cat("チャンク", i, "処理完了\n")
  }
  
  # 全チャンクの結果を統合
  bind_rows(results)
}
                    # future + furrr による並列チャンク処理
library(future)
library(furrr)
# 並列処理環境の設定
plan(multisession, workers = 4)
# 並列チャンク処理関数
parallel_chunk_processing <- function(file_path, chunk_size = 50000) {
  total_lines <- count_lines(file_path)
  n_chunks <- ceiling(total_lines / chunk_size)
  
  # チャンク情報のリスト作成
  chunk_info <- tibble(
    chunk_id = seq_len(n_chunks),
    skip_lines = (chunk_id - 1) * chunk_size,
    n_max = ifelse(chunk_id == n_chunks, 
                   total_lines - skip_lines, 
                   chunk_size)
  )
  
  # 並列でチャンク処理
  results <- chunk_info %>%
    future_pmap_dfr(function(chunk_id, skip_lines, n_max) {
      # 各ワーカーでreadなしバッファ処理
      chunk <- read_csv(
        file_path,
        skip = skip_lines,
        n_max = n_max,
        col_types = cols(
          customer_id = col_character(),
          sales_amount = col_double(),
          region = col_factor(),
          date = col_date()
        ),
        locale = locale(encoding = "UTF-8")
      )
      
      # 高速データ変換
      chunk %>%
        filter(sales_amount > 0) %>%
        mutate(
          sales_category = case_when(
            sales_amount >= 10000 ~ "high",
            sales_amount >= 5000 ~ "medium",
            TRUE ~ "low"
          ),
          quarter = paste0("Q", quarter(date))
        ) %>%
        group_by(region, quarter, sales_category) %>%
        summarise(
          revenue = sum(sales_amount),
          transactions = n(),
          avg_transaction = mean(sales_amount),
          .groups = "drop"
        )
    }, .progress = TRUE)
  
  results
}
                    # 包括的ETLパイプライン
library(readr)
library(dplyr)
library(lubridate)
library(logger)
# データ品質チェック関数
validate_data_quality <- function(data, spec) {
  issues <- list()
  
  # 必須列の存在チェック
  missing_cols <- setdiff(spec$required_columns, names(data))
  if (length(missing_cols) > 0) {
    issues$missing_columns <- missing_cols
  }
  
  # データ型チェック
  for (col in names(spec$column_types)) {
    if (col %in% names(data)) {
      expected_type <- spec$column_types[[col]]
      actual_type <- class(data[[col]])[1]
      if (actual_type != expected_type) {
        issues$type_mismatch <- c(issues$type_mismatch, 
                                 paste(col, ":", expected_type, "expected, got", actual_type))
      }
    }
  }
  
  # 欠損値チェック
  na_counts <- data %>%
    summarise(across(everything(), ~sum(is.na(.))))
  high_na_cols <- na_counts %>%
    select_if(~. > nrow(data) * 0.5) %>%
    names()
  if (length(high_na_cols) > 0) {
    issues$high_na_columns <- high_na_cols
  }
  
  list(valid = length(issues) == 0, issues = issues)
}
# 堅牢なETL関数
robust_etl_pipeline <- function(input_path, output_path, data_spec) {
  # ログ設定
  log_info("ETL開始: {input_path}")
  
  tryCatch({
    # EXTRACT: データ読み込み
    log_info("データ読み込み開始")
    raw_data <- read_csv(
      input_path,
      col_types = cols(),
      locale = locale(encoding = "UTF-8", date_format = "%Y-%m-%d"),
      na = c("", "NA", "NULL", "N/A", "#N/A")
    )
    log_info("読み込み完了: {nrow(raw_data)}行")
    
    # データ品質検証
    validation <- validate_data_quality(raw_data, data_spec)
    if (!validation$valid) {
      log_error("データ品質問題: {validation$issues}")
      stop("Data quality issues detected")
    }
    
    # TRANSFORM: データ変換
    log_info("データ変換開始")
    cleaned_data <- raw_data %>%
      # 重複除去
      distinct() %>%
      # 日付型変換
      mutate(
        across(contains("date"), ~as.Date(.)),
        across(contains("amount"), ~as.numeric(.)),
        across(where(is.character), ~str_trim(.))
      ) %>%
      # 外れ値処理
      filter(
        if_any(contains("amount"), ~between(., 
                                          quantile(., 0.01, na.rm = TRUE),
                                          quantile(., 0.99, na.rm = TRUE)))
      ) %>%
      # ビジネスルール適用
      filter(
        !is.na(customer_id),
        sales_amount > 0,
        date >= as.Date("2020-01-01")
      )
    
    log_info("変換完了: {nrow(cleaned_data)}行")
    
    # LOAD: データ出力
    log_info("データ出力開始")
    write_csv(
      cleaned_data,
      output_path,
      na = "",
      quote = "needed"
    )
    
    log_info("ETL完了: {output_path}")
    
    # 処理サマリー
    summary <- list(
      input_rows = nrow(raw_data),
      output_rows = nrow(cleaned_data),
      data_quality = validation,
      processing_time = Sys.time()
    )
    
    return(summary)
    
  }, error = function(e) {
    log_error("ETL失敗: {e$message}")
    stop(e)
  })
}
                    # リアルタイムデータストリーム処理
library(readr)
library(dplyr)
library(later)
# ストリーミング処理クラス
streaming_processor <- function(input_dir, output_dir, processing_func) {
  # 処理状態管理
  state <- list(
    processed_files = character(),
    error_count = 0,
    last_processed = Sys.time()
  )
  
  # ファイル監視と処理
  process_new_files <- function() {
    # 新しいファイルを検索
    current_files <- list.files(
      input_dir, 
      pattern = "\\.csv$", 
      full.names = TRUE
    )
    
    new_files <- setdiff(current_files, state$processed_files)
    
    if (length(new_files) > 0) {
      cat("新しいファイル発見:", length(new_files), "件\n")
      
      # 各ファイルを処理
      for (file in new_files) {
        tryCatch({
          # ファイル読み込み
          data <- read_csv(
            file,
            col_types = cols(),
            show_col_types = FALSE
          )
          
          # カスタム処理適用
          processed_data <- processing_func(data)
          
          # 出力ファイル名生成
          output_file <- file.path(
            output_dir,
            paste0("processed_", basename(file))
          )
          
          # 処理済みデータ出力
          write_csv(processed_data, output_file)
          
          # 処理完了記録
          state$processed_files <- c(state$processed_files, file)
          state$last_processed <- Sys.time()
          
          cat("処理完了:", basename(file), "\n")
          
        }, error = function(e) {
          cat("エラー:", basename(file), "-", e$message, "\n")
          state$error_count <- state$error_count + 1
        })
      }
    }
    
    # 次の監視をスケジュール(5秒後)
    later::later(process_new_files, 5)
  }
  
  # 監視開始
  process_new_files()
  
  # 制御関数を返す
  list(
    get_state = function() state,
    stop = function() {
      cat("ストリーミング処理停止\n")
    }
  )
}
# 使用例:売上データの集計処理
sales_aggregation <- function(data) {
  data %>%
    group_by(date = as.Date(timestamp), product_category) %>%
    summarise(
      total_sales = sum(amount, na.rm = TRUE),
      transaction_count = n(),
      unique_customers = n_distinct(customer_id),
      .groups = "drop"
    ) %>%
    mutate(avg_transaction_value = total_sales / transaction_count)
}
# ストリーミング処理開始
# processor <- streaming_processor(
#   input_dir = "data/incoming/",
#   output_dir = "data/processed/",
#   processing_func = sales_aggregation
# )
                    # 高性能データ読み込み設定
optimized_read_settings <- function() {
  list(
    # メモリ使用量最小化
    lazy = TRUE,
    
    # 並列処理最適化
    num_threads = parallel::detectCores() - 1,
    
    # IO最適化
    buffer_size = 1024 * 1024,  # 1MB バッファ
    
    # 型推論最適化
    guess_max = 10000,
    
    # ロケール最適化
    locale = locale(
      encoding = "UTF-8",
      decimal_mark = ".",
      grouping_mark = ",",
      date_format = "%Y-%m-%d",
      datetime_format = "%Y-%m-%d %H:%M:%S"
    )
  )
}
# ベンチマーク測定関数
benchmark_reading_methods <- function(file_path) {
  library(microbenchmark)
  
  # 各種読み込み方法のベンチマーク
  results <- microbenchmark(
    # 基本設定
    basic = read_csv(file_path),
    
    # 型指定済み
    typed = read_csv(
      file_path,
      col_types = cols(
        id = col_character(),
        value = col_double(),
        date = col_date()
      )
    ),
    
    # 最適化設定
    optimized = read_csv(
      file_path,
      col_types = cols(),
      guess_max = 10000,
      locale = locale(encoding = "UTF-8")
    ),
    
    times = 10
  )
  
  print(results)
  autoplot(results)
}
                    ※ 当サイトはAmazonアソシエイトプログラムに参加しています