Асинхронное программирование в Python: корутины и их применение
Для кого эта статья:
- Python-разработчики, желающие улучшить производительность своих приложений
- Студенты и специалисты, интересующиеся асинхронным программированием
Программисты, работающие с высоконагруженными системами и I/O-операциями
Корутины в Python — это не просто модный термин для разработчиков, а настоящий прорыв в эффективности программ. Представьте, что ваш код может одновременно обрабатывать сотни запросов, не утопая в бездне многопоточности и не замерзая в ожидании сетевых ответов. Если ваши приложения "задыхаются" при масштабировании или I/O-операции превращаются в узкие места, корутины могут стать вашим секретным оружием. Готовы узнать, как превратить линейный Python-код в асинхронный шедевр производительности? 🚀
Освоение корутин может стать поворотным моментом в вашей карьере Python-разработчика. На курсе Обучение Python-разработке от Skypro вы не только изучите теорию асинхронного программирования, но и создадите реальные высоконагруженные проекты под руководством действующих разработчиков. Наши студенты увеличивают производительность своих приложений в 3-5 раз после применения корутин в проектах. Инвестируйте в навыки, которые делают вас ценным специалистом!
Корутины в Python: основы асинхронного программирования
Корутины в Python — это функции, способные приостанавливать свое выполнение, возвращать управление вызывающему коду, а затем продолжать работу с того места, где остановились. По сути, это альтернативная модель выполнения кода, предоставляющая элегантное решение для задач, требующих конкурентного выполнения без сложностей многопоточности.
Вместо того чтобы блокировать выполнение программы на время ожидания завершения операции ввода-вывода (например, чтения из файла или сетевого запроса), корутины позволяют "переключиться" на другие задачи, значительно повышая эффективность использования ресурсов системы.
Алексей Дроздов, Python-архитектор высоконагруженных систем
Несколько лет назад я работал над сервисом, который должен был обрабатывать тысячи запросов к внешнему API ежеминутно. Начальная реализация с использованием синхронного кода справлялась лишь с сотней запросов, после чего сервер просто "ложился". Внедрение корутин изменило ситуацию кардинально — нам удалось достичь показателя в 5000+ запросов в минуту на том же железе без какой-либо многопоточности. Самое удивительное, что для перехода от синхронного кода к асинхронному потребовалось переписать всего около 20% кода, но эффект был колоссальным.
Ключевые концепции, которые необходимо понять для работы с корутинами:
- Асинхронность: выполнение задач без блокирования основного потока
- Event Loop: "сердце" асинхронного кода, отвечающее за координацию выполнения корутин
- Точки ожидания: места в коде, где корутина может приостановиться
- Футуры (Futures) и Задачи (Tasks): объекты для представления асинхронных операций
Прежде чем погрузиться в создание корутин, важно понимать отличие между конкурентностью и параллелизмом. Конкурентность — это структура программы, позволяющая выполнять несколько задач, перекрывая их по времени, что не обязательно означает выполнение их одновременно. Параллелизм же подразумевает фактически одновременное выполнение нескольких задач.
| Аспект | Традиционный синхронный подход | Асинхронный подход с корутинами |
|---|---|---|
| Модель выполнения | Последовательное выполнение операций | Конкурентное выполнение операций |
| Блокировка при I/O | Блокирует весь поток выполнения | Освобождает контроль во время ожидания |
| Масштабируемость | Ограничена, требуются дополнительные потоки/процессы | Высокая, тысячи корутин в одном потоке |
| Расход памяти | Высокий при многопоточности | Низкий, корутины легковесны |
| Сложность кода | Обычно проще для понимания | Требует понимания асинхронной парадигмы |
Корутины в Python тесно связаны с генераторами, но имеют существенные отличия. Фактически, корутины можно рассматривать как эволюцию генераторов, получивших специализированный синтаксис и функциональность для асинхронных операций.

Создание корутин с async def: синтаксис и применение
В современном Python (начиная с версии 3.5) корутины объявляются с помощью ключевого слова async def. Этот синтаксис четко отделяет асинхронные функции от обычных и указывает интерпретатору на особый режим выполнения.
Рассмотрим базовую структуру корутины:
async def my_coroutine():
# асинхронный код
print("Корутина выполняется")
await asyncio.sleep(1) # точка ожидания
print("Корутина продолжает выполнение после ожидания")
return "Результат выполнения корутины"
Ключевые элементы синтаксиса корутин:
- async def: объявление асинхронной функции
- await: оператор, указывающий на точку, где корутина может приостановиться
- asyncio.sleep(): асинхронная версия time.sleep(), не блокирующая выполнение других корутин
- return: возвращает результат выполнения корутины
Запуск корутины не происходит автоматически при её вызове. Вместо этого, вызов корутины создает объект корутины, который должен быть передан в цикл событий (event loop) для выполнения:
import asyncio
async def hello_world():
print("Hello")
await asyncio.sleep(1)
print("World")
return "Выполнено"
# Создаем и запускаем корутину
async def main():
result = await hello_world()
print(f"Результат: {result}")
# Запуск корутины через цикл событий
asyncio.run(main())
Функция asyncio.run() создает новый цикл событий, выполняет в нём указанную корутину и закрывает цикл после завершения. Это рекомендуемый способ запуска корутин в Python 3.7+.
Марина Ковалева, Lead Python-разработчик
В одном из проектов мы столкнулись с необходимостью параллельного скрапинга данных с тысяч веб-страниц. Первая версия использовала ThreadPoolExecutor с 20 рабочими потоками, но даже такое решение занимало около 3 часов на выполнение всего объема работы. После перехода на асинхронный подход с корутинами, мы смогли одновременно обрабатывать до 100 запросов, сократив время выполнения до 40 минут. Самое главное — код стал не только быстрее, но и чище:
PythonСкопировать код# До (с потоками) def fetch_page(url): response = requests.get(url) return parse_data(response.text) with ThreadPoolExecutor(max_workers=20) as executor: results = list(executor.map(fetch_page, urls)) # После (с корутинами) async def fetch_page(url): async with aiohttp.ClientSession() as session: async with session.get(url) as response: text = await response.text() return parse_data(text) tasks = [fetch_page(url) for url in urls] results = await asyncio.gather(*tasks)Асинхронный код позволяет гораздо лучше контролировать процесс и более эффективно использовать ресурсы.
Создание корутин не ограничивается простыми примерами. Вы можете использовать все возможности обычных функций, включая параметры, значения по умолчанию, аннотации типов и даже контекстные менеджеры:
async def process_data(data: list, retry_count: int = 3) -> dict:
results = {}
async with aiofiles.open(filename, mode='r') as f:
return await f.read()
async def fetch_user_data(user_id: int):
profile = await fetch_profile(user_id)
posts = await fetch_posts(user_id)
return {"profile": profile, "posts": posts}
Важно понимать, что объект корутины должен быть "развёрнут" с помощью await или передан в специальные функции модуля asyncio, такие как asyncio.create_task() или asyncio.gather(). В противном случае корутина не будет выполнена, и Python выдаст предупреждение.
Практическая работа с async/await в Python-программах
Теория — это хорошо, но только практика с реальными задачами позволяет полностью освоить концепцию корутин. Рассмотрим несколько практических сценариев использования async/await для решения типичных задач разработки.
Начнем с базового примера асинхронной обработки нескольких задач:
import asyncio
import time
async def count_up(name, delay):
"""Корутина, которая считает до 5 с указанной задержкой"""
print(f"Задача {name} начата")
for i in range(1, 6):
await asyncio.sleep(delay) # Асинхронная пауза
print(f"Задача {name}: {i}")
print(f"Задача {name} завершена")
return name
async def main():
start_time = time.time()
# Создаем и запускаем три корутины одновременно
results = await asyncio.gather(
count_up("A", 0.5),
count_up("B", 1),
count_up("C", 1.5)
)
end_time = time.time()
print(f"\nВсе задачи выполнены за {end_time – start_time:.2f} секунд")
print(f"Результаты: {results}")
asyncio.run(main())
В этом примере функция asyncio.gather() позволяет запустить несколько корутин одновременно и дождаться их завершения. Если бы мы выполняли эти задачи последовательно, общее время выполнения составило бы примерно 15 секунд (0.55 + 15 + 1.5*5). Однако с асинхронным выполнением время составит около 7.5 секунд (максимальная задержка 1.5 секунды, умноженная на 5 итераций).
Теперь рассмотрим более реалистичный пример: асинхронный HTTP-клиент для одновременного получения данных с нескольких URL:
import asyncio
import aiohttp
import time
async def fetch_url(session, url):
"""Получение содержимого URL"""
start = time.time()
async with session.get(url) as response:
content = await response.text()
elapsed = time.time() – start
print(f"{url} – {len(content)} байт получено за {elapsed:.2f} сек")
return len(content)
async def fetch_all(urls):
"""Получение данных со всех URL одновременно"""
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
async def main():
urls = [
"https://python.org",
"https://github.com",
"https://stackoverflow.com",
"https://news.ycombinator.com",
"https://www.wikipedia.org"
]
start_time = time.time()
total_bytes = sum(await fetch_all(urls))
elapsed = time.time() – start_time
print(f"\nПолучено {total_bytes} байт за {elapsed:.2f} секунд")
asyncio.run(main())
Благодаря асинхронному подходу, все HTTP-запросы выполняются параллельно, и общее время выполнения определяется самым медленным запросом, а не суммой времени всех запросов.
Важно учитывать несколько практических аспектов при работе с async/await:
| Рекомендация | Практический совет | Пример кода |
|---|---|---|
| Используйте асинхронные версии библиотек | Вместо requests используйте aiohttp; вместо pymongo — motor | async with aiohttp.ClientSession() as session: |
| Избегайте блокирующих операций | Не используйте time.sleep(), file.read() в корутинах | await asyncio.sleep(1) # Вместо time.sleep(1) |
| Правильно обрабатывайте исключения | Используйте try/except для каждой асинхронной задачи | try: await coro() except Exception: pass |
| Используйте asyncio.gather() с return_exceptions=True | Позволяет получить все результаты, даже если часть задач завершилась с ошибкой | results = await asyncio.gather(*tasks, return_exceptions=True) |
| Правильно отменяйте задачи | Всегда проверяйте статус задачи перед использованием результата | task.cancel(); await asyncio.sleep(0) |
Одна из наиболее распространенных ошибок при работе с async/await — блокирование цикла событий. Вот пример, как не следует писать асинхронный код:
import asyncio
import time
async def bad_example():
print("Начало выполнения")
# Блокирующая операция в корутине! ❌
time.sleep(2) # Это блокирует весь цикл событий
print("После паузы")
return "Готово"
async def good_example():
print("Начало выполнения")
# Правильно: использование асинхронной версии sleep ✅
await asyncio.sleep(2) # Позволяет другим корутинам выполняться
print("После паузы")
return "Готово"
Разница между этими двумя подходами становится очевидной, когда у вас есть несколько корутин, работающих одновременно: в первом случае все они будут блокироваться на время выполнения time.sleep(), во втором — продолжат выполнение по очереди. 🔄
Управление потоком выполнения асинхронных функций
Эффективное управление потоком выполнения — ключевой аспект работы с корутинами. В этом разделе мы рассмотрим инструменты и техники, которые помогут вам контролировать выполнение асинхронных задач, управлять их жизненным циклом и обрабатывать ошибки.
Начнем с создания и управления задачами (Tasks). Task — это объект, представляющий корутину в цикле событий. Создание задачи позволяет запустить корутину "в фоне", не дожидаясь её завершения немедленно:
import asyncio
async def background_task(name):
print(f"Задача {name} запущена")
for i in range(5):
print(f"Задача {name}: шаг {i}")
await asyncio.sleep(1)
print(f"Задача {name} завершена")
return f"Результат {name}"
async def main():
# Создание задачи — корутина начнёт выполняться немедленно
task1 = asyncio.create_task(background_task("A"))
# Выполняем другие действия, пока задача работает в фоне
print("Выполняем другие действия...")
await asyncio.sleep(2)
# Проверяем, завершилась ли задача
if not task1.done():
print("Задача A всё ещё выполняется")
# Ожидаем завершения задачи и получаем результат
result = await task1
print(f"Полученный результат: {result}")
asyncio.run(main())
Для более сложных сценариев управления задачами можно использовать следующие методы и функции:
- asyncio.wait_for() — ожидание корутины с таймаутом
- asyncio.wait() — ожидание завершения нескольких задач
- asyncio.as_completed() — итерация по задачам по мере их завершения
- task.cancel() — отмена выполнения задачи
- asyncio.shield() — защита корутины от отмены
Рассмотрим пример с таймаутом и отменой задач:
import asyncio
async def slow_operation():
print("Начало медленной операции")
await asyncio.sleep(5) # Имитация длительной операции
print("Конец медленной операции")
return "Готово!"
async def main():
try:
# Ждем выполнения корутины максимум 2 секунды
result = await asyncio.wait_for(slow_operation(), timeout=2)
print(f"Результат: {result}")
except asyncio.TimeoutError:
print("Операция заняла слишком много времени, прервана!")
# Альтернативный подход: создать и отменить задачу
task = asyncio.create_task(slow_operation())
await asyncio.sleep(2) # Подождем немного
# Отмена задачи
task.cancel()
try:
await task # Попытка дождаться отменённой задачи
except asyncio.CancelledError:
print("Задача была отменена")
asyncio.run(main())
Для обработки нескольких задач одновременно, особенно когда нужно получить результаты по мере их готовности, используйте asyncio.as_completed():
import asyncio
import random
async def random_sleep(id):
sleep_time = random.uniform(0.5, 3)
print(f"Задача {id} будет выполняться {sleep_time:.2f} сек.")
await asyncio.sleep(sleep_time)
return f"Результат {id} после {sleep_time:.2f} сек."
async def main():
# Создаем несколько задач с разным временем выполнения
tasks = [random_sleep(i) for i in range(5)]
# Обрабатываем результаты по мере их готовности
for future in asyncio.as_completed(tasks):
result = await future
print(f"Получен: {result}")
asyncio.run(main())
Эффективное управление потоком асинхронного выполнения также включает обработку ошибок. В асинхронном коде необработанные исключения могут привести к "потерянным" задачам и сложно отслеживаемым ошибкам. Рассмотрим основные подходы к обработке исключений в корутинах:
import asyncio
async def risky_operation(id):
print(f"Операция {id} начата")
await asyncio.sleep(1)
if id % 2 == 0:
raise ValueError(f"Ошибка в операции {id}")
return f"Успех операции {id}"
async def main():
# Подход 1: try/except в каждой корутине
tasks1 = []
for i in range(5):
try:
result = await risky_operation(i)
print(result)
except Exception as e:
print(f"Поймано исключение: {e}")
# Подход 2: gather с return_exceptions=True
print("\nПодход с gather:")
results = await asyncio.gather(
*[risky_operation(i) for i in range(5)],
return_exceptions=True
)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Операция {i} завершилась ошибкой: {result}")
else:
print(f"Операция {i}: {result}")
asyncio.run(main())
Контроль конкурентности — еще один важный аспект управления потоком выполнения. В некоторых случаях вам может потребоваться ограничить количество одновременно выполняемых корутин, чтобы не перегрузить систему или внешние ресурсы. Для этого можно использовать семафоры:
import asyncio
import aiohttp
async def fetch_with_semaphore(url, semaphore):
async with semaphore: # Ограничиваем количество одновременных запросов
print(f"Запрос к {url}")
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
async def main():
# Создаем семафор, ограничивающий выполнение до 3 корутин одновременно
semaphore = asyncio.Semaphore(3)
urls = [
"https://python.org",
"https://github.com",
"https://stackoverflow.com",
"https://news.ycombinator.com",
"https://www.wikipedia.org",
"https://reddit.com",
"https://dev.to",
"https://medium.com"
]
# Создаем задачи с семафором
tasks = [fetch_with_semaphore(url, semaphore) for url in urls]
results = await asyncio.gather(*tasks)
print(f"Получено {len(results)} ответов")
asyncio.run(main())
Оптимизация I/O-операций с помощью корутин Python
Операции ввода-вывода (I/O) часто становятся узким местом в производительности приложений. Будь то чтение/запись файлов, сетевые запросы или взаимодействие с базами данных — эти операции обычно на порядки медленнее, чем вычисления в памяти. Асинхронное программирование с использованием корутин может радикально улучшить эффективность таких операций. 🚀
Рассмотрим основные сценарии оптимизации I/O-операций с помощью корутин:
- Асинхронная работа с HTTP-запросами
- Оптимизация файловых операций
- Асинхронное взаимодействие с базами данных
- Комбинированные I/O-операции
Начнем с оптимизации HTTP-запросов, так как это одно из наиболее распространенных применений асинхронности. Сравним синхронный и асинхронный подходы:
# Синхронный подход с requests
import requests
import time
def fetch_urls_sync(urls):
start_time = time.time()
results = []
for url in urls:
response = requests.get(url)
results.append(response.text)
elapsed = time.time() – start_time
return results, elapsed
# Асинхронный подход с aiohttp
import asyncio
import aiohttp
async def fetch_urls_async(urls):
start_time = time.time()
async with aiohttp.ClientSession() as session:
tasks = []
for url in urls:
tasks.append(fetch_url(session, url))
results = await asyncio.gather(*tasks)
elapsed = time.time() – start_time
return results, elapsed
async def fetch_url(session, url):
async with session.get(url) as response:
return await response.text()
# Сравнение производительности
urls = ["https://example.com" for _ in range(10)]
# Синхронный вариант
sync_results, sync_time = fetch_urls_sync(urls)
print(f"Синхронно: {sync_time:.2f} сек.")
# Асинхронный вариант
async def run_comparison():
async_results, async_time = await fetch_urls_async(urls)
print(f"Асинхронно: {async_time:.2f} сек.")
print(f"Ускорение: {sync_time/async_time:.2f}x")
asyncio.run(run_comparison())
При увеличении количества запросов разница становится еще более существенной. В реальных сценариях асинхронный подход может быть в 5-20 раз быстрее синхронного.
Теперь рассмотрим оптимизацию файловых операций. В Python 3.7+ появилась библиотека aiofiles, которая предоставляет асинхронные версии стандартных файловых операций:
import asyncio
import aiofiles
import os
import time
async def read_file_async(filename):
async with aiofiles.open(filename, mode='r') as f:
return await f.read()
async def write_file_async(filename, content):
async with aiofiles.open(filename, mode='w') as f:
await f.write(content)
async def process_files_async(filenames):
start_time = time.time()
# Создаем задачи для чтения файлов
read_tasks = [read_file_async(filename) for filename in filenames]
contents = await asyncio.gather(*read_tasks)
# Обрабатываем содержимое (например, преобразуем в верхний регистр)
processed = [content.upper() for content in contents]
# Записываем результаты в новые файлы
write_tasks = [
write_file_async(f"processed_{os.path.basename(filename)}", processed_content)
for filename, processed_content in zip(filenames, processed)
]
await asyncio.gather(*write_tasks)
elapsed = time.time() – start_time
return elapsed
# Тестирование на нескольких файлах
async def main():
# Создаем тестовые файлы
filenames = [f"test_file_{i}.txt" for i in range(100)]
for filename in filenames:
with open(filename, 'w') as f:
f.write(f"Тестовое содержимое файла {filename}\n" * 1000)
elapsed = await process_files_async(filenames)
print(f"Асинхронная обработка 100 файлов: {elapsed:.2f} сек.")
# Удаляем тестовые файлы
for filename in filenames:
os.remove(filename)
if os.path.exists(f"processed_{filename}"):
os.remove(f"processed_{filename}")
asyncio.run(main())
Для работы с базами данных также существуют асинхронные библиотеки. Например, asyncpg для PostgreSQL, aiomysql для MySQL и motor для MongoDB. Вот пример использования asyncpg:
import asyncio
import asyncpg
async def fetch_users(min_age):
# Подключаемся к базе данных
conn = await asyncpg.connect(
user='postgres',
password='password',
database='testdb',
host='localhost'
)
# Выполняем запрос
query = "SELECT id, name, age FROM users WHERE age > $1 ORDER BY age"
rows = await conn.fetch(query, min_age)
# Закрываем соединение
await conn.close()
return rows
async def main():
# Получаем пользователей старше 30 лет
users = await fetch_users(30)
# Выводим результаты
for user in users:
print(f"ID: {user['id']}, Имя: {user['name']}, Возраст: {user['age']}")
asyncio.run(main())
Настоящая мощь корутин проявляется при комбинировании различных I/O-операций. Например, вы можете одновременно читать данные из базы, отправлять HTTP-запросы и записывать результаты в файлы:
| Тип операции | Синхронная библиотека | Асинхронная альтернатива | Типичное ускорение |
|---|---|---|---|
| HTTP-запросы | requests | aiohttp, httpx | 5-20x |
| Файловые операции | open(), os | aiofiles | 2-5x |
| PostgreSQL | psycopg2 | asyncpg | 3-10x |
| MongoDB | pymongo | motor | 2-8x |
| Redis | redis-py | aioredis | 4-12x |
| WebSocket | websocket-client | websockets | 3-15x |
Рассмотрим практический пример комплексной оптимизации, который включает несколько типов I/O-операций:
import asyncio
import aiohttp
import aiofiles
import asyncpg
import json
from datetime import datetime
async def fetch_api_data(session, user_id):
"""Получение данных из внешнего API"""
url = f"https://api.example.com/users/{user_id}"
async with session.get(url) as response:
return await response.json()
async def save_to_db(pool, user_data):
"""Сохранение данных в базу"""
query = """
INSERT INTO user_activities (user_id, activity, timestamp)
VALUES ($1, $2, $3)
RETURNING id
"""
return await pool.execute(
query,
user_data['id'],
user_data['activity'],
datetime.now()
)
async def log_to_file(filename, message):
"""Запись логов в файл"""
async with aiofiles.open(filename, 'a') as f:
await f.write(f"{datetime.now()} – {message}\n")
async def process_user(session, pool, user_id):
"""Комплексная обработка данных пользователя"""
try:
# Получаем данные из API
user_data = await fetch_api_data(session, user_id)
# Сохраняем в базу данных
db_result = await save_to_db(pool, user_data)
# Логируем результат
await log_to_file(
'processing.log',
f"Пользователь {user_id} обработан, DB ID: {db_result}"
)
return user_id, True
except Exception as e:
# Логируем ошибку
await log_to_file('errors.log', f"Ошибка обработки {user_id}: {str(e)}")
return user_id, False
async def main():
# Список ID пользователей для обработки
user_ids = list(range(1, 1001))
# Устанавливаем соединение с базой данных
pool = await asyncpg.create_pool(
user='postgres',
password='password',
database='userdb',
host='localhost'
)
async with aiohttp.ClientSession() as session:
# Ограничиваем количество одновременных задач до 50
semaphore = asyncio.Semaphore(50)
async def bounded_process(user_id):
async with semaphore:
return await process_user(session, pool, user_id)
# Запускаем все задачи
tasks = [bounded_process(user_id) for user_id in user_ids]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Закрываем пул соединений с БД
await pool.close()
# Анализируем результаты
successes = sum(1 for r in results if isinstance(r, tuple) and r[1])
errors = len(results) – successes
print(f"Обработано {len(results)} пользователей")
print(f"Успешно: {successes}, Ошибок: {errors}")
if __name__ == "__main__":
asyncio.run(main())
Данный пример демонстрирует комплексный подход к оптимизации I/O-операций, включая управление конкурентностью с помощью семафора, обработку ошибок и координацию разных типов асинхронных операций.
При оптимизации I/O-операций с помощью корутин помните о следующих принципах:
- Используйте пулы соединений для баз данных вместо открытия отдельных соединений для каждой операции
- Контролируйте конкурентность с помощью семафоров, особенно при работе с внешними сервисами
- Группируйте подобные операции для лучшей производительности (например, все операции чтения БД, затем все HTTP-запросы)
- Измеряйте производительность для поиска узких мест и оптимизации наиболее проблемных операций
- Правильно обрабатывайте ошибки, особенно при параллельном выполнении множества операций
Корутины в Python — это мощный инструмент, значительно упрощающий асинхронное программирование и позволяющий писать высокопроизводительный код без сложностей многопоточности. Они особенно эффективны при работе с I/O-операциями, позволяя вашим приложениям обрабатывать тысячи конкурентных задач в одном потоке. Хотя освоение асинхронного программирования требует определенного сдвига мышления, результаты стоят затраченных усилий. Начните с малого, постепенно интегрируя корутины в нужные части вашего кода, и вскоре вы заметите, как приложения становятся быстрее, отзывчивее и эффективнее используют ресурсы системы.