Пул потоков в Python: как работать с ThreadPoolExecutor для параллельных задач

Пройдите тест, узнайте какой профессии подходите
Сколько вам лет
0%
До 18
От 18 до 24
От 25 до 34
От 35 до 44
От 45 до 49
От 50 до 54
Больше 55

Для кого эта статья:

  • Python-разработчики, желающие улучшить свои навыки в многопоточном программировании
  • Специалисты, занимающиеся оптимизацией производительности приложений
  • Студенты и новички, изучающие параллельное выполнение задач в Python

    Многопоточное программирование в Python часто вызывает панический холодок у разработчиков, особенно когда дело касается управления большим количеством потоков для параллельных задач. Если вы хоть раз пытались вручную создавать и координировать десятки потоков, вы знаете, что это может превратиться в настоящий кошмар отладки и управления ресурсами. Пул потоков — элегантное решение, которое превращает сложное в простое, а хаос в порядок. В этом руководстве мы разберем, как ThreadPoolExecutor из модуля concurrent.futures может стать вашим секретным оружием для эффективного параллельного выполнения задач. 🚀

Хотите стать мастером Python и уверенно разрабатывать высокопроизводительные приложения? Обучение Python-разработке от Skypro научит вас не только основам, но и продвинутым техникам многопоточности и асинхронного программирования. Наши студенты реализуют реальные проекты с использованием пула потоков и других технологий оптимизации, делая их код быстрым и эффективным. Превратите теорию в практический навык!

Основы пулов потоков в Python и их практическое значение

Пул потоков — это управляемая коллекция потоков выполнения, которые ожидают назначения задач. Вместо создания нового потока для каждой задачи, пул поддерживает группу доступных потоков и распределяет задачи между ними. Это значительно уменьшает накладные расходы, связанные с созданием и уничтожением потоков.

В Python пул потоков реализован через класс ThreadPoolExecutor из модуля concurrent.futures, который появился в версии 3.2. Этот высокоуровневый интерфейс значительно упрощает работу с многопоточностью по сравнению с базовым модулем threading.

Давайте рассмотрим ключевые преимущества использования пула потоков:

  • Эффективное управление ресурсами — пул автоматически контролирует количество активных потоков, предотвращая истощение системных ресурсов
  • Повторное использование потоков — потоки не создаются и не уничтожаются для каждой задачи, что экономит время и ресурсы
  • Простой интерфейс — удобные методы map(), submit() и as_completed() делают код чище и читабельнее
  • Обработка результатов — встроенные механизмы для получения результатов выполнения задач через объекты Future
  • Обработка исключений — структурированный подход к обработке ошибок в параллельном коде

Когда стоит использовать пул потоков? Это идеальное решение для задач, ограниченных вводом-выводом (I/O-bound), таких как:

  • Скрапинг веб-сайтов
  • Работа с сетевыми запросами
  • Операции с файлами
  • Запросы к базам данных

Для задач, ограниченных процессором (CPU-bound), пул потоков менее эффективен из-за GIL (Global Interpreter Lock) в Python. В таких случаях лучше использовать ProcessPoolExecutor или асинхронное программирование.

Александр Петров, Lead Python-разработчик

Несколько лет назад наша команда столкнулась с задачей обработки тысяч изображений из разных источников. Мы написали скрипт, который последовательно загружал каждое изображение, применял фильтры и сохранял результат. Обработка занимала около 8 часов.

Помню, как в пятницу вечером меня осенило: ведь это же идеальный сценарий для многопоточности! Большая часть времени тратилась на ввод-вывод — загрузку и сохранение файлов. За час я переписал код с использованием ThreadPoolExecutor, и время обработки сократилось до 40 минут. Коллеги были в шоке от такого прироста производительности, а я получил дополнительный выходной. Самое удивительное, что код стал даже короче и понятнее, чем был.

Подход Преимущества Недостатки Рекомендуемое использование
Стандартный threading Полный контроль над жизненным циклом потоков Сложный код, ручное управление Сложные потоковые взаимодействия
ThreadPoolExecutor Автоматизированное управление, простой интерфейс Меньше контроля над отдельными потоками Большинство I/O-bound задач
ProcessPoolExecutor Обход GIL, использование нескольких ядер Больше накладных расходов CPU-bound задачи
asyncio Эффективное использование одного потока Требует переписывания кода под async/await Высоконагруженные I/O-задачи
Пошаговый план для смены профессии

Создание пула потоков через ThreadPoolExecutor

Создание пула потоков в Python — процесс, требующий минимум кода и максимум эффективности. Модуль concurrent.futures предоставляет элегантный интерфейс, который позволяет начать работу с многопоточностью буквально в несколько строк.

Начнем с базового примера создания ThreadPoolExecutor:

Python
Скопировать код
import concurrent.futures
import time

def task(name):
print(f"Задача {name} начата")
time.sleep(2) # Имитация работы
print(f"Задача {name} завершена")
return f"Результат {name}"

# Создаем пул с 3 рабочими потоками
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
# Запускаем 5 задач
future_results = {executor.submit(task, i): i for i in range(5)}

# Обрабатываем результаты по мере завершения
for future in concurrent.futures.as_completed(future_results):
task_id = future_results[future]
try:
result = future.result()
print(f"Задача {task_id} вернула: {result}")
except Exception as e:
print(f"Задача {task_id} вызвала исключение: {e}")

В этом примере мы создаем пул с тремя потоками для выполнения пяти задач. Обратите внимание на использование контекстного менеджера (with), который автоматически вызывает метод shutdown() при выходе из блока, освобождая ресурсы.

Ключевые параметры при создании ThreadPoolExecutor:

  • max_workers — максимальное количество потоков в пуле (по умолчанию: min(32, (os.cpu_count() или 1) + 4))
  • threadnameprefix — префикс для имен создаваемых потоков (полезно для отладки)
  • initializer — функция, вызываемая при старте каждого потока (добавлена в Python 3.7)
  • initargs — аргументы для функции initializer

Как определить оптимальное количество потоков? Это зависит от характера ваших задач:

  • Для I/O-bound задач обычно эффективно иметь больше потоков, чем ядер (2-4 потока на ядро)
  • Для задач со смешанными операциями разумное значение — количество ядер процессора + несколько дополнительных потоков
  • Слишком большое количество потоков может привести к избыточным переключениям контекста и ухудшить производительность

Ещё один пример — использование метода map() для параллельного выполнения функции с разными аргументами:

Python
Скопировать код
import concurrent.futures
import requests

urls = [
'https://www.python.org/',
'https://docs.python.org/',
'https://pypi.org/',
'https://github.com/python/cpython',
'https://www.djangoproject.com/'
]

def load_url(url):
response = requests.get(url)
return f"{url}: {len(response.content)} bytes"

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
results = executor.map(load_url, urls)

for url, result in zip(urls, results):
print(result)

Метод map() — это мощный инструмент, который упрощает параллельную обработку элементов коллекции. Он автоматически создает задачу для каждого элемента входного итерируемого объекта и возвращает результаты в том же порядке, что и входные данные. 🔄

Важно помнить, что контекстный менеджер автоматически вызывает executor.shutdown(wait=True), что заставляет программу ожидать завершения всех задач. Если вам нужно другое поведение, вы можете явно вызвать shutdown() с нужными параметрами:

Python
Скопировать код
executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
# ... выполнение задач ...
# Завершить пул без ожидания задач
executor.shutdown(wait=False)

Управление задачами и результатами в многопоточной среде

Эффективное управление задачами и их результатами — ключевой аспект работы с пулом потоков. ThreadPoolExecutor предоставляет элегантные инструменты, позволяющие контролировать выполнение задач и обрабатывать их результаты.

Давайте рассмотрим два основных подхода к запуску задач в пуле потоков:

Метод Описание Возвращаемое значение Порядок результатов
submit(fn, args, *kwargs) Отправляет одну функцию на выполнение Объект Future Произвольный (используйте as_completed)
map(fn, *iterables) Применяет функцию к каждому элементу последовательности Итератор результатов Соответствует порядку входных данных

Метод submit() возвращает объект Future, который представляет собой "обещание" результата в будущем. Это мощный абстрактный объект с методами:

  • result(timeout=None) — получает результат задачи (блокирует выполнение до готовности)
  • done() — проверяет, завершилась ли задача
  • cancel() — пытается отменить выполнение задачи
  • exception(timeout=None) — возвращает исключение, если оно возникло
  • adddonecallback(fn) — добавляет функцию, которая будет вызвана при завершении задачи

Рассмотрим пример использования callback-функций с Future:

Python
Скопировать код
import concurrent.futures
import time
import random

def task(id):
sleep_time = random.uniform(0.5, 3.0)
print(f"Задача {id} начата, будет выполняться {sleep_time:.2f} сек")
time.sleep(sleep_time)
if random.random() < 0.2: # 20% шанс ошибки
raise ValueError(f"Ошибка в задаче {id}")
return f"Результат задачи {id}"

def task_done_callback(future):
# Эта функция вызывается, когда задача завершается
try:
result = future.result()
print(f"Callback: задача завершена успешно с результатом: {result}")
except Exception as e:
print(f"Callback: задача завершилась с ошибкой: {e}")

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
futures = []
for i in range(10):
future = executor.submit(task, i)
future.add_done_callback(task_done_callback)
futures.append(future)

# Дополнительно можем дождаться всех результатов
for i, future in enumerate(futures):
try:
result = future.result()
print(f"Основной поток: задача {i} вернула {result}")
except Exception as e:
print(f"Основной поток: задача {i} вызвала исключение {e}")

Для обработки результатов по мере их готовности используйте функцию as_completed(). Она возвращает итератор по объектам Future по мере их завершения:

Python
Скопировать код
with concurrent.futures.ThreadPoolExecutor() as executor:
future_to_url = {executor.submit(load_url, url): url for url in urls}

for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
print(f"{url} обработан, получено {len(data)} байт")
except Exception as e:
print(f"{url} вызвал ошибку: {e}")

Обработка исключений — важный аспект многопоточного программирования. Если в задаче возникает исключение, оно не прерывает работу всей программы, а сохраняется в объекте Future и "всплывет" при вызове result(). Это позволяет элегантно обрабатывать ошибки без нарушения работы других потоков. 🛡️

Таймауты также важны для предотвращения блокировки при зависших задачах:

Python
Скопировать код
future = executor.submit(long_running_task)
try:
result = future.result(timeout=10) # Ждем результат максимум 10 секунд
except concurrent.futures.TimeoutError:
print("Задача выполняется слишком долго, отмена...")
future.cancel() # Пытаемся отменить задачу

Для более сложных сценариев можно комбинировать wait() с разными стратегиями ожидания:

Python
Скопировать код
done, not_done = concurrent.futures.wait(
futures, 
timeout=5,
return_when=concurrent.futures.FIRST_COMPLETED # Или ALL_COMPLETED, FIRST_EXCEPTION
)

print(f"Завершено {len(done)} задач, осталось {len(not_done)}")

Михаил Сорокин, Tech Lead

Недавно наш сервис мониторинга перестал справляться с растущим количеством проверяемых эндпоинтов. Система проверяла доступность веб-сервисов последовательно, и с ростом до 500+ проверок время полного цикла превысило интервал между проверками.

Я решил применить ThreadPoolExecutor для параллельной проверки сервисов. Изначально я просто заменил последовательный обход на параллельный через executor.map(). Производительность выросла в ~20 раз, но появилась новая проблема — некоторые "медленные" сервисы блокировали обработку остальных результатов.

Решение нашлось в использовании submit() с as_completed() и таймаутами. Мы добавили максимальное время ожидания для каждого запроса и начали обрабатывать результаты по мере их поступления:

Python
Скопировать код
with ThreadPoolExecutor(max_workers=50) as executor:
future_to_endpoint = {
executor.submit(check_endpoint, endpoint, timeout=5): endpoint 
for endpoint in endpoints
}

for future in as_completed(future_to_endpoint, timeout=60):
endpoint = future_to_endpoint[future]
try:
result = future.result()
process_result(endpoint, result)
except Exception as e:
log_error(endpoint, e)

Система стала не только быстрой, но и устойчивой к проблемам отдельных сервисов. Это был тот редкий случай, когда оптимизация оказалась настолько успешной, что нам пришлось добавить искусственную задержку, чтобы не перегружать некоторые проверяемые сервисы.

Оптимизация параллельных задач с помощью пула потоков

Знать синтаксис ThreadPoolExecutor — это только полдела. Настоящее мастерство приходит с пониманием того, как оптимизировать параллельные задачи для максимальной производительности. Рассмотрим ключевые стратегии оптимизации, которые помогут вам выжать максимум из пула потоков.

Первый вопрос, который нужно задать: сколько потоков использовать? Существует несколько подходов к определению оптимального размера пула:

Python
Скопировать код
import os
import psutil
import math

# Подход 1: Базовый подход — CPU + 4 (как в стандартной библиотеке)
max_workers_1 = (os.cpu_count() or 1) + 4

# Подход 2: Для I/O-bound задач — количество ядер умножить на 2-3
max_workers_2 = (os.cpu_count() or 1) * 3

# Подход 3: Более сложный подход, учитывающий память
# Предполагаем, что каждый поток использует примерно 10MB памяти
memory_per_thread = 10 * 1024 * 1024 # 10MB в байтах
available_memory = psutil.virtual_memory().available
memory_based_max_workers = int(available_memory * 0.8 / memory_per_thread)

# Выбираем минимальное значение, чтобы не перегрузить систему
final_max_workers = min(max_workers_2, memory_based_max_workers)
print(f"Оптимальное количество потоков: {final_max_workers}")

Для задач, ограниченных вводом-выводом (I/O-bound), большее количество потоков обычно дает лучшую производительность, но есть предел, после которого накладные расходы на управление потоками превышают выгоду. 📊

Второй важный аспект — группировка задач. Вместо создания отдельного потока для каждой мелкой операции, объединяйте логически связанные операции в более крупные блоки:

Python
Скопировать код
# Неэффективный подход: отдельный поток для каждой строки файла
def process_file_inefficient(filename):
with concurrent.futures.ThreadPoolExecutor() as executor:
with open(filename) as file:
futures = [executor.submit(process_line, line) for line in file]

for future in concurrent.futures.as_completed(futures):
result = future.result()
# обработка результата

# Оптимизированный подход: разделение файла на блоки
def process_file_efficient(filename, num_workers=None):
if num_workers is None:
num_workers = (os.cpu_count() or 1) * 2

# Определяем размер файла и делим на блоки
file_size = os.path.getsize(filename)
chunk_size = math.ceil(file_size / num_workers)

# Функция обработки блока строк
def process_chunk(start, size):
results = []
with open(filename) as file:
file.seek(start)
bytes_read = 0
for line in file:
results.append(process_line(line))
bytes_read += len(line.encode('utf-8'))
if bytes_read >= size:
break
return results

# Запускаем обработку блоков параллельно
with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
futures = []
for i in range(0, file_size, chunk_size):
futures.append(executor.submit(process_chunk, i, chunk_size))

# Собираем результаты
all_results = []
for future in concurrent.futures.as_completed(futures):
all_results.extend(future.result())

return all_results

Третья стратегия — использование семафоров для ограничения одновременного доступа к ограниченным ресурсам, например, при работе с внешними API с ограничениями на количество запросов:

Python
Скопировать код
import concurrent.futures
import threading
import time
import requests

# Ограничиваем до 5 одновременных запросов
semaphore = threading.Semaphore(5)

def fetch_with_limit(url):
with semaphore: # Блокирует, если уже 5 активных запросов
print(f"Fetching {url}")
response = requests.get(url)
time.sleep(1) # Имитация обработки
return response.status_code

urls = [f"https://example.com/{i}" for i in range(20)]

with concurrent.futures.ThreadPoolExecutor(max_workers=15) as executor:
future_to_url = {executor.submit(fetch_with_limit, url): url for url in urls}

for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
status_code = future.result()
print(f"{url} – Status code: {status_code}")
except Exception as exc:
print(f"{url} generated an exception: {exc}")

Для улучшения производительности помните о следующих оптимизациях:

  • Уменьшайте GIL-конкуренцию — освобождайте GIL, когда это возможно (например, при работе с чтением/записью)
  • Минимизируйте разделяемые данные — разделяемые ресурсы требуют синхронизации, что снижает параллелизм
  • Используйте буферизацию — для операций ввода-вывода это может значительно улучшить производительность
  • Применяйте структурные оптимизации — правильная группировка и разделение задач может дать больший прирост, чем микрооптимизации

Измеряйте производительность! Вот простой пример, как можно сравнить разные подходы:

Python
Скопировать код
import time
import concurrent.futures

def benchmark(func, *args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
print(f"{func.__name__} занял {end_time – start_time:.4f} секунд")
return result, end_time – start_time

# Последовательная версия
def process_sequentially(data):
results = []
for item in data:
results.append(process_item(item))
return results

# Параллельная версия с map
def process_with_map(data, max_workers=None):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
results = list(executor.map(process_item, data))
return results

# Параллельная версия с submit и as_completed
def process_with_submit(data, max_workers=None):
results = []
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(process_item, item) for item in data]
for future in concurrent.futures.as_completed(futures):
results.append(future.result())
return results

# Сравнение производительности
data = list(range(100))
seq_result, seq_time = benchmark(process_sequentially, data)
map_result, map_time = benchmark(process_with_map, data)
submit_result, submit_time = benchmark(process_with_submit, data)

print(f"Ускорение с map: {seq_time/map_time:.2f}x")
print(f"Ускорение с submit: {seq_time/submit_time:.2f}x")

Продвинутые стратегии использования ThreadPoolExecutor

Освоив базовые принципы работы с ThreadPoolExecutor, пора познакомиться с продвинутыми стратегиями, которые помогут справиться со сложными сценариями многопоточного программирования. Эти подходы позволят вам создавать гибкие, отказоустойчивые и высокопроизводительные системы. 🚀

Начнем с динамического управления задачами. Представьте, что вам нужно обрабатывать поток входящих запросов, но вы не знаете заранее их количество:

Python
Скопировать код
import concurrent.futures
import queue
import threading
import time
import random

# Очередь задач
task_queue = queue.Queue()
# Флаг для остановки обработчиков
stop_processing = threading.Event()

def worker_task():
while not stop_processing.is_set():
try:
# Получаем задачу из очереди с таймаутом
task_id, task_func, args = task_queue.get(timeout=0.5)
print(f"Обработка задачи {task_id}")
result = task_func(*args)
print(f"Задача {task_id} завершена с результатом: {result}")
except queue.Empty:
# Если очередь пуста, продолжаем цикл
continue
except Exception as e:
print(f"Ошибка при обработке задачи: {e}")

def example_task(a, b):
time.sleep(random.uniform(0.1, 0.5)) # Имитация работы
return a + b

# Создаем и запускаем пул потоков
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
# Запускаем воркеры
workers = [executor.submit(worker_task) for _ in range(3)]

# Генерируем и добавляем задачи в очередь
for i in range(10):
task_queue.put((i, example_task, (i, i*2)))
time.sleep(random.uniform(0.1, 0.3)) # Случайные интервалы между задачами

# Даем время на обработку
time.sleep(5)

# Сигнализируем воркерам о завершении
stop_processing.set()

# Дожидаемся завершения всех воркеров
for worker in workers:
worker.result() # Этот вызов блокирует до завершения потока

Этот паттерн реализует динамическую обработку задач с помощью очереди и отдельных воркеров. Он особенно полезен, когда задачи поступают асинхронно из внешних источников.

Другая продвинутая техника — обработка зависимостей между задачами. Иногда результат одной задачи является входными данными для других:

Python
Скопировать код
import concurrent.futures
import time

def stage1(data):
print(f"Обработка данных {data} на этапе 1")
time.sleep(1) # Имитация сложной обработки
return data * 2

def stage2(data):
print(f"Обработка данных {data} на этапе 2")
time.sleep(0.5)
return data + 10

def stage3(data):
print(f"Обработка данных {data} на этапе 3")
time.sleep(0.7)
return data ** 2

def process_pipeline(input_data):
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
# Запускаем первый этап для всех входных данных
stage1_futures = {
executor.submit(stage1, data): data for data in input_data
}

# Запускаем второй этап по мере завершения первого
stage2_futures = {}
for future in concurrent.futures.as_completed(stage1_futures):
data = stage1_futures[future]
try:
result = future.result()
# Запускаем второй этап с результатом первого
stage2_future = executor.submit(stage2, result)
stage2_futures[stage2_future] = result
except Exception as e:
print(f"Ошибка на этапе 1 для данных {data}: {e}")

# Запускаем третий этап по мере завершения второго
stage3_futures = {}
for future in concurrent.futures.as_completed(stage2_futures):
data = stage2_futures[future]
try:
result = future.result()
# Запускаем третий этап
stage3_future = executor.submit(stage3, result)
stage3_futures[stage3_future] = result
except Exception as e:
print(f"Ошибка на этапе 2 для данных {data}: {e}")

# Собираем финальные результаты
final_results = {}
for future in concurrent.futures.as_completed(stage3_futures):
data = stage3_futures[future]
try:
result = future.result()
final_results[data] = result
except Exception as e:
print(f"Ошибка на этапе 3 для данных {data}: {e}")

return final_results

# Обрабатываем набор данных через конвейер
input_data = [1, 2, 3, 4, 5]
results = process_pipeline(input_data)
for data, result in results.items():
print(f"Входные данные: {data} -> Финальный результат: {result}")

Эта реализация конвейера позволяет обрабатывать данные через последовательные этапы, при этом разные элементы данных могут находиться на разных этапах обработки одновременно.

Для обеспечения устойчивости к ошибкам, можно реализовать механизм повторных попыток с экспоненциальной задержкой:

Python
Скопировать код
import concurrent.futures
import time
import random

def unreliable_task(task_id):
# Имитация ненадежной задачи, иногда вызывающей ошибки
if random.random() < 0.7: # 70% шанс ошибки
print(f"Задача {task_id}: ошибка!")
raise ValueError(f"Случайная ошибка в задаче {task_id}")
time.sleep(random.uniform(0.1, 0.3))
print(f"Задача {task_id}: успех!")
return f"Результат задачи {task_id}"

def execute_with_retry(task_func, task_id, max_retries=3, backoff_factor=2):
retries = 0
while retries <= max_retries:
try:
return task_func(task_id)
except Exception as e:
wait_time = backoff_factor ** retries
retries += 1
if retries > max_retries:
print(f"Задача {task_id} не удалась после {max_retries} попыток")
raise
print(f"Задача {task_id}: повтор через {wait_time} сек...")
time.sleep(wait_time)

# Используем с ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
futures = [
executor.submit(execute_with_retry, unreliable_task, i) 
for i in range(5)
]

for future in concurrent.futures.as_completed(futures):
try:
result = future.result()
print(f"Получен результат: {result}")
except Exception as e:
print(f"Окончательная ошибка: {e}")

Иногда требуется ограничить использование ресурсов при многопоточной обработке. Здесь может помочь комбинация ThreadPoolExecutor с паттерном "Leaky Bucket":

Python
Скопировать код
import concurrent.futures
import threading
import time
import random

class LeakyBucket:
"""Реализация алгоритма Leaky Bucket для ограничения скорости"""

def __init__(self, capacity, leak_rate):
self.capacity = capacity # Максимальное число токенов
self.leak_rate = leak_rate # Токенов в секунду
self.tokens = capacity # Текущее число токенов
self.lock = threading.Lock() # Для потокобезопасности
self.last_leak = time.time() # Время последней утечки

def consume(self, tokens=1):
"""Потребляет токены, возвращает True если успешно, иначе False"""
with self.lock:
# Обновляем число токенов
now = time.time()
elapsed = now – self.last_leak
leaked = elapsed * self.leak_rate
self.tokens = min(self.capacity, self.tokens + leaked)
self.last_leak = now

# Пытаемся потребить токены
if self.tokens >= tokens:
self.tokens -= tokens
return True
return False

def try_consume(self, tokens=1, max_wait=None):
"""Пытается потребить токены, ожидая при необходимости"""
start_time = time.time()
while True:
if self.consume(tokens):
return True

# Проверяем, не превышено ли максимальное время ожидания
if max_wait is not None and time.time() – start_time > max_wait:
return False

# Спим небольшое время перед повторной попыткой
time.sleep(0.1)

# Создаем ограничитель скорости: 5 запросов в секунду
rate_limiter = LeakyBucket(capacity=5, leak_rate=5)

def rate_limited_task(task_id):
# Пытаемся получить разрешение на выполнение
if not rate_limiter.try_consume(1, max_wait=10):
raise TimeoutError(f"Задача {task_id}: превышено время ожидания в очереди")

print(f"Задача {task_id} начата в {time.strftime('%H:%M:%S')}")
time.sleep(random.uniform(0.1, 0.5)) # Имитация работы
return f"Результат задачи {task_id}"

# Запускаем множество задач, но с ограничением скорости
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
futures = [
executor.submit(rate_limited_task, i) 
for i in range(20)
]

for future in concurrent.futures.as_completed(futures):
try:
result = future.result()
print(result)
except Exception as e:
print(f"Ошибка: {e}")

В некоторых сценариях может потребоваться динамически изменять размер пула потоков в зависимости от нагрузки. Для этого можно использовать собственную реализацию, которая расширяет стандартный ThreadPoolExecutor:

Python
Скопировать код
import concurrent.futures
import threading
import time
import os
import psutil

class DynamicThreadPoolExecutor:
"""Исполнитель пула потоков с динамическим изменением размера"""

def __init__(self, min_workers=1, max_workers=None, 
adjust_interval=60, load_threshold=0.7):
self.min_workers = min_workers
self.max_workers = max_workers or (os.cpu_count() or 1) * 5
self.adjust_interval = adjust_interval # Интервал корректировки в секундах
self.load_threshold = load_threshold # Порог загрузки для увеличения

self.executor = concurrent.futures.ThreadPoolExecutor(
max_workers=self.min_workers
)
self.current_workers = self.min_workers
self.task_count = 0
self.task_lock = threading.Lock()

self.monitor_thread = threading.Thread(
target=self._monitor, daemon=True
)
self.running = True
self.monitor_thread.start()

def _monitor(self):
"""Мониторит нагрузку и корректирует размер пула"""
while self.running:
time.sleep(self.adjust_interval)

# Определяем текущую загрузку пула
with self.task_lock:
if self.task_count > 0:
load = self.task_count / self.current_workers
else:
load = 0

if load > self.load_threshold and self.current_workers < self.max_workers:
# Увеличиваем пул, если загрузка высока
new_size = min(
self.current_workers * 2, 
self.max_workers
)
print(f"Увеличиваем пул потоков: {self.current_workers} -> {new_size}")

# Создаем новый executor с увеличенным размером
new_executor = concurrent.futures.ThreadPoolExecutor(max_workers=new_size)
old_executor = self.executor
self.executor = new_executor
self.current_workers = new_size

# Завершаем старый executor, но позволяем завершить текущие задачи
old_executor.shutdown(wait=False)

elif load < self.load_threshold * 0.3 and self.current_workers > self.min_workers:
# Уменьшаем пул, если загрузка низкая
new_size = max(
self.current_workers // 2, 
self.min_workers
)
print(f"Уменьшаем пул потоков: {self.current_workers} -> {new_size}")

new_executor = concurrent.futures.ThreadPoolExecutor(max_workers=new_size)
old_executor = self.executor
self.executor = new_executor
self.current_workers = new_size

old_executor.shutdown(wait=False)

def submit(self, fn, *args, **kwargs):
"""Отправляет задачу на выполнение"""
with self.task_lock:
self.task_count += 1

future = self.executor.submit(fn, *args, **kwargs)
future.add_done_callback(self._task_done)
return future

def _task_done(self, future):
"""Вызывается при завершении задачи"""
with self.task_lock:
self.task_count -= 1

def shutdown(self, wait=True):
"""Завершает работу пула потоков"""
self.running = False
self.monitor_thread.join(timeout=1)
self.executor.shutdown(wait=wait)

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.shutdown()

# Пример использования
def example_task(task_id):
print(f"Задача {task_id} выполняется")
time.sleep(random.uniform(1, 3))
return f"Результат {task_id}"

with DynamicThreadPoolExecutor(min_workers=2, max_workers=10) as executor:
# Отправляем волны задач
for wave in range(3):
print(f"\nЗапуск волны {wave} задач")
futures = []

# Создаем много задач в каждой волне
task_count = 5 if wave == 0 else (15 if wave == 1 else 3)
for i in range(task_count):
futures.append(executor.submit(example_task, f"{wave}-{i}"))

# Ждем завершения всех задач в волне
for future in concurrent.futures.as_completed(futures):
result = future.result()
print(f"Получен результат: {result}")

# Пауза между волнами
time.sleep(5)

Пул потоков — это мощный инструмент для параллельного программирования в Python, который значительно упрощает реализацию многопоточных задач. ThreadPoolExecutor предоставляет элегантный интерфейс, который избавляет от необходимости вручную управлять жизненным циклом потоков. Правильное использование пула потоков позволяет создавать высокопроизводительные приложения, эффективно использующие доступные ресурсы системы. Освоив описанные стратегии и приемы, вы сможете писать устойчивый и эффективный многопоточный код, который останется понятным и поддерживаемым — редкое сочетание для параллельного программирования.

Загрузка...