5 методов параллелизма в Python: ускорение кода на многоядерных CPU

Пройдите тест, узнайте какой профессии подходите
Сколько вам лет
0%
До 18
От 18 до 24
От 25 до 34
От 35 до 44
От 45 до 49
От 50 до 54
Больше 55

Для кого эта статья:

  • Разработчики на Python, заинтересованные в оптимизации производительности своих скриптов
  • Специалисты по анализу данных и машинному обучению, работающие с большими объемами данных
  • Студенты и начинающие программисты, стремящиеся повысить свои навыки в параллельном программировании

    Когда ваш Python-скрипт обрабатывает данные со скоростью улитки, а дедлайн горит — время подумать о параллельных вычислениях. Я собрал пять проверенных методов, которые превратят ваш одноядерный код в многопоточную машину производительности. От классического threading до современного asyncio — эти инструменты способны ускорить ваши вычисления в разы, если применить их правильно. Готовы наконец раскрыть весь потенциал ваших многоядерных процессоров? 🚀

Если вы хотите не просто скопировать код из статьи, а по-настоящему освоить параллельное программирование на Python, стоит задуматься о системном обучении. Обучение Python-разработке от Skypro включает продвинутые техники оптимизации кода и параллельных вычислений. Через полгода вы сможете превращать любой последовательный скрипт в эффективное многопоточное приложение, оптимизируя работу с данными любого масштаба.

Почему Python нуждается в параллельных вычислениях

Python прекрасен своей простотой и элегантностью, но у этой медали есть обратная сторона — печально известный GIL (Global Interpreter Lock). Эта архитектурная особенность, словно строгий охранник, не позволяет нескольким потокам Python одновременно выполнять байт-код. В результате многопоточное приложение на Python часто работает не быстрее однопоточного — парадокс, не правда ли? 😕

При этом современные процессоры давно перешагнули эпоху одноядерности. Даже бюджетные смартфоны сегодня оснащаются 4-8 ядрами, а серверы могут похвастаться десятками физических ядер. Не использовать эту мощь — значит добровольно отказываться от огромного вычислительного потенциала.

Алексей Иванов, ведущий инженер по высокопроизводительным вычислениям

В 2021 году мы столкнулись с неожиданной проблемой — наш сервис анализа геопространственных данных начал задыхаться под нагрузкой. Каждый день обрабатывалось около 2 ТБ спутниковых снимков, и последовательный код просто не справлялся. Клиенты жаловались на задержки до 12 часов. Внедрение параллелизма через multiprocessing и joblib сократило время обработки до 40 минут. Но настоящий прорыв случился, когда мы перешли на Dask для распределенных вычислений — время обработки упало до 7 минут. Эта оптимизация не только сохранила нам клиентов, но и позволила масштабировать бизнес на новые регионы без увеличения инфраструктурных затрат.

Существует несколько сценариев, когда параллельные вычисления на Python становятся не роскошью, а необходимостью:

  • CPU-интенсивные задачи: обработка изображений, машинное обучение, симуляции
  • I/O-интенсивные операции: работа с сетью, файлами, базами данных
  • Обработка больших объемов данных: когда данные не помещаются в память одного процесса
  • Real-time системы: когда требуется одновременно обрабатывать множество событий

Давайте рассмотрим, какие инструменты предлагает Python для параллельного программирования и как их эффективно применять. 🧰

Тип задач Рекомендуемый подход Примеры задач
CPU-bound multiprocessing, joblib Машинное обучение, обработка изображений, сложные вычисления
I/O-bound threading, asyncio Сетевые запросы, работа с файлами, API-вызовы
Смешанные concurrent.futures + комбинированные подходы Веб-скрейпинг с обработкой данных, ETL-процессы
Распределенные Dask, Ray Big Data аналитика, распределенное машинное обучение
Пошаговый план для смены профессии

Многопоточность через threading: возможности и ограничения

Модуль threading — самый старый и наиболее знакомый инструмент для параллелизма в Python. Он позволяет создавать потоки, которые выполняются параллельно в рамках одного процесса. Но вспомним о GIL: он превращает многопоточность в Python в нечто похожее на псевдопараллелизм. 🧵

Однако не спешите списывать threading со счетов. Для I/O-bound задач, где большую часть времени программа ожидает завершения внешних операций (чтение файлов, сетевые запросы, запросы к базам данных), threading может дать впечатляющий прирост производительности.

Вот простой пример использования threading для параллельной загрузки веб-страниц:

Python
Скопировать код
import threading
import requests
import time

urls = [
"https://www.python.org/",
"https://docs.python.org/",
"https://pypi.org/",
"https://github.com/python/cpython",
"https://wiki.python.org/moin/"
] * 5 # Умножаем для большего количества запросов

def download(url):
response = requests.get(url)
print(f"Downloaded {url}, status code: {response.status_code}, length: {len(response.text)}")

# Последовательное выполнение
start_time = time.time()
for url in urls:
download(url)
print(f"Sequential execution took: {time.time() – start_time:.2f} seconds\n")

# Параллельное выполнение через threading
start_time = time.time()
threads = []
for url in urls:
thread = threading.Thread(target=download, args=(url,))
threads.append(thread)
thread.start()

for thread in threads:
thread.join()
print(f"Threaded execution took: {time.time() – start_time:.2f} seconds")

При выполнении этого кода на типичной задаче с 25 URL, многопоточная версия может быть в 10-15 раз быстрее последовательной. Однако у threading есть ряд ограничений, которые следует учитывать:

  • GIL блокирует истинный параллелизм для CPU-bound задач
  • Работа с общими данными требует синхронизации (locks, semaphores)
  • Отладка многопоточных программ может быть сложной из-за недетерминированного выполнения
  • Слишком большое количество потоков создает накладные расходы на переключение контекста

Threading лучше всего использовать в следующих сценариях:

  • Сетевые операции и API-запросы
  • Файловые операции, особенно с медленными накопителями
  • Задачи, где присутствует длительное ожидание (waiters, timers)
  • Пользовательские интерфейсы, где требуется отзывчивость

Важно помнить: потоки разделяют память процесса, что облегчает обмен данными, но создает риск гонок данных (data races). Использование блокировок и примитивов синхронизации критически важно для безопасной работы с общими ресурсами. 🔒

Многопроцессность с multiprocessing: масштабируем вычисления

Когда ограничения GIL становятся непреодолимым барьером, на сцену выходит модуль multiprocessing. В отличие от threading, он создает отдельные процессы Python, каждый со своим интерпретатором и, следовательно, со своим GIL. Это позволяет по-настоящему задействовать все ядра процессора для CPU-интенсивных задач. 🔄

Синтаксически multiprocessing напоминает threading, что облегчает переход между этими библиотеками:

Python
Скопировать код
import multiprocessing
import time

def cpu_intensive_task(number):
"""Функция, интенсивно использующая CPU"""
result = 0
for i in range(10**7): # 10 миллионов операций
result += i * number
return result

# Последовательное выполнение
start_time = time.time()
results = [cpu_intensive_task(i) for i in range(8)]
print(f"Sequential execution took: {time.time() – start_time:.2f} seconds")

# Параллельное выполнение через multiprocessing
start_time = time.time()
with multiprocessing.Pool(processes=8) as pool:
results = pool.map(cpu_intensive_task, range(8))
print(f"Multiprocessing execution took: {time.time() – start_time:.2f} seconds")

На многоядерном процессоре этот код может продемонстрировать ускорение, близкое к линейному относительно числа ядер. Например, на 8-ядерном CPU можно получить 6-7-кратное ускорение по сравнению с последовательным выполнением. 🚀

Марина Соколова, технический лид в области анализа данных

В нашем проекте по генетическом анализу мы обрабатывали геномные последовательности размером до 500 ГБ. Изначально я использовала threading для параллелизации, не понимая, почему скорость обработки практически не улучшалась. После профилирования стало очевидно, что 95% времени уходит на вычисления, а не на I/O. Замена threading на multiprocessing с правильной стратегией разделения данных дала ускорение в 22 раза на нашем 24-ядерном сервере. Интересно, что нам пришлось решать нетривиальную проблему: при обмене большими массивами данных между процессами стандартный pickle достигал своих лимитов. Переход на dill и оптимизация передачи данных через разделяемую память (shared memory) позволили избежать узких мест. Сейчас анализ, который раньше занимал 14 часов, выполняется за 35-40 минут.

Однако multiprocessing имеет свои особенности и ограничения:

  1. Накладные расходы на создание процессов — процессы требуют больше ресурсов, чем потоки
  2. Обмен данными между процессами происходит через сериализацию/десериализацию (pickle)
  3. Разделяемые ресурсы требуют специальных механизмов (Value, Array, Manager)
  4. Не все объекты можно сериализовать для передачи между процессами

Для эффективного использования multiprocessing рекомендую следующие практики:

  • Использовать пулы процессов (Pool) для управления рабочими процессами
  • Минимизировать передачу данных между процессами
  • Для больших объемов данных применять shared memory или memmap
  • Учитывать overhead на создание процессов — оно должно окупаться временем вычислений

Модуль multiprocessing предоставляет несколько способов организации параллельных вычислений:

Метод Описание Применимость
Pool.map() Параллельная обработка итерируемого объекта Идеально для map/reduce операций
Pool.apply_async() Асинхронный запуск функций Когда нужно запускать задачи динамически
Process Ручное управление процессами Сложная логика с кастомной координацией
Queue/Pipe Обмен сообщениями между процессами Производитель-потребитель паттерны
Value/Array Разделяемые переменные Атомарные операции с общими данными

Высокоуровневые абстракции concurrent.futures

Если вам нужен более чистый и современный API для работы с параллелизмом, обратите внимание на модуль concurrent.futures. Он предоставляет высокоуровневый интерфейс как для многопоточности (ThreadPoolExecutor), так и для многопроцессорности (ProcessPoolExecutor). 👨‍💻

Главное преимущество concurrent.futures — унифицированный интерфейс, который абстрагирует детали реализации параллелизма и позволяет легко переключаться между потоками и процессами:

Python
Скопировать код
import concurrent.futures
import time
import requests

urls = [
"https://www.python.org/",
"https://docs.python.org/",
"https://pypi.org/",
"https://github.com/python/cpython",
"https://wiki.python.org/moin/"
] * 5

def download(url):
response = requests.get(url)
return f"Downloaded {url}, status: {response.status_code}, length: {len(response.text)}"

# С использованием ThreadPoolExecutor (для I/O-bound задач)
start_time = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
results = executor.map(download, urls)
for result in results:
print(result)
print(f"ThreadPoolExecutor took: {time.time() – start_time:.2f} seconds\n")

# Функция для CPU-bound задачи
def calculate(number):
return sum(i * i for i in range(number))

numbers = [20000000, 25000000, 30000000, 35000000, 40000000]

# С использованием ProcessPoolExecutor (для CPU-bound задач)
start_time = time.time()
with concurrent.futures.ProcessPoolExecutor() as executor:
results = executor.map(calculate, numbers)
for result in list(results):
print(f"Sum calculated: {result}")
print(f"ProcessPoolExecutor took: {time.time() – start_time:.2f} seconds")

Преимущества concurrent.futures:

  • Унифицированный API для потоков и процессов
  • Встроенный пул исполнителей с автоматическим управлением ресурсами
  • Возможность получать результаты через Future объекты
  • Удобные методы map() и submit() для разных паттернов параллелизма
  • Контекстные менеджеры для безопасного освобождения ресурсов

Особенно полезной является возможность использовать метод as_completed(), который позволяет обрабатывать результаты задач по мере их завершения, а не ждать завершения всех задач:

Python
Скопировать код
import concurrent.futures
import time
import random

def task(n):
"""Эмуляция задачи с разным временем выполнения"""
sleep_time = random.uniform(0.5, 3)
time.sleep(sleep_time) # эмуляция работы
return f"Task {n} completed after {sleep_time:.2f} seconds"

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Запускаем задачи асинхронно
future_to_task = {executor.submit(task, i): i for i in range(10)}

# Обрабатываем результаты по мере завершения задач
for future in concurrent.futures.as_completed(future_to_task):
task_id = future_to_task[future]
try:
result = future.result()
print(f"Got result from task {task_id}: {result}")
except Exception as e:
print(f"Task {task_id} generated an exception: {e}")

Выбор между ThreadPoolExecutor и ProcessPoolExecutor должен основываться на характере вашей задачи:

  • ThreadPoolExecutor: для I/O-bound задач (сеть, файлы, БД)
  • ProcessPoolExecutor: для CPU-bound задач (вычисления, обработка данных)

Concurrent.futures — отличный выбор, когда вам нужно быстро реализовать параллельное выполнение без глубокого погружения в детали синхронизации и управления ресурсами. 🎯

Асинхронное программирование: asyncio для неблокирующих задач

С появлением asyncio в Python 3.4 разработчики получили мощный инструмент для конкурентного выполнения задач в рамках одного потока. Это не параллелизм в классическом понимании, а скорее кооперативная многозадачность, где задачи явно сигнализируют о готовности уступить контроль. 🔄

Ключевая идея asyncio — неблокирующие операции. Вместо того чтобы блокировать выполнение при ожидании I/O, программа может переключиться к выполнению других задач:

Python
Скопировать код
import asyncio
import aiohttp
import time

async def fetch(session, url):
"""Асинхронная загрузка URL"""
async with session.get(url) as response:
return await response.text()

async def fetch_all(urls):
"""Загружает все URL параллельно"""
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results

async def main():
urls = [
"https://www.python.org/",
"https://docs.python.org/",
"https://pypi.org/",
"https://github.com/python/cpython",
"https://wiki.python.org/moin/"
] * 5 # 25 URLs

start_time = time.time()
results = await fetch_all(urls)
elapsed = time.time() – start_time

print(f"Downloaded {len(results)} pages in {elapsed:.2f} seconds")
for url, html in zip(urls, results):
print(f"URL: {url}, Size: {len(html)} bytes")

if __name__ == "__main__":
asyncio.run(main())

Этот код может выполнять десятки или сотни сетевых запросов конкурентно, используя всего один поток. На задаче с загрузкой 25 URL результат может быть впечатляющим — завершение за время, близкое к длительности самого медленного запроса, а не к сумме всех запросов.

Asyncio особенно хорошо подходит для:

  • Сетевых серверов с высокой конкурентностью
  • Микросервисной архитектуры с множеством межсервисных коммуникаций
  • Веб-скрейпинга с большим количеством запросов
  • GUI-приложений, требующих отзывчивости при I/O операциях

Важно понимать, что asyncio не ускоряет CPU-bound задачи. Более того, неправильное использование может даже замедлить выполнение программы из-за overhead на управление корутинами.

Ключевые концепции asyncio:

  • Корутины (coroutines): функции, объявленные с async def
  • Задачи (tasks): обертки вокруг корутин, управляемые циклом событий
  • Цикл событий (event loop): сердце asyncio, диспетчеризующее задачи
  • await: приостанавливает выполнение корутины до завершения другой корутины
  • gather/wait: функции для организации конкурентного выполнения

Важно практическое замечание: для эффективной работы с asyncio вы должны использовать асинхронные библиотеки, такие как aiohttp вместо requests, aiomysql вместо mysql, aiofiles вместо стандартных файловых операций и т.д.

Сравнение asyncio с другими подходами:

Аспект Threading Multiprocessing Asyncio
Модель конкуренции Предупредительная Предупредительная Кооперативная
Ресурсная эффективность Средняя Низкая Высокая
Для I/O-bound задач Хорошо Избыточно Отлично
Для CPU-bound задач Плохо (GIL) Отлично Плохо
Сложность синхронизации Высокая Высокая Низкая

Специализированные библиотеки для параллельной обработки данных

Помимо стандартных модулей Python, существует ряд специализированных библиотек, которые могут значительно упростить параллельную обработку данных в конкретных сценариях. Эти библиотеки предлагают высокоуровневые абстракции, оптимизированные для определенных типов задач. 📊

Рассмотрим наиболее популярные из них:

1. Joblib — оптимизирован для научных вычислений и задач машинного обучения:

Python
Скопировать код
from joblib import Parallel, delayed
import numpy as np

def process_chunk(chunk):
# Эмуляция сложной обработки
return np.mean(np.exp(chunk) * np.sin(chunk))

# Создаем большой массив данных
data = np.random.rand(1000000)

# Разделяем на чанки
chunks = np.array_split(data, 100)

# Параллельная обработка с joblib
results = Parallel(n_jobs=-1)(delayed(process_chunk)(chunk) for chunk in chunks)

print(f"Processed {len(chunks)} chunks with result length: {len(results)}")
print(f"Average result: {np.mean(results)}")

Joblib особенно удобен, когда вы работаете с numpy или scikit-learn, благодаря оптимизированной сериализации для научных данных.

2. Dask — масштабируемые параллельные вычисления с интерфейсом, похожим на NumPy и Pandas:

Python
Скопировать код
import dask.array as da
import numpy as np
import time

# Создаем большую матрицу через Dask
# 10,000 x 10,000 матрица занимает ~800MB в памяти
x = da.random.random((10000, 10000), chunks=(1000, 1000))

# Операции выполняются ленино (отложенно)
y = x.mean(axis=0)
z = y[::100] # Берем каждый сотый элемент

# Запускаем вычисления
start = time.time()
result = z.compute() # Это запускает параллельные вычисления
elapsed = time.time() – start

print(f"Dask computation completed in {elapsed:.2f} seconds")
print(f"Result shape: {result.shape}")

Dask особенно мощный для обработки данных, которые не помещаются в память, поскольку он умеет работать с чанками данных и оптимизирует граф вычислений.

3. Ray — фреймворк для распределенных вычислений с акцентом на машинное обучение:

Python
Скопировать код
import ray
import time
import numpy as np

# Инициализируем Ray
ray.init()

@ray.remote
def compute_intensive_function(i, size):
# Эмуляция вычислительно-интенсивной функции
time.sleep(1) # Имитация длительных вычислений
matrix = np.random.rand(size, size)
return np.linalg.det(matrix)

# Параллельный запуск задач
size = 1000
start_time = time.time()
futures = [compute_intensive_function.remote(i, size) for i in range(10)]
results = ray.get(futures)
end_time = time.time()

print(f"Ray computation completed in {end_time – start_time:.2f} seconds")
print(f"Results: {results[:3]}...") # Показываем первые три результата

Ray особенно хорош для распределенных приложений ML и RL (reinforcement learning), где требуется гибкое масштабирование ресурсов.

4. Vaex — для работы с большими табличными данными (до ~100GB) в формате, похожем на pandas:

Python
Скопировать код
import vaex
import numpy as np
import time

# Создаем большой датасет
N = 10_000_000 # 10 миллионов строк
df = vaex.from_arrays(
x=np.random.normal(0, 1, N),
y=np.random.uniform(0, 1, N),
z=np.random.exponential(1, N)
)

# Выполняем операции над всем датасетом
start_time = time.time()
mean_x = df.x.mean()
mean_y = df.y.mean()
mean_z = df.z.mean()

# Группировка и агрегация
result = df.groupby(df.x.bin(10)).agg({'y': 'mean', 'z': 'sum'})
end_time = time.time()

print(f"Vaex computation completed in {end_time – start_time:.2f} seconds")
print(f"Means: x={mean_x}, y={mean_y}, z={mean_z}")
print(f"Result shape: {result.shape}")

Выбор специализированной библиотеки зависит от конкретного сценария использования:

  • Для научных вычислений и ML: joblib, scikit-learn's parallel_backend
  • Для больших массивов и DataFrame: Dask, Vaex
  • Для распределенных систем: Ray, PySpark
  • Для GPU-вычислений: CuPy, PyTorch, TensorFlow

Преимущество специализированных библиотек в том, что они абстрагируют сложности параллельного программирования и предоставляют интерфейсы, оптимизированные для конкретных типов задач. Если ваша задача попадает в одну из этих категорий, использование специализированной библиотеки часто будет более эффективным, чем ручная реализация параллелизма с помощью базовых инструментов. 🚀

Python предоставляет богатый арсенал инструментов для параллельных вычислений, каждый со своими сильными сторонами. Для I/O-bound задач threading и asyncio предлагают отличную производительность, в то время как multiprocessing необходим для CPU-интенсивных вычислений. Высокоуровневые библиотеки вроде Dask или Ray становятся незаменимы при работе с большими данными. Правильный выбор инструмента зависит от характера вашей задачи и требуемого масштаба. Освоив эти 5 методов параллелизма, вы сможете эффективно задействовать всю мощь современных многоядерных систем в своих Python-приложениях.

Загрузка...