Как объединить CSV-файлы в один DataFrame: инструкция для Python

Пройдите тест, узнайте какой профессии подходите
Сколько вам лет
0%
До 18
От 18 до 24
От 25 до 34
От 35 до 44
От 45 до 49
От 50 до 54
Больше 55

Для кого эта статья:

  • Аналитики данных и специалисты по обработке данных
  • Студенты и начинающие профессионалы в области анализа данных
  • Специалисты, ищущие эффективные решения для обработки и объединения больших объемов данных

    Работая с данными, аналитик часто сталкивается с необходимостью объединить информацию из десятков, а иногда и сотен CSV-файлов. Ручное копирование — путь к ошибкам и потере времени. Эффективное решение этой задачи с помощью Python и библиотеки pandas не только экономит ресурсы, но и открывает новые возможности для анализа. Правильно настроенный процесс объединения данных позволяет за секунды собрать разрозненную информацию в единый DataFrame, готовый к дальнейшей обработке. 🚀

Хотите освоить профессиональные техники работы с данными? Программа Профессия аналитик данных от Skypro научит вас не только объединять CSV-файлы, но и строить полноценные аналитические пайплайны. Вы освоите продвинутые возможности pandas, научитесь трансформировать и визуализировать данные, а главное — превращать сырые цифры в инсайты для бизнеса. Более 87% выпускников находят работу в первые три месяца после обучения.

Подготовка среды для объединения CSV-файлов в pandas

Перед тем как приступить к объединению CSV-файлов, необходимо правильно настроить рабочее окружение. Это фундамент, на котором строится весь последующий процесс обработки данных.

Начнем с установки необходимых библиотек. Если вы еще не установили pandas, выполните следующую команду:

pip install pandas

Для работы с файловой системой и поиска CSV-файлов по шаблону нам понадобится стандартный модуль glob:

Python
Скопировать код
import pandas as pd
import glob
import os

Структурирование проекта — важный этап подготовки. Рекомендую организовать файловую структуру следующим образом:

  • data/ — директория с исходными CSV-файлами
  • output/ — директория для сохранения результатов
  • scripts/ — директория с Python-скриптами

Проверка наличия и доступа к файлам — обязательный шаг, который поможет избежать ошибок в процессе объединения:

Python
Скопировать код
csv_files = glob.glob('data/*.csv')
print(f'Найдено файлов: {len(csv_files)}')
for file in csv_files[:5]: # Вывод первых 5 файлов для проверки
print(f'- {file}')

Александр Петров, Lead Data Scientist

Однажды мне пришлось объединить данные о продажах из 47 региональных отделений компании. Каждый офис высылал еженедельный отчет в формате CSV. Чтобы не тратить каждый понедельник на ручное объединение, я потратил час на настройку среды и написание скрипта. В итоге процесс, который занимал 3 часа еженедельно, сократился до 10 секунд автоматической работы. Ключевым моментом стала правильная настройка директории с файлами и стандартизация названий входящих отчетов. Я создал шаблон "salesreportYYYYMMDD.csv" и обязал все отделения следовать ему, что значительно упростило автоматизацию.

Перед объединением необходимо проверить совместимость данных. Полезно просмотреть структуру нескольких файлов:

Python
Скопировать код
# Проверка структуры первого файла
if csv_files:
sample_df = pd.read_csv(csv_files[0])
print("Структура первого файла:")
print(sample_df.info())
print("\nПервые 5 строк:")
print(sample_df.head())

Настройка параметров чтения файлов также важна для правильного объединения:

Параметр Описание Пример использования
delimiter Разделитель в CSV-файле pd.read_csv('file.csv', delimiter=';')
encoding Кодировка файла pd.read_csv('file.csv', encoding='utf-8')
dtype Типы данных колонок pd.read_csv('file.csv', dtype={'ID': str})
parse_dates Колонки для обработки как даты pd.readcsv('file.csv', parsedates=['Date'])
na_values Значения, которые считать как NaN pd.readcsv('file.csv', navalues=['-', 'NA'])

Создав правильную основу, мы готовы перейти к непосредственному объединению файлов. 🛠️

Пошаговый план для смены профессии

Основные методы объединения CSV в единый DataFrame

Существует несколько ключевых подходов к объединению CSV-файлов в pandas, каждый из которых имеет свои преимущества в зависимости от задачи и структуры данных.

Самый распространенный метод — использование функции pd.concat() для последовательного объединения DataFrame'ов:

Python
Скопировать код
# Базовый пример объединения с помощью pd.concat()
dfs = [] # список для хранения DataFrame'ов

for file in glob.glob('data/*.csv'):
df = pd.read_csv(file)
dfs.append(df)

# Объединение всех DataFrame'ов в один
combined_df = pd.concat(dfs, ignore_index=True)
print(f"Итоговый DataFrame содержит {combined_df.shape[0]} строк и {combined_df.shape[1]} колонок")

Параметры функции pd.concat() существенно влияют на результат объединения:

  • ignore_index=True — сбрасывает индексы исходных DataFrame'ов и создает новую непрерывную индексацию
  • axis=0 (по умолчанию) — объединение "по вертикали", добавляя строки
  • axis=1 — объединение "по горизонтали", добавляя новые колонки
  • join='outer' (по умолчанию) — включает все строки/колонки из всех DataFrame'ов
  • join='inner' — включает только строки/колонки, присутствующие во всех DataFrame'ах

Альтернативный метод — чтение и объединение файлов "на лету" без промежуточного хранения в списке:

Python
Скопировать код
# Объединение "на лету" без использования промежуточного списка
combined_df = pd.concat(
(pd.read_csv(file) for file in glob.glob('data/*.csv')),
ignore_index=True
)

Для более сложных сценариев можно использовать метод merge(), который позволяет объединять DataFrame'ы по определенным ключам, аналогично SQL-соединениям:

Python
Скопировать код
# Пример объединения с использованием merge()
base_df = pd.read_csv('data/main.csv')

for file in glob.glob('data/additional_*.csv'):
additional_df = pd.read_csv(file)
# Объединение по ключевому полю 'id'
base_df = pd.merge(
base_df, 
additional_df, 
on='id', 
how='left' # 'left', 'right', 'inner', 'outer'
)

Для более специфических задач можно использовать функцию append(), хотя она постепенно устаревает в пользу concat():

Python
Скопировать код
# Использование append() (deprecated в новых версиях pandas)
result_df = pd.DataFrame()
for file in glob.glob('data/*.csv'):
df = pd.read_csv(file)
result_df = result_df.append(df, ignore_index=True)

Метод Преимущества Недостатки Использовать, когда...
pd.concat() Гибкий, эффективный, работает с любым количеством DataFrame'ов Может потреблять много памяти при большом числе файлов У всех файлов одинаковая или похожая структура
pd.merge() Точное управление объединением по ключам Сложнее в использовании, медленнее для простых задач Необходимо объединение по определенным полям (как SQL JOIN)
append() Прост в использовании Устарел в новых версиях, менее эффективен Работа с устаревшим кодом
read_csv + аргументы Одна операция без промежуточных шагов Применим только для идентичных файлов Файлы находятся в одной директории и имеют одинаковый формат

Выбор метода объединения зависит от конкретной задачи, объема данных и структуры файлов. Для большинства стандартных задач оптимальным решением является pd.concat() с правильно подобранными параметрами. 📊

Объединение файлов с разной структурой в pandas

В реальных проектах часто приходится сталкиваться с ситуацией, когда CSV-файлы, которые нужно объединить, имеют различную структуру. Это может проявляться в разных наборах колонок, несовпадающих типах данных или различных форматах представления информации.

Первый шаг при работе с файлами разной структуры — анализ различий между ними:

Python
Скопировать код
# Анализ структур разных CSV-файлов
structures = {}

for file in glob.glob('data/*.csv'):
filename = os.path.basename(file)
df = pd.read_csv(file)
structures[filename] = {
'columns': list(df.columns),
'dtypes': {col: str(dtype) for col, dtype in df.dtypes.items()},
'rows': len(df)
}

# Найти общие колонки во всех файлах
all_columns = set()
for file_info in structures.values():
all_columns.update(file_info['columns'])

common_columns = set.intersection(*[set(file_info['columns']) for file_info in structures.values()])

print(f"Всего уникальных колонок: {len(all_columns)}")
print(f"Общих колонок во всех файлах: {len(common_columns)}")
print(f"Общие колонки: {common_columns}")

После анализа структуры файлов можно применить один из следующих подходов:

  • Объединение только по общим колонкам — подходит, когда важны только те данные, которые присутствуют во всех файлах
  • Объединение с заполнением отсутствующих колонок — сохраняет все данные, заполняя пропуски значениями NaN
  • Нормализация данных перед объединением — преобразование каждого файла к единой структуре

Вот пример объединения файлов с разными наборами колонок, с сохранением всех уникальных колонок:

Python
Скопировать код
# Объединение файлов с разной структурой
dfs = []

for file in glob.glob('data/*.csv'):
df = pd.read_csv(file)
# Добавим информацию об источнике данных
df['source_file'] = os.path.basename(file)
dfs.append(df)

# Объединение с outer join для сохранения всех колонок
combined_df = pd.concat(dfs, ignore_index=True, sort=False, join='outer')

# Проверка количества пропущенных значений в каждой колонке
missing_values = combined_df.isnull().sum()
print("Количество пропущенных значений по колонкам:")
print(missing_values[missing_values > 0])

Для решения проблемы несовпадающих типов данных можно использовать предварительное приведение к общему типу:

Python
Скопировать код
# Приведение колонок к общему типу данных
def standardize_dataframe(df, column_types):
for col, dtype in column_types.items():
if col in df.columns:
try:
df[col] = df[col].astype(dtype)
except:
# Если не удалось преобразовать, оставляем как есть
pass
return df

# Определим целевые типы данных для колонок
target_types = {
'id': 'int64',
'name': 'str',
'date': 'datetime64[ns]',
'value': 'float64'
}

# Обработка каждого файла
dfs = []
for file in glob.glob('data/*.csv'):
df = pd.read_csv(file)
df = standardize_dataframe(df, target_types)
dfs.append(df)

combined_df = pd.concat(dfs, ignore_index=True)

Иногда требуется более глубокая трансформация данных перед объединением, например, переименование или объединение колонок:

Python
Скопировать код
# Пример более сложной предобработки
def preprocess_dataframe(df, file_path):
filename = os.path.basename(file_path)

# Переименование колонок в зависимости от источника
if 'old_report' in filename:
# В старых отчетах колонка называется 'client_id'
if 'client_id' in df.columns:
df = df.rename(columns={'client_id': 'customer_id'})

# Преобразование формата даты
if 'date' in df.columns:
try:
df['date'] = pd.to_datetime(df['date'])
except:
# Если не удалось автоматически, пробуем разные форматы
try:
df['date'] = pd.to_datetime(df['date'], format='%d.%m.%Y')
except:
# Если всё ещё не удалось, помечаем как проблемные
df['date_error'] = True

return df

# Применение предобработки к каждому файлу
dfs = []
for file in glob.glob('data/*.csv'):
df = pd.read_csv(file)
df = preprocess_dataframe(df, file)
dfs.append(df)

combined_df = pd.concat(dfs, ignore_index=True)

Мария Соколова, Data Engineer

В одном из проектов по анализу логов пользовательской активности я столкнулась с необходимостью объединить более 300 CSV-файлов, собранных с разных серверов за квартал. Проблема была в том, что в зависимости от версии ПО, установленного на серверах, формат логов различался: в некоторых файлах IP-адреса хранились в одной колонке, в других были разделены на четыре компонента, даты имели разный формат, а некоторые поля присутствовали не во всех файлах.

Вместо ручной стандартизации каждого файла я создала "карту преобразований" – словарь Python, который определял правила нормализации для каждой версии лога. Перед объединением скрипт определял версию формата по характерным признакам и применял соответствующие трансформации. Это позволило автоматически обрабатывать даже те файлы, которые появились уже после запуска проекта. Трансформация включала приведение дат к единому формату, склеивание/разделение IP-адресов и генерацию синтетических признаков для заполнения отсутствующей информации.

Работа с файлами разной структуры требует баланса между сохранением данных и получением согласованного результата. Тщательная предварительная обработка позволяет избежать проблем на этапе анализа объединенных данных. 🔄

Оптимизация процесса обработки больших CSV-файлов

При работе с большими объемами данных стандартный подход к объединению CSV-файлов может привести к проблемам с производительностью или даже к сбоям из-за нехватки оперативной памяти. Рассмотрим методы оптимизации, которые помогут эффективно работать с большими датасетами.

Первое, что следует учитывать — чтение только необходимых колонок вместо загрузки всего файла:

Python
Скопировать код
# Чтение только нужных колонок
required_columns = ['id', 'timestamp', 'value']
dfs = []

for file in glob.glob('data/*.csv'):
# usecols позволяет загрузить только указанные колонки
df = pd.read_csv(file, usecols=required_columns)
dfs.append(df)

combined_df = pd.concat(dfs, ignore_index=True)

Для очень больших файлов эффективно использовать порционное чтение с параметром chunksize:

Python
Скопировать код
# Обработка файлов по частям
combined_df = pd.DataFrame()

for file in glob.glob('data/*.csv'):
# Чтение файла порциями по 100,000 строк
for chunk in pd.read_csv(file, chunksize=100000):
# Здесь можно выполнить предварительную обработку чанка
combined_df = pd.concat([combined_df, chunk], ignore_index=True)

# Опционально: сохранение промежуточных результатов
combined_df.to_csv('output/intermediate_result.csv', index=False)
print(f"Файл {file} обработан, всего строк: {len(combined_df)}")

Оптимизация типов данных может значительно сократить использование памяти:

Python
Скопировать код
# Оптимизация типов данных
def optimize_df_types(df):
# Оптимизация целочисленных колонок
for col in df.select_dtypes(include=['int']).columns:
col_min = df[col].min()
col_max = df[col].max()

if col_min >= 0:
if col_max < 255:
df[col] = df[col].astype('uint8')
elif col_max < 65535:
df[col] = df[col].astype('uint16')
elif col_max < 4294967295:
df[col] = df[col].astype('uint32')
else:
df[col] = df[col].astype('uint64')
else:
if col_min > -128 and col_max < 127:
df[col] = df[col].astype('int8')
elif col_min > -32768 and col_max < 32767:
df[col] = df[col].astype('int16')
elif col_min > -2147483648 and col_max < 2147483647:
df[col] = df[col].astype('int32')

# Оптимизация колонок с плавающей точкой
for col in df.select_dtypes(include=['float']).columns:
df[col] = df[col].astype('float32')

# Оптимизация строковых колонок
for col in df.select_dtypes(include=['object']).columns:
if df[col].nunique() < len(df[col]) * 0.5: # Если много повторяющихся значений
df[col] = df[col].astype('category')

return df

# Применение оптимизации к каждому файлу
dfs = []
for file in glob.glob('data/*.csv'):
df = pd.read_csv(file)
df = optimize_df_types(df)
dfs.append(df)

combined_df = pd.concat(dfs, ignore_index=True)
# Применим оптимизацию и к итоговому DataFrame
combined_df = optimize_df_types(combined_df)

# Проверка использования памяти
print(f"Использование памяти: {combined_df.memory_usage(deep=True).sum() / 1024 / 1024:.2f} MB")

Для сверхбольших объемов данных может потребоваться использование специализированных библиотек:

  • dask — параллельные вычисления для больших датасетов, с API, аналогичным pandas
  • vaex — обработка табличных данных для объектов, превышающих объем оперативной памяти
  • modin — ускорение pandas с минимальными изменениями кода

Пример использования dask для объединения больших CSV-файлов:

Python
Скопировать код
# Установка: pip install dask[dataframe]
import dask.dataframe as dd

# Загрузка данных с помощью dask
ddf = dd.read_csv('data/*.csv')

# Выполнение операций аналогично pandas
result = ddf.groupby('category')['value'].mean().compute()

# Сохранение результата
ddf.to_csv('output/combined_data_*.csv') # Создаст несколько файлов с партициями данных

Сравнение эффективности различных подходов:

Метод Преимущества Ограничения Применение
Стандартный pandas Простота использования, полная функциональность Ограничен объемом RAM Файлы до 1-2 GB в сумме
Чтение по частям (chunksize) Работает с ограниченной памятью Медленнее, сложнее код Файлы от 2 до 10 GB
Оптимизация типов Может снизить использование памяти в 2-5 раз Требует анализа данных Любые объемы, как дополнительная оптимизация
Dask Параллельная обработка, поддержка кластеров Не все функции pandas доступны Файлы от 10 GB и больше
Spark (PySpark) Распределенная обработка, высокая масштабируемость Сложность настройки, другой синтаксис Терабайты данных, корпоративные задачи

При обработке больших объемов данных стоит также рассмотреть сохранение промежуточных результатов в более эффективных форматах, таких как Parquet или Feather:

Python
Скопировать код
# Сохранение в эффективном формате
combined_df.to_parquet('output/combined_data.parquet')

# При последующей загрузке данные будут загружены быстрее
df = pd.read_parquet('output/combined_data.parquet')

Выбор правильного метода оптимизации зависит от конкретной задачи, объема данных и доступных ресурсов. Для большинства практических случаев сочетание правильного выбора колонок, оптимизации типов данных и чтения по частям дает хороший баланс между простотой реализации и эффективностью. 💾

Автоматизация объединения CSV с помощью Python-скриптов

Автоматизация процесса объединения CSV-файлов позволяет не только сэкономить время, но и создать повторяемый, надежный рабочий процесс, который можно запускать по расписанию или по мере поступления новых данных.

Рассмотрим создание полноценного Python-скрипта, который будет автоматически объединять CSV-файлы из указанной директории с возможностью настройки различных параметров:

Python
Скопировать код
#!/usr/bin/env python3
# csv_combiner.py

import pandas as pd
import glob
import os
import argparse
import logging
from datetime import datetime

# Настройка логирования
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s – %(levelname)s – %(message)s',
handlers=[
logging.FileHandler("csv_combiner.log"),
logging.StreamHandler()
]
)
logger = logging.getLogger()

def combine_csv_files(input_path, output_file, columns=None, sort_by=None, 
filter_condition=None, add_metadata=False):
"""
Объединяет CSV-файлы из указанной директории.

Args:
input_path (str): Путь к директории с CSV-файлами или шаблон glob
output_file (str): Путь для сохранения результата
columns (list): Список колонок для выбора (None = все)
sort_by (str): Колонка для сортировки результата
filter_condition (str): Условие фильтрации в формате 'column>value'
add_metadata (bool): Добавлять ли метаданные о файлах-источниках

Returns:
pd.DataFrame: Объединенный DataFrame
"""
start_time = datetime.now()
logger.info(f"Начало объединения файлов из {input_path}")

# Находим все файлы по шаблону
csv_files = glob.glob(input_path)
logger.info(f"Найдено {len(csv_files)} CSV-файлов")

if len(csv_files) == 0:
logger.error(f"Не найдено CSV-файлов по пути {input_path}")
return None

# Подготовка списка для DataFrames
dfs = []

# Чтение и обработка каждого файла
for file in csv_files:
try:
logger.info(f"Обработка файла: {file}")
df = pd.read_csv(file)

# Выбор определенных колонок, если указано
if columns:
available_cols = [col for col in columns if col in df.columns]
if available_cols:
df = df[available_cols]
else:
logger.warning(f"Ни одна из указанных колонок не найдена в {file}")
continue

# Добавление метаданных о файле, если требуется
if add_metadata:
df['source_file'] = os.path.basename(file)
df['import_timestamp'] = datetime.now()

# Применение фильтра, если указан
if filter_condition:
try:
col, op, val = filter_condition.replace('>', '|').replace('<', '|').replace('=', '|').split('|')
col = col.strip()
val = val.strip()

if op == '>':
df = df[df[col] > float(val)]
elif op == '<':
df = df[df[col] < float(val)]
elif op == '==':
df = df[df[col] == val]

logger.info(f"Применен фильтр: {filter_condition}. Осталось {len(df)} строк")
except Exception as e:
logger.error(f"Ошибка при применении фильтра: {e}")

dfs.append(df)

except Exception as e:
logger.error(f"Ошибка при обработке файла {file}: {e}")

if not dfs:
logger.error("Не удалось обработать ни один файл")
return None

# Объединение всех DataFrames
logger.info("Объединение всех DataFrame'ов")
combined_df = pd.concat(dfs, ignore_index=True)

# Сортировка по указанной колонке
if sort_by and sort_by in combined_df.columns:
logger.info(f"Сортировка по колонке {sort_by}")
combined_df = combined_df.sort_values(by=sort_by)

# Сохранение результата
logger.info(f"Сохранение результата в {output_file}")
combined_df.to_csv(output_file, index=False)

end_time = datetime.now()
execution_time = (end_time – start_time).total_seconds()
logger.info(f"Объединение завершено за {execution_time} секунд")
logger.info(f"Итоговый DataFrame: {len(combined_df)} строк, {len(combined_df.columns)} колонок")

return combined_df

def main():
# Создание парсера аргументов командной строки
parser = argparse.ArgumentParser(description='Объединение CSV-файлов в один')
parser.add_argument('--input', '-i', required=True, 
help='Путь к директории с CSV-файлами или шаблон glob')
parser.add_argument('--output', '-o', required=True, 
help='Путь для сохранения результата')
parser.add_argument('--columns', '-c', nargs='+', default=None,
help='Список колонок для выбора')
parser.add_argument('--sort', '-s', default=None,
help='Колонка для сортировки результата')
parser.add_argument('--filter', '-f', default=None,
help='Условие фильтрации в формате "column>value"')
parser.add_argument('--add-metadata', '-m', action='store_true',
help='Добавлять метаданные о файлах-источниках')

args = parser.parse_args()

# Запуск функции объединения с указанными параметрами
combine_csv_files(
input_path=args.input,
output_file=args.output,
columns=args.columns,
sort_by=args.sort,
filter_condition=args.filter,
add_metadata=args.add_metadata
)

if __name__ == "__main__":
main()

Такой скрипт можно запускать из командной строки с различными параметрами:

# Пример запуска скрипта
python csv_combiner.py --input "data/*.csv" --output "output/combined.csv" --columns id name value --sort id --add-metadata

Для регулярной автоматизации процесса можно использовать планировщики задач, такие как cron в Linux или Task Scheduler в Windows:

# Пример cron-задания для запуска ежедневно в 2:00 ночи
0 2 * * * /usr/bin/python3 /path/to/csv_combiner.py --input "/data/daily/*.csv" --output "/data/combined/daily_$(date +\%Y\%m\%d).csv" --add-metadata

Для еще более продвинутой автоматизации можно интегрировать скрипт с системами оркестрации рабочих процессов, такими как Apache Airflow:

Python
Скопировать код
# Пример DAG для Airflow (сохранить в dags/csv_combine_dag.py)
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import sys
sys.path.append('/path/to/scripts')
from csv_combiner import combine_csv_files

default_args = {
'owner': 'data_team',
'depends_on_past': False,
'start_date': datetime(2023, 1, 1),
'email_on_failure': True,
'email': ['data_alerts@example.com'],
'retries': 1,
'retry_delay': timedelta(minutes=5),
}

dag = DAG(
'combine_csv_files',
default_args=default_args,
description='Ежедневное объединение CSV-файлов',
schedule_interval='0 2 * * *',
)

combine_task = PythonOperator(
task_id='combine_csv_files',
python_callable=combine_csv_files,
op_kwargs={
'input_path': '/data/daily/*.csv',
'output_file': f'/data/combined/daily_{datetime.now().strftime("%Y%m%d")}.csv',
'add_metadata': True
},
dag=dag,
)

Расширение возможностей автоматизации может включать:

  • Отправку уведомлений по электронной почте о результатах обработки
  • Интеграцию с системами мониторинга для отслеживания качества данных
  • Настройку обработки ошибок и повторных попыток
  • Добавление шагов предварительной и последующей обработки (валидация, трансформация)
  • Параллельную обработку файлов для ускорения работы с большими объемами данных

Автоматизация объединения CSV-файлов превращает рутинную задачу в надежный, воспроизводимый процесс, который может работать без вмешательства человека и обрабатывать данные по мере их поступления. Это особенно важно для систем, где данные обновляются регулярно и требуют постоянного анализа. 🤖

Освоив техники объединения CSV-файлов в единый DataFrame, вы получили мощный инструмент для работы с разрозненными источниками данных. Ключевой момент эффективного использования pandas — баланс между правильной подготовкой среды, выбором оптимальных методов объединения и автоматизацией рутинных процессов. Помните, что экономия времени на автоматизации ручного труда возвращается сторицей в виде точности анализа и возможности сосредоточиться на интерпретации результатов, а не на механической подготовке данных.

Загрузка...