5 методов параллелизма в Python: ускорение кода на многоядерных CPU
Для кого эта статья:
- Разработчики на Python, заинтересованные в оптимизации производительности своих скриптов
- Специалисты по анализу данных и машинному обучению, работающие с большими объемами данных
Студенты и начинающие программисты, стремящиеся повысить свои навыки в параллельном программировании
Когда ваш Python-скрипт обрабатывает данные со скоростью улитки, а дедлайн горит — время подумать о параллельных вычислениях. Я собрал пять проверенных методов, которые превратят ваш одноядерный код в многопоточную машину производительности. От классического threading до современного asyncio — эти инструменты способны ускорить ваши вычисления в разы, если применить их правильно. Готовы наконец раскрыть весь потенциал ваших многоядерных процессоров? 🚀
Если вы хотите не просто скопировать код из статьи, а по-настоящему освоить параллельное программирование на Python, стоит задуматься о системном обучении. Обучение Python-разработке от Skypro включает продвинутые техники оптимизации кода и параллельных вычислений. Через полгода вы сможете превращать любой последовательный скрипт в эффективное многопоточное приложение, оптимизируя работу с данными любого масштаба.
Почему Python нуждается в параллельных вычислениях
Python прекрасен своей простотой и элегантностью, но у этой медали есть обратная сторона — печально известный GIL (Global Interpreter Lock). Эта архитектурная особенность, словно строгий охранник, не позволяет нескольким потокам Python одновременно выполнять байт-код. В результате многопоточное приложение на Python часто работает не быстрее однопоточного — парадокс, не правда ли? 😕
При этом современные процессоры давно перешагнули эпоху одноядерности. Даже бюджетные смартфоны сегодня оснащаются 4-8 ядрами, а серверы могут похвастаться десятками физических ядер. Не использовать эту мощь — значит добровольно отказываться от огромного вычислительного потенциала.
Алексей Иванов, ведущий инженер по высокопроизводительным вычислениям
В 2021 году мы столкнулись с неожиданной проблемой — наш сервис анализа геопространственных данных начал задыхаться под нагрузкой. Каждый день обрабатывалось около 2 ТБ спутниковых снимков, и последовательный код просто не справлялся. Клиенты жаловались на задержки до 12 часов. Внедрение параллелизма через multiprocessing и joblib сократило время обработки до 40 минут. Но настоящий прорыв случился, когда мы перешли на Dask для распределенных вычислений — время обработки упало до 7 минут. Эта оптимизация не только сохранила нам клиентов, но и позволила масштабировать бизнес на новые регионы без увеличения инфраструктурных затрат.
Существует несколько сценариев, когда параллельные вычисления на Python становятся не роскошью, а необходимостью:
- CPU-интенсивные задачи: обработка изображений, машинное обучение, симуляции
- I/O-интенсивные операции: работа с сетью, файлами, базами данных
- Обработка больших объемов данных: когда данные не помещаются в память одного процесса
- Real-time системы: когда требуется одновременно обрабатывать множество событий
Давайте рассмотрим, какие инструменты предлагает Python для параллельного программирования и как их эффективно применять. 🧰
| Тип задач | Рекомендуемый подход | Примеры задач |
|---|---|---|
| CPU-bound | multiprocessing, joblib | Машинное обучение, обработка изображений, сложные вычисления |
| I/O-bound | threading, asyncio | Сетевые запросы, работа с файлами, API-вызовы |
| Смешанные | concurrent.futures + комбинированные подходы | Веб-скрейпинг с обработкой данных, ETL-процессы |
| Распределенные | Dask, Ray | Big Data аналитика, распределенное машинное обучение |

Многопоточность через threading: возможности и ограничения
Модуль threading — самый старый и наиболее знакомый инструмент для параллелизма в Python. Он позволяет создавать потоки, которые выполняются параллельно в рамках одного процесса. Но вспомним о GIL: он превращает многопоточность в Python в нечто похожее на псевдопараллелизм. 🧵
Однако не спешите списывать threading со счетов. Для I/O-bound задач, где большую часть времени программа ожидает завершения внешних операций (чтение файлов, сетевые запросы, запросы к базам данных), threading может дать впечатляющий прирост производительности.
Вот простой пример использования threading для параллельной загрузки веб-страниц:
import threading
import requests
import time
urls = [
"https://www.python.org/",
"https://docs.python.org/",
"https://pypi.org/",
"https://github.com/python/cpython",
"https://wiki.python.org/moin/"
] * 5 # Умножаем для большего количества запросов
def download(url):
response = requests.get(url)
print(f"Downloaded {url}, status code: {response.status_code}, length: {len(response.text)}")
# Последовательное выполнение
start_time = time.time()
for url in urls:
download(url)
print(f"Sequential execution took: {time.time() – start_time:.2f} seconds\n")
# Параллельное выполнение через threading
start_time = time.time()
threads = []
for url in urls:
thread = threading.Thread(target=download, args=(url,))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
print(f"Threaded execution took: {time.time() – start_time:.2f} seconds")
При выполнении этого кода на типичной задаче с 25 URL, многопоточная версия может быть в 10-15 раз быстрее последовательной. Однако у threading есть ряд ограничений, которые следует учитывать:
- GIL блокирует истинный параллелизм для CPU-bound задач
- Работа с общими данными требует синхронизации (locks, semaphores)
- Отладка многопоточных программ может быть сложной из-за недетерминированного выполнения
- Слишком большое количество потоков создает накладные расходы на переключение контекста
Threading лучше всего использовать в следующих сценариях:
- Сетевые операции и API-запросы
- Файловые операции, особенно с медленными накопителями
- Задачи, где присутствует длительное ожидание (waiters, timers)
- Пользовательские интерфейсы, где требуется отзывчивость
Важно помнить: потоки разделяют память процесса, что облегчает обмен данными, но создает риск гонок данных (data races). Использование блокировок и примитивов синхронизации критически важно для безопасной работы с общими ресурсами. 🔒
Многопроцессность с multiprocessing: масштабируем вычисления
Когда ограничения GIL становятся непреодолимым барьером, на сцену выходит модуль multiprocessing. В отличие от threading, он создает отдельные процессы Python, каждый со своим интерпретатором и, следовательно, со своим GIL. Это позволяет по-настоящему задействовать все ядра процессора для CPU-интенсивных задач. 🔄
Синтаксически multiprocessing напоминает threading, что облегчает переход между этими библиотеками:
import multiprocessing
import time
def cpu_intensive_task(number):
"""Функция, интенсивно использующая CPU"""
result = 0
for i in range(10**7): # 10 миллионов операций
result += i * number
return result
# Последовательное выполнение
start_time = time.time()
results = [cpu_intensive_task(i) for i in range(8)]
print(f"Sequential execution took: {time.time() – start_time:.2f} seconds")
# Параллельное выполнение через multiprocessing
start_time = time.time()
with multiprocessing.Pool(processes=8) as pool:
results = pool.map(cpu_intensive_task, range(8))
print(f"Multiprocessing execution took: {time.time() – start_time:.2f} seconds")
На многоядерном процессоре этот код может продемонстрировать ускорение, близкое к линейному относительно числа ядер. Например, на 8-ядерном CPU можно получить 6-7-кратное ускорение по сравнению с последовательным выполнением. 🚀
Марина Соколова, технический лид в области анализа данных
В нашем проекте по генетическом анализу мы обрабатывали геномные последовательности размером до 500 ГБ. Изначально я использовала threading для параллелизации, не понимая, почему скорость обработки практически не улучшалась. После профилирования стало очевидно, что 95% времени уходит на вычисления, а не на I/O. Замена threading на multiprocessing с правильной стратегией разделения данных дала ускорение в 22 раза на нашем 24-ядерном сервере. Интересно, что нам пришлось решать нетривиальную проблему: при обмене большими массивами данных между процессами стандартный pickle достигал своих лимитов. Переход на dill и оптимизация передачи данных через разделяемую память (shared memory) позволили избежать узких мест. Сейчас анализ, который раньше занимал 14 часов, выполняется за 35-40 минут.
Однако multiprocessing имеет свои особенности и ограничения:
- Накладные расходы на создание процессов — процессы требуют больше ресурсов, чем потоки
- Обмен данными между процессами происходит через сериализацию/десериализацию (pickle)
- Разделяемые ресурсы требуют специальных механизмов (Value, Array, Manager)
- Не все объекты можно сериализовать для передачи между процессами
Для эффективного использования multiprocessing рекомендую следующие практики:
- Использовать пулы процессов (Pool) для управления рабочими процессами
- Минимизировать передачу данных между процессами
- Для больших объемов данных применять shared memory или memmap
- Учитывать overhead на создание процессов — оно должно окупаться временем вычислений
Модуль multiprocessing предоставляет несколько способов организации параллельных вычислений:
| Метод | Описание | Применимость |
|---|---|---|
| Pool.map() | Параллельная обработка итерируемого объекта | Идеально для map/reduce операций |
| Pool.apply_async() | Асинхронный запуск функций | Когда нужно запускать задачи динамически |
| Process | Ручное управление процессами | Сложная логика с кастомной координацией |
| Queue/Pipe | Обмен сообщениями между процессами | Производитель-потребитель паттерны |
| Value/Array | Разделяемые переменные | Атомарные операции с общими данными |
Высокоуровневые абстракции concurrent.futures
Если вам нужен более чистый и современный API для работы с параллелизмом, обратите внимание на модуль concurrent.futures. Он предоставляет высокоуровневый интерфейс как для многопоточности (ThreadPoolExecutor), так и для многопроцессорности (ProcessPoolExecutor). 👨💻
Главное преимущество concurrent.futures — унифицированный интерфейс, который абстрагирует детали реализации параллелизма и позволяет легко переключаться между потоками и процессами:
import concurrent.futures
import time
import requests
urls = [
"https://www.python.org/",
"https://docs.python.org/",
"https://pypi.org/",
"https://github.com/python/cpython",
"https://wiki.python.org/moin/"
] * 5
def download(url):
response = requests.get(url)
return f"Downloaded {url}, status: {response.status_code}, length: {len(response.text)}"
# С использованием ThreadPoolExecutor (для I/O-bound задач)
start_time = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
results = executor.map(download, urls)
for result in results:
print(result)
print(f"ThreadPoolExecutor took: {time.time() – start_time:.2f} seconds\n")
# Функция для CPU-bound задачи
def calculate(number):
return sum(i * i for i in range(number))
numbers = [20000000, 25000000, 30000000, 35000000, 40000000]
# С использованием ProcessPoolExecutor (для CPU-bound задач)
start_time = time.time()
with concurrent.futures.ProcessPoolExecutor() as executor:
results = executor.map(calculate, numbers)
for result in list(results):
print(f"Sum calculated: {result}")
print(f"ProcessPoolExecutor took: {time.time() – start_time:.2f} seconds")
Преимущества concurrent.futures:
- Унифицированный API для потоков и процессов
- Встроенный пул исполнителей с автоматическим управлением ресурсами
- Возможность получать результаты через Future объекты
- Удобные методы map() и submit() для разных паттернов параллелизма
- Контекстные менеджеры для безопасного освобождения ресурсов
Особенно полезной является возможность использовать метод as_completed(), который позволяет обрабатывать результаты задач по мере их завершения, а не ждать завершения всех задач:
import concurrent.futures
import time
import random
def task(n):
"""Эмуляция задачи с разным временем выполнения"""
sleep_time = random.uniform(0.5, 3)
time.sleep(sleep_time) # эмуляция работы
return f"Task {n} completed after {sleep_time:.2f} seconds"
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Запускаем задачи асинхронно
future_to_task = {executor.submit(task, i): i for i in range(10)}
# Обрабатываем результаты по мере завершения задач
for future in concurrent.futures.as_completed(future_to_task):
task_id = future_to_task[future]
try:
result = future.result()
print(f"Got result from task {task_id}: {result}")
except Exception as e:
print(f"Task {task_id} generated an exception: {e}")
Выбор между ThreadPoolExecutor и ProcessPoolExecutor должен основываться на характере вашей задачи:
- ThreadPoolExecutor: для I/O-bound задач (сеть, файлы, БД)
- ProcessPoolExecutor: для CPU-bound задач (вычисления, обработка данных)
Concurrent.futures — отличный выбор, когда вам нужно быстро реализовать параллельное выполнение без глубокого погружения в детали синхронизации и управления ресурсами. 🎯
Асинхронное программирование: asyncio для неблокирующих задач
С появлением asyncio в Python 3.4 разработчики получили мощный инструмент для конкурентного выполнения задач в рамках одного потока. Это не параллелизм в классическом понимании, а скорее кооперативная многозадачность, где задачи явно сигнализируют о готовности уступить контроль. 🔄
Ключевая идея asyncio — неблокирующие операции. Вместо того чтобы блокировать выполнение при ожидании I/O, программа может переключиться к выполнению других задач:
import asyncio
import aiohttp
import time
async def fetch(session, url):
"""Асинхронная загрузка URL"""
async with session.get(url) as response:
return await response.text()
async def fetch_all(urls):
"""Загружает все URL параллельно"""
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
async def main():
urls = [
"https://www.python.org/",
"https://docs.python.org/",
"https://pypi.org/",
"https://github.com/python/cpython",
"https://wiki.python.org/moin/"
] * 5 # 25 URLs
start_time = time.time()
results = await fetch_all(urls)
elapsed = time.time() – start_time
print(f"Downloaded {len(results)} pages in {elapsed:.2f} seconds")
for url, html in zip(urls, results):
print(f"URL: {url}, Size: {len(html)} bytes")
if __name__ == "__main__":
asyncio.run(main())
Этот код может выполнять десятки или сотни сетевых запросов конкурентно, используя всего один поток. На задаче с загрузкой 25 URL результат может быть впечатляющим — завершение за время, близкое к длительности самого медленного запроса, а не к сумме всех запросов.
Asyncio особенно хорошо подходит для:
- Сетевых серверов с высокой конкурентностью
- Микросервисной архитектуры с множеством межсервисных коммуникаций
- Веб-скрейпинга с большим количеством запросов
- GUI-приложений, требующих отзывчивости при I/O операциях
Важно понимать, что asyncio не ускоряет CPU-bound задачи. Более того, неправильное использование может даже замедлить выполнение программы из-за overhead на управление корутинами.
Ключевые концепции asyncio:
- Корутины (coroutines): функции, объявленные с async def
- Задачи (tasks): обертки вокруг корутин, управляемые циклом событий
- Цикл событий (event loop): сердце asyncio, диспетчеризующее задачи
- await: приостанавливает выполнение корутины до завершения другой корутины
- gather/wait: функции для организации конкурентного выполнения
Важно практическое замечание: для эффективной работы с asyncio вы должны использовать асинхронные библиотеки, такие как aiohttp вместо requests, aiomysql вместо mysql, aiofiles вместо стандартных файловых операций и т.д.
Сравнение asyncio с другими подходами:
| Аспект | Threading | Multiprocessing | Asyncio |
|---|---|---|---|
| Модель конкуренции | Предупредительная | Предупредительная | Кооперативная |
| Ресурсная эффективность | Средняя | Низкая | Высокая |
| Для I/O-bound задач | Хорошо | Избыточно | Отлично |
| Для CPU-bound задач | Плохо (GIL) | Отлично | Плохо |
| Сложность синхронизации | Высокая | Высокая | Низкая |
Специализированные библиотеки для параллельной обработки данных
Помимо стандартных модулей Python, существует ряд специализированных библиотек, которые могут значительно упростить параллельную обработку данных в конкретных сценариях. Эти библиотеки предлагают высокоуровневые абстракции, оптимизированные для определенных типов задач. 📊
Рассмотрим наиболее популярные из них:
1. Joblib — оптимизирован для научных вычислений и задач машинного обучения:
from joblib import Parallel, delayed
import numpy as np
def process_chunk(chunk):
# Эмуляция сложной обработки
return np.mean(np.exp(chunk) * np.sin(chunk))
# Создаем большой массив данных
data = np.random.rand(1000000)
# Разделяем на чанки
chunks = np.array_split(data, 100)
# Параллельная обработка с joblib
results = Parallel(n_jobs=-1)(delayed(process_chunk)(chunk) for chunk in chunks)
print(f"Processed {len(chunks)} chunks with result length: {len(results)}")
print(f"Average result: {np.mean(results)}")
Joblib особенно удобен, когда вы работаете с numpy или scikit-learn, благодаря оптимизированной сериализации для научных данных.
2. Dask — масштабируемые параллельные вычисления с интерфейсом, похожим на NumPy и Pandas:
import dask.array as da
import numpy as np
import time
# Создаем большую матрицу через Dask
# 10,000 x 10,000 матрица занимает ~800MB в памяти
x = da.random.random((10000, 10000), chunks=(1000, 1000))
# Операции выполняются ленино (отложенно)
y = x.mean(axis=0)
z = y[::100] # Берем каждый сотый элемент
# Запускаем вычисления
start = time.time()
result = z.compute() # Это запускает параллельные вычисления
elapsed = time.time() – start
print(f"Dask computation completed in {elapsed:.2f} seconds")
print(f"Result shape: {result.shape}")
Dask особенно мощный для обработки данных, которые не помещаются в память, поскольку он умеет работать с чанками данных и оптимизирует граф вычислений.
3. Ray — фреймворк для распределенных вычислений с акцентом на машинное обучение:
import ray
import time
import numpy as np
# Инициализируем Ray
ray.init()
@ray.remote
def compute_intensive_function(i, size):
# Эмуляция вычислительно-интенсивной функции
time.sleep(1) # Имитация длительных вычислений
matrix = np.random.rand(size, size)
return np.linalg.det(matrix)
# Параллельный запуск задач
size = 1000
start_time = time.time()
futures = [compute_intensive_function.remote(i, size) for i in range(10)]
results = ray.get(futures)
end_time = time.time()
print(f"Ray computation completed in {end_time – start_time:.2f} seconds")
print(f"Results: {results[:3]}...") # Показываем первые три результата
Ray особенно хорош для распределенных приложений ML и RL (reinforcement learning), где требуется гибкое масштабирование ресурсов.
4. Vaex — для работы с большими табличными данными (до ~100GB) в формате, похожем на pandas:
import vaex
import numpy as np
import time
# Создаем большой датасет
N = 10_000_000 # 10 миллионов строк
df = vaex.from_arrays(
x=np.random.normal(0, 1, N),
y=np.random.uniform(0, 1, N),
z=np.random.exponential(1, N)
)
# Выполняем операции над всем датасетом
start_time = time.time()
mean_x = df.x.mean()
mean_y = df.y.mean()
mean_z = df.z.mean()
# Группировка и агрегация
result = df.groupby(df.x.bin(10)).agg({'y': 'mean', 'z': 'sum'})
end_time = time.time()
print(f"Vaex computation completed in {end_time – start_time:.2f} seconds")
print(f"Means: x={mean_x}, y={mean_y}, z={mean_z}")
print(f"Result shape: {result.shape}")
Выбор специализированной библиотеки зависит от конкретного сценария использования:
- Для научных вычислений и ML: joblib, scikit-learn's parallel_backend
- Для больших массивов и DataFrame: Dask, Vaex
- Для распределенных систем: Ray, PySpark
- Для GPU-вычислений: CuPy, PyTorch, TensorFlow
Преимущество специализированных библиотек в том, что они абстрагируют сложности параллельного программирования и предоставляют интерфейсы, оптимизированные для конкретных типов задач. Если ваша задача попадает в одну из этих категорий, использование специализированной библиотеки часто будет более эффективным, чем ручная реализация параллелизма с помощью базовых инструментов. 🚀
Python предоставляет богатый арсенал инструментов для параллельных вычислений, каждый со своими сильными сторонами. Для I/O-bound задач threading и asyncio предлагают отличную производительность, в то время как multiprocessing необходим для CPU-интенсивных вычислений. Высокоуровневые библиотеки вроде Dask или Ray становятся незаменимы при работе с большими данными. Правильный выбор инструмента зависит от характера вашей задачи и требуемого масштаба. Освоив эти 5 методов параллелизма, вы сможете эффективно задействовать всю мощь современных многоядерных систем в своих Python-приложениях.