События в Python: от базовых концепций до продвинутых паттернов
Для кого эта статья:
- начинающие и опытные разработчики на Python
- специалисты по разработке интерактивных приложений и графических интерфейсов
студенты, желающие улучшить свои навыки программирования и понять работу с событиями в Python
События в Python — не просто абстрактное понятие, а мощный механизм, с которым сталкивается каждый разработчик, создающий интерактивные приложения. Неважно, разрабатываете ли вы графический интерфейс, где необходимо реагировать на клики мыши, или создаёте асинхронный сервер, который должен обрабатывать множество запросов одновременно — понимание событийной модели становится ключом к созданию эффективного, отзывчивого и хорошо структурированного кода. Эта статья проведёт вас через все этапы работы с событиями в Python: от базовых концепций до продвинутых паттернов проектирования. 🐍
Хотите не просто читать о событиях в Python, а применять эти знания на практике, создавая реальные проекты под руководством экспертов? Обучение Python-разработке от Skypro — это погружение в реальные задачи, где вы научитесь создавать отзывчивые приложения с использованием событийных моделей под наблюдением опытных менторов. От GUI-приложений до асинхронных серверов — все инструменты событийно-ориентированного программирования станут частью вашего профессионального арсенала!
Основы событийно-ориентированного программирования в Python
Событийно-ориентированное программирование (Event-Driven Programming) — это парадигма, в которой выполнение программы определяется событиями: действиями пользователя, сообщениями от других программ, сигналами датчиков и т.д. В отличие от последовательного выполнения, где код выполняется сверху вниз, в событийной модели программа «ожидает» наступления определённых событий и реагирует на них. 🔄
Основные компоненты событийной модели в Python:
- События (Events) — сигналы о том, что что-то произошло (клик мыши, получение данных)
- Обработчики событий (Event handlers) — функции, которые выполняются при наступлении события
- Цикл обработки событий (Event loop) — бесконечный цикл, отслеживающий события и вызывающий обработчики
- Источники событий (Event sources) — объекты, генерирующие события
Python не имеет встроенного универсального механизма для работы с событиями, но предоставляет несколько способов их реализации:
| Подход | Описание | Типичное применение |
|---|---|---|
| Функции обратного вызова (Callbacks) | Функции, передаваемые как аргументы, которые вызываются при определённых условиях | Простые обработчики событий, асинхронные функции |
| Паттерн Observer | Объекты-наблюдатели подписываются на изменения объекта-субъекта | Системы с множеством взаимосвязанных компонентов |
| Библиотеки GUI | Специализированные библиотеки с встроенной событийной моделью | Приложения с графическим интерфейсом (PyQt, Tkinter) |
| Asyncio | Асинхронное программирование с событийным циклом | Сетевые приложения, обработка I/O-операций |
Простейшая реализация событийной модели в Python может выглядеть так:
class EventEmitter:
def __init__(self):
self.callbacks = {}
def on(self, event_name, callback):
"""Подписка на событие"""
if event_name not in self.callbacks:
self.callbacks[event_name] = []
self.callbacks[event_name].append(callback)
def emit(self, event_name, *args, **kwargs):
"""Генерация события"""
if event_name in self.callbacks:
for callback in self.callbacks[event_name]:
callback(*args, **kwargs)
# Использование
emitter = EventEmitter()
def message_handler(message):
print(f"Получено сообщение: {message}")
emitter.on('message', message_handler)
emitter.emit('message', "Привет, мир!") # Вывод: Получено сообщение: Привет, мир!
Этот простой пример демонстрирует основную механику событийной модели: регистрация обработчиков событий и их вызов при наступлении соответствующих событий. В реальных проектах обычно используются более сложные и оптимизированные реализации этой концепции.
Александр, Python-разработчик старший
Когда я только начинал работать с событийным программированием в Python, я часто путался в том, как структурировать код. Один из моих первых проектов представлял собой мониторинг системы, который должен был реагировать на различные события: высокую загрузку CPU, низкое пространство на диске и сетевые сбои.
Изначально я создал монолитный скрипт, проверяющий все параметры в бесконечном цикле. Это приводило к проблемам: код был нечитаемым, трудно расширяемым, и некоторые проверки блокировали выполнение других.
Переход к событийной модели кардинально изменил ситуацию. Я создал центральный EventEmitter, где каждый мониторинговый модуль генерировал собственные события:
# Модуль мониторинга CPU
def monitor_cpu():
while True:
usage = get_cpu_usage()
if usage > 90:
event_emitter.emit('high_cpu', usage)
time.sleep(5)
# Обработчик события
def handle_high_cpu(usage):
notify_admin(f"Критическая загрузка CPU: {usage}%")
restart_some_services()
event_emitter.on('high_cpu', handle_high_cpu)
Каждый модуль работал независимо, код стал модульным, а добавление новых типов мониторинга превратилось в тривиальную задачу. Такая архитектура также позволила легко добавлять разные реакции на одно и то же событие без изменения исходного кода мониторов.

Работа с событиями в GUI-библиотеках PyQt и Tkinter
GUI-приложения — классический пример событийно-ориентированного программирования. Пользователь кликает кнопку, вводит текст, перетаскивает элемент — все эти действия генерируют события, на которые приложение должно реагировать. Рассмотрим две популярные библиотеки для создания графических интерфейсов в Python: PyQt и Tkinter. 🖥️
PyQt: сигналы и слоты
PyQt использует концепцию сигналов и слотов для обработки событий. Сигналы — это события, генерируемые виджетами, а слоты — функции, которые вызываются при генерации сигналов.
import sys
from PyQt5.QtWidgets import QApplication, QPushButton, QVBoxLayout, QWidget
class MyApp(QWidget):
def __init__(self):
super().__init__()
self.initUI()
def initUI(self):
layout = QVBoxLayout()
self.button = QPushButton('Нажми меня', self)
self.button.clicked.connect(self.on_button_click) # Подключаем сигнал к слоту
layout.addWidget(self.button)
self.setLayout(layout)
self.setWindowTitle('PyQt События')
self.setGeometry(300, 300, 300, 200)
def on_button_click(self):
print('Кнопка была нажата!')
if __name__ == '__main__':
app = QApplication(sys.argv)
ex = MyApp()
ex.show()
sys.exit(app.exec_())
В этом примере сигнал clicked кнопки соединяется со слотом on_button_click с помощью метода connect. PyQt также позволяет создавать собственные сигналы:
from PyQt5.QtCore import pyqtSignal
class Counter(QObject):
# Определяем собственный сигнал
value_changed = pyqtSignal(int)
def __init__(self):
super().__init__()
self._value = 0
def increment(self):
self._value += 1
# Излучаем сигнал с новым значением
self.value_changed.emit(self._value)
Tkinter: система привязки событий
Tkinter использует другой подход к обработке событий — систему привязки (binding). Вы привязываете функцию-обработчик к определённому событию виджета.
import tkinter as tk
def on_button_click():
print("Кнопка была нажата!")
root = tk.Tk()
root.title("Tkinter События")
root.geometry("300x200")
button = tk.Button(root, text="Нажми меня")
button.pack(pady=50)
# Привязываем функцию к событию нажатия левой кнопки мыши
button.bind("<Button-1>", lambda event: on_button_click())
# Альтернативный способ
button.config(command=on_button_click)
root.mainloop()
Tkinter предлагает множество типов событий, которые можно отслеживать:
| Событие | Описание | Пример привязки |
|---|---|---|
<Button-1> | Нажатие левой кнопки мыши | widget.bind("<Button-1>", handler) |
<ButtonRelease-1> | Отпускание левой кнопки мыши | widget.bind("<ButtonRelease-1>", handler) |
<Motion> | Перемещение курсора мыши | widget.bind("<Motion>", handler) |
<KeyPress> | Нажатие клавиши на клавиатуре | widget.bind("<KeyPress>", handler) |
<Return> | Нажатие клавиши Enter | widget.bind("<Return>", handler) |
<Configure> | Изменение размера или положения виджета | widget.bind("<Configure>", handler) |
Сравнение подходов PyQt и Tkinter
Каждая библиотека имеет свои особенности при работе с событиями:
- PyQt: использует более структурированный подход с сигналами и слотами, что делает код более читаемым при сложной логике обработки событий
- Tkinter: предлагает более простую систему привязки, что может быть удобно для небольших приложений, но менее масштабируемо
- Типизация: PyQt сигналы могут определять типы передаваемых данных, что помогает избегать ошибок
- Доступность: Tkinter входит в стандартную библиотеку Python, PyQt требует отдельной установки
Выбор библиотеки зависит от конкретных требований проекта и личных предпочтений разработчика. Для небольших приложений Tkinter может быть достаточно, в то время как PyQt предлагает более мощный инструментарий для сложных интерфейсов.
Асинхронные события в Python: библиотека asyncio
Асинхронное программирование — ещё одна область, где события играют ключевую роль. Библиотека asyncio в Python предоставляет инфраструктуру для написания однопоточного конкурентного кода с использованием синтаксиса async/await. В центре этой библиотеки находится событийный цикл (event loop), который управляет выполнением асинхронных задач и обработкой I/O-событий. ⏱️
Основные компоненты работы с событиями в asyncio:
- Event Loop (Цикл событий) — центральный исполнительный механизм, который управляет и распределяет выполнение различных задач
- Coroutines (Корутины) — функции, которые могут приостанавливать своё выполнение, позволяя циклу событий выполнять другие задачи
- Futures и Tasks — объекты, представляющие результат выполнения операции, которая ещё не завершена
- Event Callbacks — функции, вызываемые при наступлении определённых событий (таймауты, I/O-готовность)
Пример использования asyncio для обработки событий:
import asyncio
async def handle_client(reader, writer):
"""Обработчик подключения клиента"""
addr = writer.get_extra_info('peername')
print(f"Подключен клиент с адреса {addr}")
while True:
# Ожидаем события получения данных
data = await reader.read(100)
if not data:
break
message = data.decode()
print(f"Получено сообщение от {addr}: {message}")
# Отправляем ответ
response = f"Эхо: {message}"
writer.write(response.encode())
await writer.drain()
print(f"Клиент {addr} отключился")
writer.close()
async def main():
# Создаём сервер, который будет принимать подключения
server = await asyncio.start_server(
handle_client, '127.0.0.1', 8888)
addr = server.sockets[0].getsockname()
print(f"Сервер запущен на {addr}")
# Сервер работает до вызова server.close()
async with server:
await server.serve_forever()
if __name__ == "__main__":
asyncio.run(main())
В этом примере мы создаём асинхронный эхо-сервер. Функция handle_client вызывается каждый раз, когда происходит событие подключения нового клиента. Внутри этой функции мы асинхронно ожидаем события получения данных от клиента с помощью await reader.read().
Ключевые аспекты обработки событий в asyncio:
- Неблокирующие операции: При вызове
awaitфункция приостанавливает своё выполнение, позволяя циклу событий обрабатывать другие задачи. - Событийный цикл: Метод
asyncio.run()создаёт новый цикл событий и запускает асинхронную функцию. - Автоматическая диспетчеризация: Когда I/O-операция готова (например, получены данные), asyncio автоматически возобновляет выполнение соответствующей корутины.
Asyncio также предоставляет инструменты для работы с различными типами событий:
import asyncio
import time
async def delayed_message(delay, message):
"""Выводит сообщение после задержки"""
await asyncio.sleep(delay)
print(f"{time.strftime('%H:%M:%S')}: {message}")
async def periodic_task(interval):
"""Выполняет задачу периодически"""
count = 0
while True:
count += 1
print(f"{time.strftime('%H:%M:%S')}: Периодическое событие #{count}")
await asyncio.sleep(interval)
async def wait_for_external_event(future):
"""Ожидает внешнего события (заполнения future)"""
print(f"{time.strftime('%H:%M:%S')}: Ожидание события...")
result = await future
print(f"{time.strftime('%H:%M:%S')}: Событие произошло! Результат: {result}")
async def simulate_external_event(future, delay):
"""Имитирует внешнее событие, устанавливая результат future"""
await asyncio.sleep(delay)
future.set_result("Данные от внешнего источника")
async def main():
# Создаём future для имитации внешнего события
event_future = asyncio.Future()
# Планируем различные задачи
tasks = [
asyncio.create_task(delayed_message(2, "Сообщение с задержкой")),
asyncio.create_task(periodic_task(3)),
asyncio.create_task(wait_for_external_event(event_future)),
asyncio.create_task(simulate_external_event(event_future, 5))
]
# Запускаем все задачи и завершаемся через 10 секунд
await asyncio.sleep(10)
# Отменяем все задачи при выходе
for task in tasks:
task.cancel()
if __name__ == "__main__":
asyncio.run(main())
В этом примере мы демонстрируем различные типы асинхронных событий:
- Отложенные события (delayed_message)
- Периодические события (periodic_task)
- Ожидание внешних событий (waitforexternal_event)
- Генерация событий (simulateexternalevent)
Asyncio особенно полезен для приложений, которые должны обрабатывать множество параллельных операций ввода-вывода, таких как:
- Веб-серверы и клиенты
- Микросервисные архитектуры
- Обработка сетевого трафика
- Системы реального времени
- Многопользовательские игры и чаты
Создание пользовательских событий с паттерном Observer
Паттерн Observer (Наблюдатель) — один из фундаментальных паттернов проектирования, который позволяет одним объектам (наблюдателям) следить за изменениями состояния других объектов (субъектов) и автоматически получать уведомления. Этот паттерн идеально подходит для реализации событийной модели в Python. 👀
Основные компоненты паттерна Observer:
- Subject (Субъект) — объект, содержащий состояние и управляющий списком наблюдателей
- Observer (Наблюдатель) — интерфейс для классов, которые должны быть уведомлены об изменениях в субъекте
- ConcreteSubject — конкретная реализация субъекта, который генерирует события
- ConcreteObserver — конкретная реализация наблюдателя, который реагирует на события
Реализация паттерна Observer в Python:
from abc import ABC, abstractmethod
from typing import List, Any
# Интерфейс Observer
class Observer(ABC):
@abstractmethod
def update(self, subject: 'Subject', *args, **kwargs) -> None:
"""Метод, вызываемый при обновлении субъекта"""
pass
# Класс Subject
class Subject:
def __init__(self):
self._observers: List[Observer] = []
def attach(self, observer: Observer) -> None:
"""Подключает наблюдателя к субъекту"""
if observer not in self._observers:
self._observers.append(observer)
def detach(self, observer: Observer) -> None:
"""Отключает наблюдателя от субъекта"""
try:
self._observers.remove(observer)
except ValueError:
pass
def notify(self, *args, **kwargs) -> None:
"""Уведомляет всех наблюдателей о событии"""
for observer in self._observers:
observer.update(self, *args, **kwargs)
# Конкретный субъект – текстовый редактор
class TextEditor(Subject):
def __init__(self):
super().__init__()
self._text = ""
@property
def text(self) -> str:
return self._text
@text.setter
def text(self, value: str) -> None:
self._text = value
# Уведомляем наблюдателей о событии изменения текста
self.notify(event_type="text_changed", new_text=value)
def save(self, filename: str) -> None:
with open(filename, 'w') as file:
file.write(self._text)
# Уведомляем наблюдателей о событии сохранения
self.notify(event_type="saved", filename=filename)
# Конкретные наблюдатели
class LoggingObserver(Observer):
def update(self, subject: Subject, *args, **kwargs) -> None:
event_type = kwargs.get("event_type", "unknown")
if event_type == "text_changed":
print(f"[LOG] Текст был изменен на: {kwargs.get('new_text')}")
elif event_type == "saved":
print(f"[LOG] Текст сохранен в файл: {kwargs.get('filename')}")
class SpellCheckObserver(Observer):
def update(self, subject: Subject, *args, **kwargs) -> None:
event_type = kwargs.get("event_type", "unknown")
if event_type == "text_changed":
text = kwargs.get('new_text', "")
# Простая проверка орфографии
if "нету" in text:
print("[SPELL] Ошибка: используйте 'нет' вместо 'нету'")
# Использование
editor = TextEditor()
logger = LoggingObserver()
spell_checker = SpellCheckObserver()
editor.attach(logger)
editor.attach(spell_checker)
editor.text = "Привет, мир!" # Вызовет событие text_changed
editor.text = "Тут нету ошибок" # Вызовет событие text_changed и сообщение от проверки орфографии
editor.save("document.txt") # Вызовет событие saved
В этом примере мы реализовали простой текстовый редактор, который уведомляет своих наблюдателей о событиях изменения текста и сохранения файла. Наблюдатели (логгер и проверка орфографии) подписываются на эти события и реагируют соответствующим образом.
Преимущества паттерна Observer для обработки событий:
- Слабая связанность — субъект не знает конкретных классов наблюдателей, что упрощает расширение системы
- Расширяемость — легко добавлять новые типы наблюдателей без изменения субъекта
- Многократное использование — один наблюдатель может следить за несколькими субъектами
- Гибкость — наблюдатели могут быть добавлены или удалены динамически во время выполнения
Мария, Lead Developer
В одном из проектов мы разрабатывали систему мониторинга для финансовой платформы. Ключевым требованием было отслеживание различных типов операций и оповещение соответствующих модулей: логирование, оповещение клиента, обновление балансов и т.д.
Первоначальная архитектура выглядела примитивно — каждая транзакция явно вызывала методы всех зависимых модулей:
PythonСкопировать кодdef process_transaction(transaction): # Обработка транзакции result = transaction.execute() # Явные вызовы всех зависимых модулей log_transaction(transaction, result) update_balance(transaction.account, result.new_balance) send_notification(transaction.user, transaction.amount) update_analytics(transaction.type, transaction.amount) # ... и так далееПроблемы начались, когда потребовалось добавить новые типы реакций на транзакции. Каждое добавление требовало модификации всех точек, где вызывалась функция
process_transaction, нарушая принцип открытости/закрытости.Внедрение паттерна Observer полностью изменило архитектуру:
PythonСкопировать кодclass TransactionSubject(Subject): def process(self, transaction): # Обработка транзакции result = transaction.execute() # Единственная точка уведомления self.notify( transaction=transaction, result=result, event_type="transaction_completed" ) # Наблюдатели подписываются независимо transaction_processor = TransactionSubject() transaction_processor.attach(LoggingObserver()) transaction_processor.attach(BalanceUpdater()) transaction_processor.attach(NotificationSender()) transaction_processor.attach(AnalyticsCollector())Теперь добавление новых реакций на транзакции (например, проверка на мошенничество) не требовало изменения существующего кода — достаточно было создать нового наблюдателя и подписать его на событие. Когда через несколько месяцев появились новые требования к системе мониторинга, мы смогли внедрить их без единого изменения в ядре системы.
Практические примеры обработки событий в реальных проектах
Теперь, когда мы рассмотрели различные подходы к обработке событий в Python, давайте посмотрим, как эти концепции применяются в реальных проектах. Эти примеры демонстрируют, как событийная модель может упростить разработку сложных систем. 🚀
Пример 1: Система мониторинга серверов
Представьте, что вам нужно разработать систему мониторинга, которая следит за различными метриками серверов (CPU, память, диск) и реагирует на критические состояния:
import time
import threading
from abc import ABC, abstractmethod
from typing import Dict, List
# Базовый класс для событий
class Event:
def __init__(self, event_type: str, data: Dict):
self.timestamp = time.time()
self.type = event_type
self.data = data
# Интерфейс слушателя событий
class EventListener(ABC):
@abstractmethod
def handle_event(self, event: Event) -> None:
pass
# Диспетчер событий
class EventDispatcher:
def __init__(self):
self._listeners: Dict[str, List[EventListener]] = {}
def add_listener(self, event_type: str, listener: EventListener) -> None:
if event_type not in self._listeners:
self._listeners[event_type] = []
self._listeners[event_type].append(listener)
def remove_listener(self, event_type: str, listener: EventListener) -> None:
if event_type in self._listeners and listener in self._listeners[event_type]:
self._listeners[event_type].remove(listener)
def dispatch(self, event: Event) -> None:
if event.type in self._listeners:
for listener in self._listeners[event.type]:
listener.handle_event(event)
# Система мониторинга
class ServerMonitor:
def __init__(self, event_dispatcher: EventDispatcher):
self.dispatcher = event_dispatcher
self.servers = {}
self.running = False
self._monitor_thread = None
def add_server(self, server_id: str, server_info: Dict) -> None:
self.servers[server_id] = server_info
def start_monitoring(self) -> None:
if self.running:
return
self.running = True
self._monitor_thread = threading.Thread(target=self._monitoring_loop)
self._monitor_thread.daemon = True
self._monitor_thread.start()
def stop_monitoring(self) -> None:
self.running = False
if self._monitor_thread:
self._monitor_thread.join(timeout=1.0)
def _monitoring_loop(self) -> None:
while self.running:
for server_id, server_info in self.servers.items():
# Имитация проверки метрик
cpu_usage = self._get_cpu_usage(server_id)
memory_usage = self._get_memory_usage(server_id)
disk_usage = self._get_disk_usage(server_id)
# Проверка критических условий и генерация событий
if cpu_usage > 90:
self.dispatcher.dispatch(Event("high_cpu", {
"server_id": server_id,
"cpu_usage": cpu_usage
}))
if memory_usage > 85:
self.dispatcher.dispatch(Event("high_memory", {
"server_id": server_id,
"memory_usage": memory_usage
}))
if disk_usage > 95:
self.dispatcher.dispatch(Event("high_disk", {
"server_id": server_id,
"disk_usage": disk_usage
}))
time.sleep(30) # Проверка каждые 30 секунд
# Методы имитации получения метрик (в реальной системе здесь были бы API-вызовы)
def _get_cpu_usage(self, server_id: str) -> float:
# Имитация получения данных о CPU
return 95.0 if server_id == "server1" else 50.0
def _get_memory_usage(self, server_id: str) -> float:
# Имитация получения данных о памяти
return 70.0
def _get_disk_usage(self, server_id: str) -> float:
# Имитация получения данных о диске
return 98.0 if server_id == "server2" else 60.0
# Конкретные обработчики событий
class EmailNotifier(EventListener):
def handle_event(self, event: Event) -> None:
server_id = event.data.get("server_id", "unknown")
print(f"[EMAIL] Отправка уведомления о событии {event.type} для сервера {server_id}")
# В реальном коде здесь был бы вызов API для отправки email
class LoggingListener(EventListener):
def handle_event(self, event: Event) -> None:
print(f"[LOG] {time.ctime(event.timestamp)}: {event.type} – {event.data}")
class AutoScaler(EventListener):
def handle_event(self, event: Event) -> None:
if event.type == "high_cpu":
server_id = event.data.get("server_id")
cpu_usage = event.data.get("cpu_usage")
print(f"[AUTOSCALE] Увеличиваем ресурсы для сервера {server_id} (CPU: {cpu_usage}%)")
# В реальном коде здесь был бы вызов API облачного провайдера
# Использование системы
dispatcher = EventDispatcher()
monitor = ServerMonitor(dispatcher)
# Регистрация слушателей
logger = LoggingListener()
emailer = EmailNotifier()
scaler = AutoScaler()
dispatcher.add_listener("high_cpu", logger)
dispatcher.add_listener("high_cpu", emailer)
dispatcher.add_listener("high_cpu", scaler)
dispatcher.add_listener("high_memory", logger)
dispatcher.add_listener("high_memory", emailer)
dispatcher.add_listener("high_disk", logger)
dispatcher.add_listener("high_disk", emailer)
# Добавление серверов для мониторинга
monitor.add_server("server1", {"name": "Web Server", "ip": "192.168.1.10"})
monitor.add_server("server2", {"name": "Database Server", "ip": "192.168.1.11"})
# Запуск мониторинга
monitor.start_monitoring()
# В реальном приложении здесь был бы бесконечный цикл или другой механизм поддержания работы программы
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
monitor.stop_monitoring()
print("Мониторинг остановлен")
В этом примере мы создали систему мониторинга, которая генерирует события при обнаружении критических состояний серверов. Различные компоненты системы (логгер, система уведомлений, автоматическое масштабирование) подписываются на эти события и реагируют соответствующим образом.
Пример 2: Веб-скрапер с асинхронной обработкой
Рассмотрим пример веб-скрапера, который асинхронно загружает страницы и обрабатывает полученные данные:
import asyncio
import aiohttp
from bs4 import BeautifulSoup
from urllib.parse import urlparse, urljoin
class WebScraper:
def __init__(self, base_url, max_pages=10):
self.base_url = base_url
self.max_pages = max_pages
self.visited_urls = set()
self.queue = asyncio.Queue()
self.event_handlers = {
'page_downloaded': [],
'link_found': [],
'scraping_completed': [],
'error_occurred': []
}
def on(self, event_type, callback):
"""Подписка на событие"""
if event_type in self.event_handlers:
self.event_handlers[event_type].append(callback)
return self
async def emit(self, event_type, data):
"""Генерация события"""
if event_type in self.event_handlers:
for callback in self.event_handlers[event_type]:
if asyncio.iscoroutinefunction(callback):
await callback(data)
else:
callback(data)
async def download_page(self, url):
"""Загрузка страницы по URL"""
if url in self.visited_urls:
return None
self.visited_urls.add(url)
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=10) as response:
if response.status == 200:
html = await response.text()
await self.emit('page_downloaded', {'url': url, 'html': html})
return html
else:
await self.emit('error_occurred', {
'url': url,
'status': response.status,
'message': f"Failed to download {url}: HTTP {response.status}"
})
return None
except Exception as e:
await self.emit('error_occurred', {
'url': url,
'error': str(e),
'message': f"Error downloading {url}: {str(e)}"
})
return None
async def extract_links(self, url, html):
"""Извлечение ссылок из HTML"""
if not html:
return
soup = BeautifulSoup(html, 'html.parser')
base_domain = urlparse(self.base_url).netloc
for link in soup.find_all('a', href=True):
href = link['href']
full_url = urljoin(url, href)
# Проверяем, что ссылка ведёт на тот же домен
if urlparse(full_url).netloc == base_domain and full_url not in self.visited_urls:
await self.emit('link_found', {'source_url': url, 'found_url': full_url})
await self.queue.put(full_url)
async def process_page(self, url):
"""Обработка одной страницы"""
html = await self.download_page(url)
if html:
await self.extract_links(url, html)
async def run(self):
"""Запуск скрапера"""
# Добавляем начальный URL в очередь
await self.queue.put(self.base_url)
# Создаём воркеры для обработки страниц
workers = []
for _ in range(min(5, self.max_pages)): # Максимум 5 параллельных запросов
workers.append(asyncio.create_task(self.worker()))
# Ждём завершения всех воркеров
await asyncio.gather(*workers)
await self.emit('scraping_completed', {'visited_urls': list(self.visited_urls)})
async def worker(self):
"""Воркер для обработки страниц из очереди"""
while len(self.visited_urls) < self.max_pages:
try:
url = await asyncio.wait_for(self.queue.get(), timeout=5.0)
await self.process_page(url)
self.queue.task_done()
# Если очередь пуста и все страницы обработаны, выходим
if self.queue.empty() and len(self.visited_urls) > 0:
break
except asyncio.TimeoutError:
# Тайм-аут получения из очереди (очередь может быть пуста)
if self.queue.empty() and len(self.visited_urls) > 0:
break
except Exception as e:
await self.emit('error_occurred', {
'error': str(e),
'message': f"Worker error: {str(e)}"
})
# Пример использования
async def main():
scraper = WebScraper('https://python.org', max_pages=5)
# Регистрируем обработчики событий
scraper.on('page_downloaded', lambda data: print(f"Downloaded: {data['url']} ({len(data['html'])} bytes)"))
scraper.on('link_found', lambda data: print(f"Found link: {data['found_url']} on {data['source_url']}"))
scraper.on('error_occurred', lambda data: print(f"Error: {data['message']}"))
async def on_complete(data):
print(f"Scraping completed. Visited {len(data['visited_urls'])} URLs:")
for url in data['visited_urls']:
print(f" – {url}")
scraper.on('scraping_completed', on_complete)
# Запускаем скрапер
await scraper.run()
# Запуск основной функции
if __name__ == "__main__":
asyncio.run(main())
Этот веб-скрапер использует события для информирования о различных этапах процесса: загрузке страниц, обнаружении ссылок, возникновении ошибок и завершении работы. Такой подход позволяет легко расширять функциональность скрапера, добавляя новые обработчики событий без изменения основного кода.
Сравнение подходов к обработке событий в различных сценариях
Выбор подхода к обработке событий зависит от конкретного сценария использования:
| Сценарий | Рекомендуемый подход | Преимущества | Недостатки |
|---|---|---|---|
| Настольное приложение с GUI | PyQt/Tkinter | Встроенная система событий, высокая производительность, богатый набор виджетов | Относительно высокая сложность, требуется изучение API библиотеки |
| Сетевое приложение с высокой нагрузкой | asyncio | Высокая производительность, возможность обработки тысяч соединений в одном потоке | Необходимость переписывания кода в асинхронном стиле |
| Обработка системных сигналов | Обработчики сигналов (signal) | Нативная интеграция с ОС | Ограниченные возможности (нельзя выполнять I/O-операции в обработчиках) |
| Расширяемая архитектура с плагинами | Паттерн Observer | Слабая связанность, легкость расширения | Необходимость ручной реализации механизма событий |
| IoT-устройства с ограниченными ресурсами | Простые callbacks | Минимальные накладные расходы, простота реализации | Ограниченная масштабируемость, потенциальный "callback hell" |
При выборе подхода к обработке событий в своём проекте учитывайте следующие факторы:
- Масштабируемость — насколько легко добавлять новые типы событий и обработчиков
- Производительность — насколько эффективно будет работать система при увеличении числа событий
- Сложность реализации — насколько сложно будет внедрить и поддерживать выбранный подход
- Интеграция — как выбранный подход будет интегрироваться с другими компонентами системы
- Тестируемость — насколько легко будет тестировать код, использующий этот подход
Овладев механизмами работы с событиями в Python, вы получаете доступ к созданию гибких, расширяемых и отзывчивых приложений. От графических интерфейсов до высоконагруженных серверов — событийная модель позволяет структурировать код так, чтобы он оставался понятным и поддерживаемым независимо от сложности системы. Каждый из рассмотренных подходов имеет свои преимущества и области применения, но все они следуют одной фундаментальной идее: разделение генерации событий и их обработки. Это разделение делает код более модульным, тестируемым и адаптируемым к изменениям требований, что особенно важно в современной быстро меняющейся среде разработки.