Как объединить CSV-файлы в один DataFrame: инструкция для Python
Для кого эта статья:
- Аналитики данных и специалисты по обработке данных
- Студенты и начинающие профессионалы в области анализа данных
Специалисты, ищущие эффективные решения для обработки и объединения больших объемов данных
Работая с данными, аналитик часто сталкивается с необходимостью объединить информацию из десятков, а иногда и сотен CSV-файлов. Ручное копирование — путь к ошибкам и потере времени. Эффективное решение этой задачи с помощью Python и библиотеки pandas не только экономит ресурсы, но и открывает новые возможности для анализа. Правильно настроенный процесс объединения данных позволяет за секунды собрать разрозненную информацию в единый DataFrame, готовый к дальнейшей обработке. 🚀
Хотите освоить профессиональные техники работы с данными? Программа Профессия аналитик данных от Skypro научит вас не только объединять CSV-файлы, но и строить полноценные аналитические пайплайны. Вы освоите продвинутые возможности pandas, научитесь трансформировать и визуализировать данные, а главное — превращать сырые цифры в инсайты для бизнеса. Более 87% выпускников находят работу в первые три месяца после обучения.
Подготовка среды для объединения CSV-файлов в pandas
Перед тем как приступить к объединению CSV-файлов, необходимо правильно настроить рабочее окружение. Это фундамент, на котором строится весь последующий процесс обработки данных.
Начнем с установки необходимых библиотек. Если вы еще не установили pandas, выполните следующую команду:
pip install pandas
Для работы с файловой системой и поиска CSV-файлов по шаблону нам понадобится стандартный модуль glob:
import pandas as pd
import glob
import os
Структурирование проекта — важный этап подготовки. Рекомендую организовать файловую структуру следующим образом:
- data/ — директория с исходными CSV-файлами
- output/ — директория для сохранения результатов
- scripts/ — директория с Python-скриптами
Проверка наличия и доступа к файлам — обязательный шаг, который поможет избежать ошибок в процессе объединения:
csv_files = glob.glob('data/*.csv')
print(f'Найдено файлов: {len(csv_files)}')
for file in csv_files[:5]: # Вывод первых 5 файлов для проверки
print(f'- {file}')
Александр Петров, Lead Data Scientist
Однажды мне пришлось объединить данные о продажах из 47 региональных отделений компании. Каждый офис высылал еженедельный отчет в формате CSV. Чтобы не тратить каждый понедельник на ручное объединение, я потратил час на настройку среды и написание скрипта. В итоге процесс, который занимал 3 часа еженедельно, сократился до 10 секунд автоматической работы. Ключевым моментом стала правильная настройка директории с файлами и стандартизация названий входящих отчетов. Я создал шаблон "salesreportYYYYMMDD.csv" и обязал все отделения следовать ему, что значительно упростило автоматизацию.
Перед объединением необходимо проверить совместимость данных. Полезно просмотреть структуру нескольких файлов:
# Проверка структуры первого файла
if csv_files:
sample_df = pd.read_csv(csv_files[0])
print("Структура первого файла:")
print(sample_df.info())
print("\nПервые 5 строк:")
print(sample_df.head())
Настройка параметров чтения файлов также важна для правильного объединения:
| Параметр | Описание | Пример использования |
|---|---|---|
| delimiter | Разделитель в CSV-файле | pd.read_csv('file.csv', delimiter=';') |
| encoding | Кодировка файла | pd.read_csv('file.csv', encoding='utf-8') |
| dtype | Типы данных колонок | pd.read_csv('file.csv', dtype={'ID': str}) |
| parse_dates | Колонки для обработки как даты | pd.readcsv('file.csv', parsedates=['Date']) |
| na_values | Значения, которые считать как NaN | pd.readcsv('file.csv', navalues=['-', 'NA']) |
Создав правильную основу, мы готовы перейти к непосредственному объединению файлов. 🛠️

Основные методы объединения CSV в единый DataFrame
Существует несколько ключевых подходов к объединению CSV-файлов в pandas, каждый из которых имеет свои преимущества в зависимости от задачи и структуры данных.
Самый распространенный метод — использование функции pd.concat() для последовательного объединения DataFrame'ов:
# Базовый пример объединения с помощью pd.concat()
dfs = [] # список для хранения DataFrame'ов
for file in glob.glob('data/*.csv'):
df = pd.read_csv(file)
dfs.append(df)
# Объединение всех DataFrame'ов в один
combined_df = pd.concat(dfs, ignore_index=True)
print(f"Итоговый DataFrame содержит {combined_df.shape[0]} строк и {combined_df.shape[1]} колонок")
Параметры функции pd.concat() существенно влияют на результат объединения:
- ignore_index=True — сбрасывает индексы исходных DataFrame'ов и создает новую непрерывную индексацию
- axis=0 (по умолчанию) — объединение "по вертикали", добавляя строки
- axis=1 — объединение "по горизонтали", добавляя новые колонки
- join='outer' (по умолчанию) — включает все строки/колонки из всех DataFrame'ов
- join='inner' — включает только строки/колонки, присутствующие во всех DataFrame'ах
Альтернативный метод — чтение и объединение файлов "на лету" без промежуточного хранения в списке:
# Объединение "на лету" без использования промежуточного списка
combined_df = pd.concat(
(pd.read_csv(file) for file in glob.glob('data/*.csv')),
ignore_index=True
)
Для более сложных сценариев можно использовать метод merge(), который позволяет объединять DataFrame'ы по определенным ключам, аналогично SQL-соединениям:
# Пример объединения с использованием merge()
base_df = pd.read_csv('data/main.csv')
for file in glob.glob('data/additional_*.csv'):
additional_df = pd.read_csv(file)
# Объединение по ключевому полю 'id'
base_df = pd.merge(
base_df,
additional_df,
on='id',
how='left' # 'left', 'right', 'inner', 'outer'
)
Для более специфических задач можно использовать функцию append(), хотя она постепенно устаревает в пользу concat():
# Использование append() (deprecated в новых версиях pandas)
result_df = pd.DataFrame()
for file in glob.glob('data/*.csv'):
df = pd.read_csv(file)
result_df = result_df.append(df, ignore_index=True)
| Метод | Преимущества | Недостатки | Использовать, когда... |
|---|---|---|---|
| pd.concat() | Гибкий, эффективный, работает с любым количеством DataFrame'ов | Может потреблять много памяти при большом числе файлов | У всех файлов одинаковая или похожая структура |
| pd.merge() | Точное управление объединением по ключам | Сложнее в использовании, медленнее для простых задач | Необходимо объединение по определенным полям (как SQL JOIN) |
| append() | Прост в использовании | Устарел в новых версиях, менее эффективен | Работа с устаревшим кодом |
| read_csv + аргументы | Одна операция без промежуточных шагов | Применим только для идентичных файлов | Файлы находятся в одной директории и имеют одинаковый формат |
Выбор метода объединения зависит от конкретной задачи, объема данных и структуры файлов. Для большинства стандартных задач оптимальным решением является pd.concat() с правильно подобранными параметрами. 📊
Объединение файлов с разной структурой в pandas
В реальных проектах часто приходится сталкиваться с ситуацией, когда CSV-файлы, которые нужно объединить, имеют различную структуру. Это может проявляться в разных наборах колонок, несовпадающих типах данных или различных форматах представления информации.
Первый шаг при работе с файлами разной структуры — анализ различий между ними:
# Анализ структур разных CSV-файлов
structures = {}
for file in glob.glob('data/*.csv'):
filename = os.path.basename(file)
df = pd.read_csv(file)
structures[filename] = {
'columns': list(df.columns),
'dtypes': {col: str(dtype) for col, dtype in df.dtypes.items()},
'rows': len(df)
}
# Найти общие колонки во всех файлах
all_columns = set()
for file_info in structures.values():
all_columns.update(file_info['columns'])
common_columns = set.intersection(*[set(file_info['columns']) for file_info in structures.values()])
print(f"Всего уникальных колонок: {len(all_columns)}")
print(f"Общих колонок во всех файлах: {len(common_columns)}")
print(f"Общие колонки: {common_columns}")
После анализа структуры файлов можно применить один из следующих подходов:
- Объединение только по общим колонкам — подходит, когда важны только те данные, которые присутствуют во всех файлах
- Объединение с заполнением отсутствующих колонок — сохраняет все данные, заполняя пропуски значениями NaN
- Нормализация данных перед объединением — преобразование каждого файла к единой структуре
Вот пример объединения файлов с разными наборами колонок, с сохранением всех уникальных колонок:
# Объединение файлов с разной структурой
dfs = []
for file in glob.glob('data/*.csv'):
df = pd.read_csv(file)
# Добавим информацию об источнике данных
df['source_file'] = os.path.basename(file)
dfs.append(df)
# Объединение с outer join для сохранения всех колонок
combined_df = pd.concat(dfs, ignore_index=True, sort=False, join='outer')
# Проверка количества пропущенных значений в каждой колонке
missing_values = combined_df.isnull().sum()
print("Количество пропущенных значений по колонкам:")
print(missing_values[missing_values > 0])
Для решения проблемы несовпадающих типов данных можно использовать предварительное приведение к общему типу:
# Приведение колонок к общему типу данных
def standardize_dataframe(df, column_types):
for col, dtype in column_types.items():
if col in df.columns:
try:
df[col] = df[col].astype(dtype)
except:
# Если не удалось преобразовать, оставляем как есть
pass
return df
# Определим целевые типы данных для колонок
target_types = {
'id': 'int64',
'name': 'str',
'date': 'datetime64[ns]',
'value': 'float64'
}
# Обработка каждого файла
dfs = []
for file in glob.glob('data/*.csv'):
df = pd.read_csv(file)
df = standardize_dataframe(df, target_types)
dfs.append(df)
combined_df = pd.concat(dfs, ignore_index=True)
Иногда требуется более глубокая трансформация данных перед объединением, например, переименование или объединение колонок:
# Пример более сложной предобработки
def preprocess_dataframe(df, file_path):
filename = os.path.basename(file_path)
# Переименование колонок в зависимости от источника
if 'old_report' in filename:
# В старых отчетах колонка называется 'client_id'
if 'client_id' in df.columns:
df = df.rename(columns={'client_id': 'customer_id'})
# Преобразование формата даты
if 'date' in df.columns:
try:
df['date'] = pd.to_datetime(df['date'])
except:
# Если не удалось автоматически, пробуем разные форматы
try:
df['date'] = pd.to_datetime(df['date'], format='%d.%m.%Y')
except:
# Если всё ещё не удалось, помечаем как проблемные
df['date_error'] = True
return df
# Применение предобработки к каждому файлу
dfs = []
for file in glob.glob('data/*.csv'):
df = pd.read_csv(file)
df = preprocess_dataframe(df, file)
dfs.append(df)
combined_df = pd.concat(dfs, ignore_index=True)
Мария Соколова, Data Engineer
В одном из проектов по анализу логов пользовательской активности я столкнулась с необходимостью объединить более 300 CSV-файлов, собранных с разных серверов за квартал. Проблема была в том, что в зависимости от версии ПО, установленного на серверах, формат логов различался: в некоторых файлах IP-адреса хранились в одной колонке, в других были разделены на четыре компонента, даты имели разный формат, а некоторые поля присутствовали не во всех файлах.
Вместо ручной стандартизации каждого файла я создала "карту преобразований" – словарь Python, который определял правила нормализации для каждой версии лога. Перед объединением скрипт определял версию формата по характерным признакам и применял соответствующие трансформации. Это позволило автоматически обрабатывать даже те файлы, которые появились уже после запуска проекта. Трансформация включала приведение дат к единому формату, склеивание/разделение IP-адресов и генерацию синтетических признаков для заполнения отсутствующей информации.
Работа с файлами разной структуры требует баланса между сохранением данных и получением согласованного результата. Тщательная предварительная обработка позволяет избежать проблем на этапе анализа объединенных данных. 🔄
Оптимизация процесса обработки больших CSV-файлов
При работе с большими объемами данных стандартный подход к объединению CSV-файлов может привести к проблемам с производительностью или даже к сбоям из-за нехватки оперативной памяти. Рассмотрим методы оптимизации, которые помогут эффективно работать с большими датасетами.
Первое, что следует учитывать — чтение только необходимых колонок вместо загрузки всего файла:
# Чтение только нужных колонок
required_columns = ['id', 'timestamp', 'value']
dfs = []
for file in glob.glob('data/*.csv'):
# usecols позволяет загрузить только указанные колонки
df = pd.read_csv(file, usecols=required_columns)
dfs.append(df)
combined_df = pd.concat(dfs, ignore_index=True)
Для очень больших файлов эффективно использовать порционное чтение с параметром chunksize:
# Обработка файлов по частям
combined_df = pd.DataFrame()
for file in glob.glob('data/*.csv'):
# Чтение файла порциями по 100,000 строк
for chunk in pd.read_csv(file, chunksize=100000):
# Здесь можно выполнить предварительную обработку чанка
combined_df = pd.concat([combined_df, chunk], ignore_index=True)
# Опционально: сохранение промежуточных результатов
combined_df.to_csv('output/intermediate_result.csv', index=False)
print(f"Файл {file} обработан, всего строк: {len(combined_df)}")
Оптимизация типов данных может значительно сократить использование памяти:
# Оптимизация типов данных
def optimize_df_types(df):
# Оптимизация целочисленных колонок
for col in df.select_dtypes(include=['int']).columns:
col_min = df[col].min()
col_max = df[col].max()
if col_min >= 0:
if col_max < 255:
df[col] = df[col].astype('uint8')
elif col_max < 65535:
df[col] = df[col].astype('uint16')
elif col_max < 4294967295:
df[col] = df[col].astype('uint32')
else:
df[col] = df[col].astype('uint64')
else:
if col_min > -128 and col_max < 127:
df[col] = df[col].astype('int8')
elif col_min > -32768 and col_max < 32767:
df[col] = df[col].astype('int16')
elif col_min > -2147483648 and col_max < 2147483647:
df[col] = df[col].astype('int32')
# Оптимизация колонок с плавающей точкой
for col in df.select_dtypes(include=['float']).columns:
df[col] = df[col].astype('float32')
# Оптимизация строковых колонок
for col in df.select_dtypes(include=['object']).columns:
if df[col].nunique() < len(df[col]) * 0.5: # Если много повторяющихся значений
df[col] = df[col].astype('category')
return df
# Применение оптимизации к каждому файлу
dfs = []
for file in glob.glob('data/*.csv'):
df = pd.read_csv(file)
df = optimize_df_types(df)
dfs.append(df)
combined_df = pd.concat(dfs, ignore_index=True)
# Применим оптимизацию и к итоговому DataFrame
combined_df = optimize_df_types(combined_df)
# Проверка использования памяти
print(f"Использование памяти: {combined_df.memory_usage(deep=True).sum() / 1024 / 1024:.2f} MB")
Для сверхбольших объемов данных может потребоваться использование специализированных библиотек:
- dask — параллельные вычисления для больших датасетов, с API, аналогичным pandas
- vaex — обработка табличных данных для объектов, превышающих объем оперативной памяти
- modin — ускорение pandas с минимальными изменениями кода
Пример использования dask для объединения больших CSV-файлов:
# Установка: pip install dask[dataframe]
import dask.dataframe as dd
# Загрузка данных с помощью dask
ddf = dd.read_csv('data/*.csv')
# Выполнение операций аналогично pandas
result = ddf.groupby('category')['value'].mean().compute()
# Сохранение результата
ddf.to_csv('output/combined_data_*.csv') # Создаст несколько файлов с партициями данных
Сравнение эффективности различных подходов:
| Метод | Преимущества | Ограничения | Применение |
|---|---|---|---|
| Стандартный pandas | Простота использования, полная функциональность | Ограничен объемом RAM | Файлы до 1-2 GB в сумме |
| Чтение по частям (chunksize) | Работает с ограниченной памятью | Медленнее, сложнее код | Файлы от 2 до 10 GB |
| Оптимизация типов | Может снизить использование памяти в 2-5 раз | Требует анализа данных | Любые объемы, как дополнительная оптимизация |
| Dask | Параллельная обработка, поддержка кластеров | Не все функции pandas доступны | Файлы от 10 GB и больше |
| Spark (PySpark) | Распределенная обработка, высокая масштабируемость | Сложность настройки, другой синтаксис | Терабайты данных, корпоративные задачи |
При обработке больших объемов данных стоит также рассмотреть сохранение промежуточных результатов в более эффективных форматах, таких как Parquet или Feather:
# Сохранение в эффективном формате
combined_df.to_parquet('output/combined_data.parquet')
# При последующей загрузке данные будут загружены быстрее
df = pd.read_parquet('output/combined_data.parquet')
Выбор правильного метода оптимизации зависит от конкретной задачи, объема данных и доступных ресурсов. Для большинства практических случаев сочетание правильного выбора колонок, оптимизации типов данных и чтения по частям дает хороший баланс между простотой реализации и эффективностью. 💾
Автоматизация объединения CSV с помощью Python-скриптов
Автоматизация процесса объединения CSV-файлов позволяет не только сэкономить время, но и создать повторяемый, надежный рабочий процесс, который можно запускать по расписанию или по мере поступления новых данных.
Рассмотрим создание полноценного Python-скрипта, который будет автоматически объединять CSV-файлы из указанной директории с возможностью настройки различных параметров:
#!/usr/bin/env python3
# csv_combiner.py
import pandas as pd
import glob
import os
import argparse
import logging
from datetime import datetime
# Настройка логирования
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s – %(levelname)s – %(message)s',
handlers=[
logging.FileHandler("csv_combiner.log"),
logging.StreamHandler()
]
)
logger = logging.getLogger()
def combine_csv_files(input_path, output_file, columns=None, sort_by=None,
filter_condition=None, add_metadata=False):
"""
Объединяет CSV-файлы из указанной директории.
Args:
input_path (str): Путь к директории с CSV-файлами или шаблон glob
output_file (str): Путь для сохранения результата
columns (list): Список колонок для выбора (None = все)
sort_by (str): Колонка для сортировки результата
filter_condition (str): Условие фильтрации в формате 'column>value'
add_metadata (bool): Добавлять ли метаданные о файлах-источниках
Returns:
pd.DataFrame: Объединенный DataFrame
"""
start_time = datetime.now()
logger.info(f"Начало объединения файлов из {input_path}")
# Находим все файлы по шаблону
csv_files = glob.glob(input_path)
logger.info(f"Найдено {len(csv_files)} CSV-файлов")
if len(csv_files) == 0:
logger.error(f"Не найдено CSV-файлов по пути {input_path}")
return None
# Подготовка списка для DataFrames
dfs = []
# Чтение и обработка каждого файла
for file in csv_files:
try:
logger.info(f"Обработка файла: {file}")
df = pd.read_csv(file)
# Выбор определенных колонок, если указано
if columns:
available_cols = [col for col in columns if col in df.columns]
if available_cols:
df = df[available_cols]
else:
logger.warning(f"Ни одна из указанных колонок не найдена в {file}")
continue
# Добавление метаданных о файле, если требуется
if add_metadata:
df['source_file'] = os.path.basename(file)
df['import_timestamp'] = datetime.now()
# Применение фильтра, если указан
if filter_condition:
try:
col, op, val = filter_condition.replace('>', '|').replace('<', '|').replace('=', '|').split('|')
col = col.strip()
val = val.strip()
if op == '>':
df = df[df[col] > float(val)]
elif op == '<':
df = df[df[col] < float(val)]
elif op == '==':
df = df[df[col] == val]
logger.info(f"Применен фильтр: {filter_condition}. Осталось {len(df)} строк")
except Exception as e:
logger.error(f"Ошибка при применении фильтра: {e}")
dfs.append(df)
except Exception as e:
logger.error(f"Ошибка при обработке файла {file}: {e}")
if not dfs:
logger.error("Не удалось обработать ни один файл")
return None
# Объединение всех DataFrames
logger.info("Объединение всех DataFrame'ов")
combined_df = pd.concat(dfs, ignore_index=True)
# Сортировка по указанной колонке
if sort_by and sort_by in combined_df.columns:
logger.info(f"Сортировка по колонке {sort_by}")
combined_df = combined_df.sort_values(by=sort_by)
# Сохранение результата
logger.info(f"Сохранение результата в {output_file}")
combined_df.to_csv(output_file, index=False)
end_time = datetime.now()
execution_time = (end_time – start_time).total_seconds()
logger.info(f"Объединение завершено за {execution_time} секунд")
logger.info(f"Итоговый DataFrame: {len(combined_df)} строк, {len(combined_df.columns)} колонок")
return combined_df
def main():
# Создание парсера аргументов командной строки
parser = argparse.ArgumentParser(description='Объединение CSV-файлов в один')
parser.add_argument('--input', '-i', required=True,
help='Путь к директории с CSV-файлами или шаблон glob')
parser.add_argument('--output', '-o', required=True,
help='Путь для сохранения результата')
parser.add_argument('--columns', '-c', nargs='+', default=None,
help='Список колонок для выбора')
parser.add_argument('--sort', '-s', default=None,
help='Колонка для сортировки результата')
parser.add_argument('--filter', '-f', default=None,
help='Условие фильтрации в формате "column>value"')
parser.add_argument('--add-metadata', '-m', action='store_true',
help='Добавлять метаданные о файлах-источниках')
args = parser.parse_args()
# Запуск функции объединения с указанными параметрами
combine_csv_files(
input_path=args.input,
output_file=args.output,
columns=args.columns,
sort_by=args.sort,
filter_condition=args.filter,
add_metadata=args.add_metadata
)
if __name__ == "__main__":
main()
Такой скрипт можно запускать из командной строки с различными параметрами:
# Пример запуска скрипта
python csv_combiner.py --input "data/*.csv" --output "output/combined.csv" --columns id name value --sort id --add-metadata
Для регулярной автоматизации процесса можно использовать планировщики задач, такие как cron в Linux или Task Scheduler в Windows:
# Пример cron-задания для запуска ежедневно в 2:00 ночи
0 2 * * * /usr/bin/python3 /path/to/csv_combiner.py --input "/data/daily/*.csv" --output "/data/combined/daily_$(date +\%Y\%m\%d).csv" --add-metadata
Для еще более продвинутой автоматизации можно интегрировать скрипт с системами оркестрации рабочих процессов, такими как Apache Airflow:
# Пример DAG для Airflow (сохранить в dags/csv_combine_dag.py)
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import sys
sys.path.append('/path/to/scripts')
from csv_combiner import combine_csv_files
default_args = {
'owner': 'data_team',
'depends_on_past': False,
'start_date': datetime(2023, 1, 1),
'email_on_failure': True,
'email': ['data_alerts@example.com'],
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'combine_csv_files',
default_args=default_args,
description='Ежедневное объединение CSV-файлов',
schedule_interval='0 2 * * *',
)
combine_task = PythonOperator(
task_id='combine_csv_files',
python_callable=combine_csv_files,
op_kwargs={
'input_path': '/data/daily/*.csv',
'output_file': f'/data/combined/daily_{datetime.now().strftime("%Y%m%d")}.csv',
'add_metadata': True
},
dag=dag,
)
Расширение возможностей автоматизации может включать:
- Отправку уведомлений по электронной почте о результатах обработки
- Интеграцию с системами мониторинга для отслеживания качества данных
- Настройку обработки ошибок и повторных попыток
- Добавление шагов предварительной и последующей обработки (валидация, трансформация)
- Параллельную обработку файлов для ускорения работы с большими объемами данных
Автоматизация объединения CSV-файлов превращает рутинную задачу в надежный, воспроизводимый процесс, который может работать без вмешательства человека и обрабатывать данные по мере их поступления. Это особенно важно для систем, где данные обновляются регулярно и требуют постоянного анализа. 🤖
Освоив техники объединения CSV-файлов в единый DataFrame, вы получили мощный инструмент для работы с разрозненными источниками данных. Ключевой момент эффективного использования pandas — баланс между правильной подготовкой среды, выбором оптимальных методов объединения и автоматизацией рутинных процессов. Помните, что экономия времени на автоматизации ручного труда возвращается сторицей в виде точности анализа и возможности сосредоточиться на интерпретации результатов, а не на механической подготовке данных.