データ読み込みとパースの完全ガイド
readrパッケージは、Rにおけるデータ読み込みを高速かつ確実に行うための革新的なツールです。従来のread.csv()と比較して10倍以上高速で、データ型の自動推測やエラーハンドリングが格段に向上しています。
| ファイル形式 | readr関数 | 特徴 | 用途 |
|---|---|---|---|
| CSV | read_csv() | カンマ区切り、高速 | 一般的なデータ交換 |
| TSV | read_tsv() | タブ区切り | テキストデータ |
| 固定幅 | read_fwf() | 幅指定 | レガシーシステム |
| 区切り文字指定 | read_delim() | 任意の区切り文字 | カスタム形式 |
| ログファイル | read_lines() | 行単位読み込み | テキスト処理 |
readrは、データサイエンスプロジェクトの起点となる重要なパッケージです。大容量ファイルの効率的な読み込み、エラーハンドリング、データ品質の確保など、実務で欠かせない機能を提供します。特に、col_types引数を使った型指定や、部分読み込み機能は、大規模データ処理において威力を発揮します。
現代のデータサイエンスでは、メモリに収まらない大容量データやリアルタイムデータストリームの処理が日常的に必要です。本セクションでは、readrの高度な機能を活用した効率的なETL(Extract, Transform, Load)パイプラインの構築方法を学びます。
# 大容量CSVファイルをチャンクごとに処理
library(readr)
library(dplyr)
# チャンクサイズ設定(10万行ずつ)
chunk_size <- 100000
# 大容量ファイル処理関数
process_large_file <- function(file_path, chunk_size = 100000) {
# ファイル全体の行数を事前計算
total_lines <- count_lines(file_path)
n_chunks <- ceiling(total_lines / chunk_size)
cat("総行数:", total_lines, "チャンク数:", n_chunks, "\n")
# 結果格納用リスト
results <- list()
# チャンクごとに処理
for (i in seq_len(n_chunks)) {
skip_lines <- (i - 1) * chunk_size
# チャンク読み込み
chunk <- read_csv(
file_path,
skip = skip_lines,
n_max = chunk_size,
col_types = cols(),
show_col_types = FALSE
)
# データ処理(例:売上データの集計)
processed_chunk <- chunk %>%
filter(!is.na(sales_amount)) %>%
group_by(region, date) %>%
summarise(
total_sales = sum(sales_amount),
avg_sales = mean(sales_amount),
customer_count = n_distinct(customer_id),
.groups = "drop"
)
results[[i]] <- processed_chunk
cat("チャンク", i, "処理完了\n")
}
# 全チャンクの結果を統合
bind_rows(results)
}
# future + furrr による並列チャンク処理
library(future)
library(furrr)
# 並列処理環境の設定
plan(multisession, workers = 4)
# 並列チャンク処理関数
parallel_chunk_processing <- function(file_path, chunk_size = 50000) {
total_lines <- count_lines(file_path)
n_chunks <- ceiling(total_lines / chunk_size)
# チャンク情報のリスト作成
chunk_info <- tibble(
chunk_id = seq_len(n_chunks),
skip_lines = (chunk_id - 1) * chunk_size,
n_max = ifelse(chunk_id == n_chunks,
total_lines - skip_lines,
chunk_size)
)
# 並列でチャンク処理
results <- chunk_info %>%
future_pmap_dfr(function(chunk_id, skip_lines, n_max) {
# 各ワーカーでreadなしバッファ処理
chunk <- read_csv(
file_path,
skip = skip_lines,
n_max = n_max,
col_types = cols(
customer_id = col_character(),
sales_amount = col_double(),
region = col_factor(),
date = col_date()
),
locale = locale(encoding = "UTF-8")
)
# 高速データ変換
chunk %>%
filter(sales_amount > 0) %>%
mutate(
sales_category = case_when(
sales_amount >= 10000 ~ "high",
sales_amount >= 5000 ~ "medium",
TRUE ~ "low"
),
quarter = paste0("Q", quarter(date))
) %>%
group_by(region, quarter, sales_category) %>%
summarise(
revenue = sum(sales_amount),
transactions = n(),
avg_transaction = mean(sales_amount),
.groups = "drop"
)
}, .progress = TRUE)
results
}
# 包括的ETLパイプライン
library(readr)
library(dplyr)
library(lubridate)
library(logger)
# データ品質チェック関数
validate_data_quality <- function(data, spec) {
issues <- list()
# 必須列の存在チェック
missing_cols <- setdiff(spec$required_columns, names(data))
if (length(missing_cols) > 0) {
issues$missing_columns <- missing_cols
}
# データ型チェック
for (col in names(spec$column_types)) {
if (col %in% names(data)) {
expected_type <- spec$column_types[[col]]
actual_type <- class(data[[col]])[1]
if (actual_type != expected_type) {
issues$type_mismatch <- c(issues$type_mismatch,
paste(col, ":", expected_type, "expected, got", actual_type))
}
}
}
# 欠損値チェック
na_counts <- data %>%
summarise(across(everything(), ~sum(is.na(.))))
high_na_cols <- na_counts %>%
select_if(~. > nrow(data) * 0.5) %>%
names()
if (length(high_na_cols) > 0) {
issues$high_na_columns <- high_na_cols
}
list(valid = length(issues) == 0, issues = issues)
}
# 堅牢なETL関数
robust_etl_pipeline <- function(input_path, output_path, data_spec) {
# ログ設定
log_info("ETL開始: {input_path}")
tryCatch({
# EXTRACT: データ読み込み
log_info("データ読み込み開始")
raw_data <- read_csv(
input_path,
col_types = cols(),
locale = locale(encoding = "UTF-8", date_format = "%Y-%m-%d"),
na = c("", "NA", "NULL", "N/A", "#N/A")
)
log_info("読み込み完了: {nrow(raw_data)}行")
# データ品質検証
validation <- validate_data_quality(raw_data, data_spec)
if (!validation$valid) {
log_error("データ品質問題: {validation$issues}")
stop("Data quality issues detected")
}
# TRANSFORM: データ変換
log_info("データ変換開始")
cleaned_data <- raw_data %>%
# 重複除去
distinct() %>%
# 日付型変換
mutate(
across(contains("date"), ~as.Date(.)),
across(contains("amount"), ~as.numeric(.)),
across(where(is.character), ~str_trim(.))
) %>%
# 外れ値処理
filter(
if_any(contains("amount"), ~between(.,
quantile(., 0.01, na.rm = TRUE),
quantile(., 0.99, na.rm = TRUE)))
) %>%
# ビジネスルール適用
filter(
!is.na(customer_id),
sales_amount > 0,
date >= as.Date("2020-01-01")
)
log_info("変換完了: {nrow(cleaned_data)}行")
# LOAD: データ出力
log_info("データ出力開始")
write_csv(
cleaned_data,
output_path,
na = "",
quote = "needed"
)
log_info("ETL完了: {output_path}")
# 処理サマリー
summary <- list(
input_rows = nrow(raw_data),
output_rows = nrow(cleaned_data),
data_quality = validation,
processing_time = Sys.time()
)
return(summary)
}, error = function(e) {
log_error("ETL失敗: {e$message}")
stop(e)
})
}
# リアルタイムデータストリーム処理
library(readr)
library(dplyr)
library(later)
# ストリーミング処理クラス
streaming_processor <- function(input_dir, output_dir, processing_func) {
# 処理状態管理
state <- list(
processed_files = character(),
error_count = 0,
last_processed = Sys.time()
)
# ファイル監視と処理
process_new_files <- function() {
# 新しいファイルを検索
current_files <- list.files(
input_dir,
pattern = "\\.csv$",
full.names = TRUE
)
new_files <- setdiff(current_files, state$processed_files)
if (length(new_files) > 0) {
cat("新しいファイル発見:", length(new_files), "件\n")
# 各ファイルを処理
for (file in new_files) {
tryCatch({
# ファイル読み込み
data <- read_csv(
file,
col_types = cols(),
show_col_types = FALSE
)
# カスタム処理適用
processed_data <- processing_func(data)
# 出力ファイル名生成
output_file <- file.path(
output_dir,
paste0("processed_", basename(file))
)
# 処理済みデータ出力
write_csv(processed_data, output_file)
# 処理完了記録
state$processed_files <- c(state$processed_files, file)
state$last_processed <- Sys.time()
cat("処理完了:", basename(file), "\n")
}, error = function(e) {
cat("エラー:", basename(file), "-", e$message, "\n")
state$error_count <- state$error_count + 1
})
}
}
# 次の監視をスケジュール(5秒後)
later::later(process_new_files, 5)
}
# 監視開始
process_new_files()
# 制御関数を返す
list(
get_state = function() state,
stop = function() {
cat("ストリーミング処理停止\n")
}
)
}
# 使用例:売上データの集計処理
sales_aggregation <- function(data) {
data %>%
group_by(date = as.Date(timestamp), product_category) %>%
summarise(
total_sales = sum(amount, na.rm = TRUE),
transaction_count = n(),
unique_customers = n_distinct(customer_id),
.groups = "drop"
) %>%
mutate(avg_transaction_value = total_sales / transaction_count)
}
# ストリーミング処理開始
# processor <- streaming_processor(
# input_dir = "data/incoming/",
# output_dir = "data/processed/",
# processing_func = sales_aggregation
# )
# 高性能データ読み込み設定
optimized_read_settings <- function() {
list(
# メモリ使用量最小化
lazy = TRUE,
# 並列処理最適化
num_threads = parallel::detectCores() - 1,
# IO最適化
buffer_size = 1024 * 1024, # 1MB バッファ
# 型推論最適化
guess_max = 10000,
# ロケール最適化
locale = locale(
encoding = "UTF-8",
decimal_mark = ".",
grouping_mark = ",",
date_format = "%Y-%m-%d",
datetime_format = "%Y-%m-%d %H:%M:%S"
)
)
}
# ベンチマーク測定関数
benchmark_reading_methods <- function(file_path) {
library(microbenchmark)
# 各種読み込み方法のベンチマーク
results <- microbenchmark(
# 基本設定
basic = read_csv(file_path),
# 型指定済み
typed = read_csv(
file_path,
col_types = cols(
id = col_character(),
value = col_double(),
date = col_date()
)
),
# 最適化設定
optimized = read_csv(
file_path,
col_types = cols(),
guess_max = 10000,
locale = locale(encoding = "UTF-8")
),
times = 10
)
print(results)
autoplot(results)
}