Python: использование threading pool вместо multiprocessing Pool

Пройдите тест, узнайте какой профессии подходите

Я предпочитаю
0%
Работать самостоятельно и не зависеть от других
Работать в команде и рассчитывать на помощь коллег
Организовывать и контролировать процесс работы

Быстрый ответ

concurrent.futures.ThreadPoolExecutor — это аналог multiprocessing.Pool, но реализованный на основе потоков в Python. Он позволяет выполнять задачи в параллельном режиме.

Вот пример его использования:

Python
Скопировать код
from concurrent.futures import ThreadPoolExecutor

def task_to_run(arg):
    # Обработка полученного аргумента
    return arg

with ThreadPoolExecutor(max_workers=5) as executor:
    # Распределение задач между потоками
    results = executor.map(task_to_run, range(10))
    for result in results:
        print(result)  # Выводим результаты на экран

Данный код создает пул из пяти потоков и выполняет функцию task_to_run для каждого элемента последовательности от 0 до 9. Результаты обработки аналогичны работе multiprocessing.Pool.map, но производятся на потоках.

Кинга Идем в IT: пошаговый план для смены профессии

Альтернативный пул потоков: multiprocessing.pool.ThreadPool

Если вам нужен ThreadPool с интерфейсом, близким к multiprocessing.Pool, обратите внимание на multiprocessing.pool.ThreadPool.

Python
Скопировать код
from multiprocessing.pool import ThreadPool

pool = ThreadPool(processes=4)
result = pool.map(task_to_run, range(10))
pool.close()
pool.join()

Освобождение от Глобальной Блокировки Интерпретатора (GIL) может улучшить производительность многопоточной обработки, особенно при операциях ввода-вывода.

Реализация собственного пула рабочих потоков

Вы можете создать собственный пул потоков, используя паттерн рабочего с очередью:

Python
Скопировать код
from threading import Thread, current_thread
from queue import Queue

class Worker(Thread):
    def __init__(self, task_queue):
        super().__init__()
        self.task_queue = task_queue
        self.daemon = True
  
    def run(self):
        while True:
            func, args, kwargs = self.task_queue.get()
            try:
                func(*args, **kwargs)
            except Exception as e:
                print(f"Ошибка в потоке {current_thread()}: {e}")
            finally:
                self.task_queue.task_done()

Класс Worker принимает задачи из очереди и обрабатывает их, выполняя соответствующую функцию асинхронно с заданными аргументами.

Визуализация

Пул потоков можно сравнить с работой бариста (🧑‍🍳) в кофейне (☕️):

Markdown
Скопировать код
Пул потоков: [🧑‍🍳, 🧑‍🍳, 🧑‍🍳]
- Каждый поток выполняет задачи последовательно, пользуясь общими ресурсами.

Подобно баристам, потоки обрабатывают несколько заданий одновременно:

Markdown
Скопировать код
Задания: 📝📝📝
Ресурсы: ☕️🍵🧁

Это иллюстрирует различие между многопоточностью и многопроцессорностью:

Markdown
Скопировать код
Многопоточность:    Одновременная обработка, Общие ресурсы
Многопроцессорность: Параллельная обработка, Независимые процессы

Советы и уловки для оптимизации производительности

Выбор между многопоточностью и многопроцессорностью зависит от задачи, так же как Тарзан выбирает, за какую лиану перекинуться.

Лучше всего выполнять операции ввода-вывода и задачи с высокой задержкой в многопоточном режиме, в то время как для вычислительно сложных задач из-за ограничений GIL в Python потребуется многопроцессорность.

Точная настройка количества потоков – это ключ к оптимальной производительности. Также важно учитывать безопасность потоков и предотвратить проблемы, связанные с совместным доступом к данным, с помощью механизмов блокировки, например, threading.Lock.

Полезные материалы

  1. Официальная документация по ThreadPoolExecutor.
  2. Практическое руководство по использованию concurrent.futures.
  3. Официальная документация по многопоточности в Python.
  4. Ключевой доклад на PyCon о расширенном параллелизме.
  5. Сравнение concurrent.futures и multiprocessing.