События в Python: от базовых концепций до продвинутых паттернов

Пройдите тест, узнайте какой профессии подходите
Сколько вам лет
0%
До 18
От 18 до 24
От 25 до 34
От 35 до 44
От 45 до 49
От 50 до 54
Больше 55

Для кого эта статья:

  • начинающие и опытные разработчики на Python
  • специалисты по разработке интерактивных приложений и графических интерфейсов
  • студенты, желающие улучшить свои навыки программирования и понять работу с событиями в Python

    События в Python — не просто абстрактное понятие, а мощный механизм, с которым сталкивается каждый разработчик, создающий интерактивные приложения. Неважно, разрабатываете ли вы графический интерфейс, где необходимо реагировать на клики мыши, или создаёте асинхронный сервер, который должен обрабатывать множество запросов одновременно — понимание событийной модели становится ключом к созданию эффективного, отзывчивого и хорошо структурированного кода. Эта статья проведёт вас через все этапы работы с событиями в Python: от базовых концепций до продвинутых паттернов проектирования. 🐍

Хотите не просто читать о событиях в Python, а применять эти знания на практике, создавая реальные проекты под руководством экспертов? Обучение Python-разработке от Skypro — это погружение в реальные задачи, где вы научитесь создавать отзывчивые приложения с использованием событийных моделей под наблюдением опытных менторов. От GUI-приложений до асинхронных серверов — все инструменты событийно-ориентированного программирования станут частью вашего профессионального арсенала!

Основы событийно-ориентированного программирования в Python

Событийно-ориентированное программирование (Event-Driven Programming) — это парадигма, в которой выполнение программы определяется событиями: действиями пользователя, сообщениями от других программ, сигналами датчиков и т.д. В отличие от последовательного выполнения, где код выполняется сверху вниз, в событийной модели программа «ожидает» наступления определённых событий и реагирует на них. 🔄

Основные компоненты событийной модели в Python:

  • События (Events) — сигналы о том, что что-то произошло (клик мыши, получение данных)
  • Обработчики событий (Event handlers) — функции, которые выполняются при наступлении события
  • Цикл обработки событий (Event loop) — бесконечный цикл, отслеживающий события и вызывающий обработчики
  • Источники событий (Event sources) — объекты, генерирующие события

Python не имеет встроенного универсального механизма для работы с событиями, но предоставляет несколько способов их реализации:

Подход Описание Типичное применение
Функции обратного вызова (Callbacks) Функции, передаваемые как аргументы, которые вызываются при определённых условиях Простые обработчики событий, асинхронные функции
Паттерн Observer Объекты-наблюдатели подписываются на изменения объекта-субъекта Системы с множеством взаимосвязанных компонентов
Библиотеки GUI Специализированные библиотеки с встроенной событийной моделью Приложения с графическим интерфейсом (PyQt, Tkinter)
Asyncio Асинхронное программирование с событийным циклом Сетевые приложения, обработка I/O-операций

Простейшая реализация событийной модели в Python может выглядеть так:

Python
Скопировать код
class EventEmitter:
def __init__(self):
self.callbacks = {}

def on(self, event_name, callback):
"""Подписка на событие"""
if event_name not in self.callbacks:
self.callbacks[event_name] = []
self.callbacks[event_name].append(callback)

def emit(self, event_name, *args, **kwargs):
"""Генерация события"""
if event_name in self.callbacks:
for callback in self.callbacks[event_name]:
callback(*args, **kwargs)

# Использование
emitter = EventEmitter()

def message_handler(message):
print(f"Получено сообщение: {message}")

emitter.on('message', message_handler)
emitter.emit('message', "Привет, мир!") # Вывод: Получено сообщение: Привет, мир!

Этот простой пример демонстрирует основную механику событийной модели: регистрация обработчиков событий и их вызов при наступлении соответствующих событий. В реальных проектах обычно используются более сложные и оптимизированные реализации этой концепции.

Александр, Python-разработчик старший

Когда я только начинал работать с событийным программированием в Python, я часто путался в том, как структурировать код. Один из моих первых проектов представлял собой мониторинг системы, который должен был реагировать на различные события: высокую загрузку CPU, низкое пространство на диске и сетевые сбои.

Изначально я создал монолитный скрипт, проверяющий все параметры в бесконечном цикле. Это приводило к проблемам: код был нечитаемым, трудно расширяемым, и некоторые проверки блокировали выполнение других.

Переход к событийной модели кардинально изменил ситуацию. Я создал центральный EventEmitter, где каждый мониторинговый модуль генерировал собственные события:

Python
Скопировать код
# Модуль мониторинга CPU
def monitor_cpu():
while True:
usage = get_cpu_usage()
if usage > 90:
event_emitter.emit('high_cpu', usage)
time.sleep(5)

# Обработчик события
def handle_high_cpu(usage):
notify_admin(f"Критическая загрузка CPU: {usage}%")
restart_some_services()

event_emitter.on('high_cpu', handle_high_cpu)

Каждый модуль работал независимо, код стал модульным, а добавление новых типов мониторинга превратилось в тривиальную задачу. Такая архитектура также позволила легко добавлять разные реакции на одно и то же событие без изменения исходного кода мониторов.

Пошаговый план для смены профессии

Работа с событиями в GUI-библиотеках PyQt и Tkinter

GUI-приложения — классический пример событийно-ориентированного программирования. Пользователь кликает кнопку, вводит текст, перетаскивает элемент — все эти действия генерируют события, на которые приложение должно реагировать. Рассмотрим две популярные библиотеки для создания графических интерфейсов в Python: PyQt и Tkinter. 🖥️

PyQt: сигналы и слоты

PyQt использует концепцию сигналов и слотов для обработки событий. Сигналы — это события, генерируемые виджетами, а слоты — функции, которые вызываются при генерации сигналов.

Python
Скопировать код
import sys
from PyQt5.QtWidgets import QApplication, QPushButton, QVBoxLayout, QWidget

class MyApp(QWidget):
def __init__(self):
super().__init__()
self.initUI()

def initUI(self):
layout = QVBoxLayout()

self.button = QPushButton('Нажми меня', self)
self.button.clicked.connect(self.on_button_click) # Подключаем сигнал к слоту

layout.addWidget(self.button)
self.setLayout(layout)
self.setWindowTitle('PyQt События')
self.setGeometry(300, 300, 300, 200)

def on_button_click(self):
print('Кнопка была нажата!')

if __name__ == '__main__':
app = QApplication(sys.argv)
ex = MyApp()
ex.show()
sys.exit(app.exec_())

В этом примере сигнал clicked кнопки соединяется со слотом on_button_click с помощью метода connect. PyQt также позволяет создавать собственные сигналы:

Python
Скопировать код
from PyQt5.QtCore import pyqtSignal

class Counter(QObject):
# Определяем собственный сигнал
value_changed = pyqtSignal(int)

def __init__(self):
super().__init__()
self._value = 0

def increment(self):
self._value += 1
# Излучаем сигнал с новым значением
self.value_changed.emit(self._value)

Tkinter: система привязки событий

Tkinter использует другой подход к обработке событий — систему привязки (binding). Вы привязываете функцию-обработчик к определённому событию виджета.

Python
Скопировать код
import tkinter as tk

def on_button_click():
print("Кнопка была нажата!")

root = tk.Tk()
root.title("Tkinter События")
root.geometry("300x200")

button = tk.Button(root, text="Нажми меня")
button.pack(pady=50)

# Привязываем функцию к событию нажатия левой кнопки мыши
button.bind("<Button-1>", lambda event: on_button_click())

# Альтернативный способ
button.config(command=on_button_click)

root.mainloop()

Tkinter предлагает множество типов событий, которые можно отслеживать:

Событие Описание Пример привязки
<Button-1> Нажатие левой кнопки мыши widget.bind("<Button-1>", handler)
<ButtonRelease-1> Отпускание левой кнопки мыши widget.bind("<ButtonRelease-1>", handler)
<Motion> Перемещение курсора мыши widget.bind("<Motion>", handler)
<KeyPress> Нажатие клавиши на клавиатуре widget.bind("<KeyPress>", handler)
<Return> Нажатие клавиши Enter widget.bind("<Return>", handler)
<Configure> Изменение размера или положения виджета widget.bind("<Configure>", handler)

Сравнение подходов PyQt и Tkinter

Каждая библиотека имеет свои особенности при работе с событиями:

  • PyQt: использует более структурированный подход с сигналами и слотами, что делает код более читаемым при сложной логике обработки событий
  • Tkinter: предлагает более простую систему привязки, что может быть удобно для небольших приложений, но менее масштабируемо
  • Типизация: PyQt сигналы могут определять типы передаваемых данных, что помогает избегать ошибок
  • Доступность: Tkinter входит в стандартную библиотеку Python, PyQt требует отдельной установки

Выбор библиотеки зависит от конкретных требований проекта и личных предпочтений разработчика. Для небольших приложений Tkinter может быть достаточно, в то время как PyQt предлагает более мощный инструментарий для сложных интерфейсов.

Асинхронные события в Python: библиотека asyncio

Асинхронное программирование — ещё одна область, где события играют ключевую роль. Библиотека asyncio в Python предоставляет инфраструктуру для написания однопоточного конкурентного кода с использованием синтаксиса async/await. В центре этой библиотеки находится событийный цикл (event loop), который управляет выполнением асинхронных задач и обработкой I/O-событий. ⏱️

Основные компоненты работы с событиями в asyncio:

  • Event Loop (Цикл событий) — центральный исполнительный механизм, который управляет и распределяет выполнение различных задач
  • Coroutines (Корутины) — функции, которые могут приостанавливать своё выполнение, позволяя циклу событий выполнять другие задачи
  • Futures и Tasks — объекты, представляющие результат выполнения операции, которая ещё не завершена
  • Event Callbacks — функции, вызываемые при наступлении определённых событий (таймауты, I/O-готовность)

Пример использования asyncio для обработки событий:

Python
Скопировать код
import asyncio

async def handle_client(reader, writer):
"""Обработчик подключения клиента"""
addr = writer.get_extra_info('peername')
print(f"Подключен клиент с адреса {addr}")

while True:
# Ожидаем события получения данных
data = await reader.read(100)
if not data:
break

message = data.decode()
print(f"Получено сообщение от {addr}: {message}")

# Отправляем ответ
response = f"Эхо: {message}"
writer.write(response.encode())
await writer.drain()

print(f"Клиент {addr} отключился")
writer.close()

async def main():
# Создаём сервер, который будет принимать подключения
server = await asyncio.start_server(
handle_client, '127.0.0.1', 8888)

addr = server.sockets[0].getsockname()
print(f"Сервер запущен на {addr}")

# Сервер работает до вызова server.close()
async with server:
await server.serve_forever()

if __name__ == "__main__":
asyncio.run(main())

В этом примере мы создаём асинхронный эхо-сервер. Функция handle_client вызывается каждый раз, когда происходит событие подключения нового клиента. Внутри этой функции мы асинхронно ожидаем события получения данных от клиента с помощью await reader.read().

Ключевые аспекты обработки событий в asyncio:

  1. Неблокирующие операции: При вызове await функция приостанавливает своё выполнение, позволяя циклу событий обрабатывать другие задачи.
  2. Событийный цикл: Метод asyncio.run() создаёт новый цикл событий и запускает асинхронную функцию.
  3. Автоматическая диспетчеризация: Когда I/O-операция готова (например, получены данные), asyncio автоматически возобновляет выполнение соответствующей корутины.

Asyncio также предоставляет инструменты для работы с различными типами событий:

Python
Скопировать код
import asyncio
import time

async def delayed_message(delay, message):
"""Выводит сообщение после задержки"""
await asyncio.sleep(delay)
print(f"{time.strftime('%H:%M:%S')}: {message}")

async def periodic_task(interval):
"""Выполняет задачу периодически"""
count = 0
while True:
count += 1
print(f"{time.strftime('%H:%M:%S')}: Периодическое событие #{count}")
await asyncio.sleep(interval)

async def wait_for_external_event(future):
"""Ожидает внешнего события (заполнения future)"""
print(f"{time.strftime('%H:%M:%S')}: Ожидание события...")
result = await future
print(f"{time.strftime('%H:%M:%S')}: Событие произошло! Результат: {result}")

async def simulate_external_event(future, delay):
"""Имитирует внешнее событие, устанавливая результат future"""
await asyncio.sleep(delay)
future.set_result("Данные от внешнего источника")

async def main():
# Создаём future для имитации внешнего события
event_future = asyncio.Future()

# Планируем различные задачи
tasks = [
asyncio.create_task(delayed_message(2, "Сообщение с задержкой")),
asyncio.create_task(periodic_task(3)),
asyncio.create_task(wait_for_external_event(event_future)),
asyncio.create_task(simulate_external_event(event_future, 5))
]

# Запускаем все задачи и завершаемся через 10 секунд
await asyncio.sleep(10)

# Отменяем все задачи при выходе
for task in tasks:
task.cancel()

if __name__ == "__main__":
asyncio.run(main())

В этом примере мы демонстрируем различные типы асинхронных событий:

  • Отложенные события (delayed_message)
  • Периодические события (periodic_task)
  • Ожидание внешних событий (waitforexternal_event)
  • Генерация событий (simulateexternalevent)

Asyncio особенно полезен для приложений, которые должны обрабатывать множество параллельных операций ввода-вывода, таких как:

  • Веб-серверы и клиенты
  • Микросервисные архитектуры
  • Обработка сетевого трафика
  • Системы реального времени
  • Многопользовательские игры и чаты

Создание пользовательских событий с паттерном Observer

Паттерн Observer (Наблюдатель) — один из фундаментальных паттернов проектирования, который позволяет одним объектам (наблюдателям) следить за изменениями состояния других объектов (субъектов) и автоматически получать уведомления. Этот паттерн идеально подходит для реализации событийной модели в Python. 👀

Основные компоненты паттерна Observer:

  • Subject (Субъект) — объект, содержащий состояние и управляющий списком наблюдателей
  • Observer (Наблюдатель) — интерфейс для классов, которые должны быть уведомлены об изменениях в субъекте
  • ConcreteSubject — конкретная реализация субъекта, который генерирует события
  • ConcreteObserver — конкретная реализация наблюдателя, который реагирует на события

Реализация паттерна Observer в Python:

Python
Скопировать код
from abc import ABC, abstractmethod
from typing import List, Any

# Интерфейс Observer
class Observer(ABC):
@abstractmethod
def update(self, subject: 'Subject', *args, **kwargs) -> None:
"""Метод, вызываемый при обновлении субъекта"""
pass

# Класс Subject
class Subject:
def __init__(self):
self._observers: List[Observer] = []

def attach(self, observer: Observer) -> None:
"""Подключает наблюдателя к субъекту"""
if observer not in self._observers:
self._observers.append(observer)

def detach(self, observer: Observer) -> None:
"""Отключает наблюдателя от субъекта"""
try:
self._observers.remove(observer)
except ValueError:
pass

def notify(self, *args, **kwargs) -> None:
"""Уведомляет всех наблюдателей о событии"""
for observer in self._observers:
observer.update(self, *args, **kwargs)

# Конкретный субъект – текстовый редактор
class TextEditor(Subject):
def __init__(self):
super().__init__()
self._text = ""

@property
def text(self) -> str:
return self._text

@text.setter
def text(self, value: str) -> None:
self._text = value
# Уведомляем наблюдателей о событии изменения текста
self.notify(event_type="text_changed", new_text=value)

def save(self, filename: str) -> None:
with open(filename, 'w') as file:
file.write(self._text)
# Уведомляем наблюдателей о событии сохранения
self.notify(event_type="saved", filename=filename)

# Конкретные наблюдатели
class LoggingObserver(Observer):
def update(self, subject: Subject, *args, **kwargs) -> None:
event_type = kwargs.get("event_type", "unknown")
if event_type == "text_changed":
print(f"[LOG] Текст был изменен на: {kwargs.get('new_text')}")
elif event_type == "saved":
print(f"[LOG] Текст сохранен в файл: {kwargs.get('filename')}")

class SpellCheckObserver(Observer):
def update(self, subject: Subject, *args, **kwargs) -> None:
event_type = kwargs.get("event_type", "unknown")
if event_type == "text_changed":
text = kwargs.get('new_text', "")
# Простая проверка орфографии
if "нету" in text:
print("[SPELL] Ошибка: используйте 'нет' вместо 'нету'")

# Использование
editor = TextEditor()
logger = LoggingObserver()
spell_checker = SpellCheckObserver()

editor.attach(logger)
editor.attach(spell_checker)

editor.text = "Привет, мир!" # Вызовет событие text_changed
editor.text = "Тут нету ошибок" # Вызовет событие text_changed и сообщение от проверки орфографии
editor.save("document.txt") # Вызовет событие saved

В этом примере мы реализовали простой текстовый редактор, который уведомляет своих наблюдателей о событиях изменения текста и сохранения файла. Наблюдатели (логгер и проверка орфографии) подписываются на эти события и реагируют соответствующим образом.

Преимущества паттерна Observer для обработки событий:

  • Слабая связанность — субъект не знает конкретных классов наблюдателей, что упрощает расширение системы
  • Расширяемость — легко добавлять новые типы наблюдателей без изменения субъекта
  • Многократное использование — один наблюдатель может следить за несколькими субъектами
  • Гибкость — наблюдатели могут быть добавлены или удалены динамически во время выполнения

Мария, Lead Developer

В одном из проектов мы разрабатывали систему мониторинга для финансовой платформы. Ключевым требованием было отслеживание различных типов операций и оповещение соответствующих модулей: логирование, оповещение клиента, обновление балансов и т.д.

Первоначальная архитектура выглядела примитивно — каждая транзакция явно вызывала методы всех зависимых модулей:

Python
Скопировать код
def process_transaction(transaction):
# Обработка транзакции
result = transaction.execute()

# Явные вызовы всех зависимых модулей
log_transaction(transaction, result)
update_balance(transaction.account, result.new_balance)
send_notification(transaction.user, transaction.amount)
update_analytics(transaction.type, transaction.amount)
# ... и так далее

Проблемы начались, когда потребовалось добавить новые типы реакций на транзакции. Каждое добавление требовало модификации всех точек, где вызывалась функция process_transaction, нарушая принцип открытости/закрытости.

Внедрение паттерна Observer полностью изменило архитектуру:

Python
Скопировать код
class TransactionSubject(Subject):
def process(self, transaction):
# Обработка транзакции
result = transaction.execute()

# Единственная точка уведомления
self.notify(
transaction=transaction,
result=result,
event_type="transaction_completed"
)

# Наблюдатели подписываются независимо
transaction_processor = TransactionSubject()
transaction_processor.attach(LoggingObserver())
transaction_processor.attach(BalanceUpdater())
transaction_processor.attach(NotificationSender())
transaction_processor.attach(AnalyticsCollector())

Теперь добавление новых реакций на транзакции (например, проверка на мошенничество) не требовало изменения существующего кода — достаточно было создать нового наблюдателя и подписать его на событие. Когда через несколько месяцев появились новые требования к системе мониторинга, мы смогли внедрить их без единого изменения в ядре системы.

Практические примеры обработки событий в реальных проектах

Теперь, когда мы рассмотрели различные подходы к обработке событий в Python, давайте посмотрим, как эти концепции применяются в реальных проектах. Эти примеры демонстрируют, как событийная модель может упростить разработку сложных систем. 🚀

Пример 1: Система мониторинга серверов

Представьте, что вам нужно разработать систему мониторинга, которая следит за различными метриками серверов (CPU, память, диск) и реагирует на критические состояния:

Python
Скопировать код
import time
import threading
from abc import ABC, abstractmethod
from typing import Dict, List

# Базовый класс для событий
class Event:
def __init__(self, event_type: str, data: Dict):
self.timestamp = time.time()
self.type = event_type
self.data = data

# Интерфейс слушателя событий
class EventListener(ABC):
@abstractmethod
def handle_event(self, event: Event) -> None:
pass

# Диспетчер событий
class EventDispatcher:
def __init__(self):
self._listeners: Dict[str, List[EventListener]] = {}

def add_listener(self, event_type: str, listener: EventListener) -> None:
if event_type not in self._listeners:
self._listeners[event_type] = []
self._listeners[event_type].append(listener)

def remove_listener(self, event_type: str, listener: EventListener) -> None:
if event_type in self._listeners and listener in self._listeners[event_type]:
self._listeners[event_type].remove(listener)

def dispatch(self, event: Event) -> None:
if event.type in self._listeners:
for listener in self._listeners[event.type]:
listener.handle_event(event)

# Система мониторинга
class ServerMonitor:
def __init__(self, event_dispatcher: EventDispatcher):
self.dispatcher = event_dispatcher
self.servers = {}
self.running = False
self._monitor_thread = None

def add_server(self, server_id: str, server_info: Dict) -> None:
self.servers[server_id] = server_info

def start_monitoring(self) -> None:
if self.running:
return

self.running = True
self._monitor_thread = threading.Thread(target=self._monitoring_loop)
self._monitor_thread.daemon = True
self._monitor_thread.start()

def stop_monitoring(self) -> None:
self.running = False
if self._monitor_thread:
self._monitor_thread.join(timeout=1.0)

def _monitoring_loop(self) -> None:
while self.running:
for server_id, server_info in self.servers.items():
# Имитация проверки метрик
cpu_usage = self._get_cpu_usage(server_id)
memory_usage = self._get_memory_usage(server_id)
disk_usage = self._get_disk_usage(server_id)

# Проверка критических условий и генерация событий
if cpu_usage > 90:
self.dispatcher.dispatch(Event("high_cpu", {
"server_id": server_id,
"cpu_usage": cpu_usage
}))

if memory_usage > 85:
self.dispatcher.dispatch(Event("high_memory", {
"server_id": server_id,
"memory_usage": memory_usage
}))

if disk_usage > 95:
self.dispatcher.dispatch(Event("high_disk", {
"server_id": server_id,
"disk_usage": disk_usage
}))

time.sleep(30) # Проверка каждые 30 секунд

# Методы имитации получения метрик (в реальной системе здесь были бы API-вызовы)
def _get_cpu_usage(self, server_id: str) -> float:
# Имитация получения данных о CPU
return 95.0 if server_id == "server1" else 50.0

def _get_memory_usage(self, server_id: str) -> float:
# Имитация получения данных о памяти
return 70.0

def _get_disk_usage(self, server_id: str) -> float:
# Имитация получения данных о диске
return 98.0 if server_id == "server2" else 60.0

# Конкретные обработчики событий
class EmailNotifier(EventListener):
def handle_event(self, event: Event) -> None:
server_id = event.data.get("server_id", "unknown")
print(f"[EMAIL] Отправка уведомления о событии {event.type} для сервера {server_id}")
# В реальном коде здесь был бы вызов API для отправки email

class LoggingListener(EventListener):
def handle_event(self, event: Event) -> None:
print(f"[LOG] {time.ctime(event.timestamp)}: {event.type} – {event.data}")

class AutoScaler(EventListener):
def handle_event(self, event: Event) -> None:
if event.type == "high_cpu":
server_id = event.data.get("server_id")
cpu_usage = event.data.get("cpu_usage")
print(f"[AUTOSCALE] Увеличиваем ресурсы для сервера {server_id} (CPU: {cpu_usage}%)")
# В реальном коде здесь был бы вызов API облачного провайдера

# Использование системы
dispatcher = EventDispatcher()
monitor = ServerMonitor(dispatcher)

# Регистрация слушателей
logger = LoggingListener()
emailer = EmailNotifier()
scaler = AutoScaler()

dispatcher.add_listener("high_cpu", logger)
dispatcher.add_listener("high_cpu", emailer)
dispatcher.add_listener("high_cpu", scaler)
dispatcher.add_listener("high_memory", logger)
dispatcher.add_listener("high_memory", emailer)
dispatcher.add_listener("high_disk", logger)
dispatcher.add_listener("high_disk", emailer)

# Добавление серверов для мониторинга
monitor.add_server("server1", {"name": "Web Server", "ip": "192.168.1.10"})
monitor.add_server("server2", {"name": "Database Server", "ip": "192.168.1.11"})

# Запуск мониторинга
monitor.start_monitoring()

# В реальном приложении здесь был бы бесконечный цикл или другой механизм поддержания работы программы
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
monitor.stop_monitoring()
print("Мониторинг остановлен")

В этом примере мы создали систему мониторинга, которая генерирует события при обнаружении критических состояний серверов. Различные компоненты системы (логгер, система уведомлений, автоматическое масштабирование) подписываются на эти события и реагируют соответствующим образом.

Пример 2: Веб-скрапер с асинхронной обработкой

Рассмотрим пример веб-скрапера, который асинхронно загружает страницы и обрабатывает полученные данные:

Python
Скопировать код
import asyncio
import aiohttp
from bs4 import BeautifulSoup
from urllib.parse import urlparse, urljoin

class WebScraper:
def __init__(self, base_url, max_pages=10):
self.base_url = base_url
self.max_pages = max_pages
self.visited_urls = set()
self.queue = asyncio.Queue()
self.event_handlers = {
'page_downloaded': [],
'link_found': [],
'scraping_completed': [],
'error_occurred': []
}

def on(self, event_type, callback):
"""Подписка на событие"""
if event_type in self.event_handlers:
self.event_handlers[event_type].append(callback)
return self

async def emit(self, event_type, data):
"""Генерация события"""
if event_type in self.event_handlers:
for callback in self.event_handlers[event_type]:
if asyncio.iscoroutinefunction(callback):
await callback(data)
else:
callback(data)

async def download_page(self, url):
"""Загрузка страницы по URL"""
if url in self.visited_urls:
return None

self.visited_urls.add(url)

try:
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=10) as response:
if response.status == 200:
html = await response.text()
await self.emit('page_downloaded', {'url': url, 'html': html})
return html
else:
await self.emit('error_occurred', {
'url': url, 
'status': response.status,
'message': f"Failed to download {url}: HTTP {response.status}"
})
return None
except Exception as e:
await self.emit('error_occurred', {
'url': url, 
'error': str(e),
'message': f"Error downloading {url}: {str(e)}"
})
return None

async def extract_links(self, url, html):
"""Извлечение ссылок из HTML"""
if not html:
return

soup = BeautifulSoup(html, 'html.parser')
base_domain = urlparse(self.base_url).netloc

for link in soup.find_all('a', href=True):
href = link['href']
full_url = urljoin(url, href)

# Проверяем, что ссылка ведёт на тот же домен
if urlparse(full_url).netloc == base_domain and full_url not in self.visited_urls:
await self.emit('link_found', {'source_url': url, 'found_url': full_url})
await self.queue.put(full_url)

async def process_page(self, url):
"""Обработка одной страницы"""
html = await self.download_page(url)
if html:
await self.extract_links(url, html)

async def run(self):
"""Запуск скрапера"""
# Добавляем начальный URL в очередь
await self.queue.put(self.base_url)

# Создаём воркеры для обработки страниц
workers = []
for _ in range(min(5, self.max_pages)): # Максимум 5 параллельных запросов
workers.append(asyncio.create_task(self.worker()))

# Ждём завершения всех воркеров
await asyncio.gather(*workers)
await self.emit('scraping_completed', {'visited_urls': list(self.visited_urls)})

async def worker(self):
"""Воркер для обработки страниц из очереди"""
while len(self.visited_urls) < self.max_pages:
try:
url = await asyncio.wait_for(self.queue.get(), timeout=5.0)
await self.process_page(url)
self.queue.task_done()

# Если очередь пуста и все страницы обработаны, выходим
if self.queue.empty() and len(self.visited_urls) > 0:
break

except asyncio.TimeoutError:
# Тайм-аут получения из очереди (очередь может быть пуста)
if self.queue.empty() and len(self.visited_urls) > 0:
break
except Exception as e:
await self.emit('error_occurred', {
'error': str(e),
'message': f"Worker error: {str(e)}"
})

# Пример использования
async def main():
scraper = WebScraper('https://python.org', max_pages=5)

# Регистрируем обработчики событий
scraper.on('page_downloaded', lambda data: print(f"Downloaded: {data['url']} ({len(data['html'])} bytes)"))
scraper.on('link_found', lambda data: print(f"Found link: {data['found_url']} on {data['source_url']}"))
scraper.on('error_occurred', lambda data: print(f"Error: {data['message']}"))

async def on_complete(data):
print(f"Scraping completed. Visited {len(data['visited_urls'])} URLs:")
for url in data['visited_urls']:
print(f" – {url}")

scraper.on('scraping_completed', on_complete)

# Запускаем скрапер
await scraper.run()

# Запуск основной функции
if __name__ == "__main__":
asyncio.run(main())

Этот веб-скрапер использует события для информирования о различных этапах процесса: загрузке страниц, обнаружении ссылок, возникновении ошибок и завершении работы. Такой подход позволяет легко расширять функциональность скрапера, добавляя новые обработчики событий без изменения основного кода.

Сравнение подходов к обработке событий в различных сценариях

Выбор подхода к обработке событий зависит от конкретного сценария использования:

Сценарий Рекомендуемый подход Преимущества Недостатки
Настольное приложение с GUI PyQt/Tkinter Встроенная система событий, высокая производительность, богатый набор виджетов Относительно высокая сложность, требуется изучение API библиотеки
Сетевое приложение с высокой нагрузкой asyncio Высокая производительность, возможность обработки тысяч соединений в одном потоке Необходимость переписывания кода в асинхронном стиле
Обработка системных сигналов Обработчики сигналов (signal) Нативная интеграция с ОС Ограниченные возможности (нельзя выполнять I/O-операции в обработчиках)
Расширяемая архитектура с плагинами Паттерн Observer Слабая связанность, легкость расширения Необходимость ручной реализации механизма событий
IoT-устройства с ограниченными ресурсами Простые callbacks Минимальные накладные расходы, простота реализации Ограниченная масштабируемость, потенциальный "callback hell"

При выборе подхода к обработке событий в своём проекте учитывайте следующие факторы:

  • Масштабируемость — насколько легко добавлять новые типы событий и обработчиков
  • Производительность — насколько эффективно будет работать система при увеличении числа событий
  • Сложность реализации — насколько сложно будет внедрить и поддерживать выбранный подход
  • Интеграция — как выбранный подход будет интегрироваться с другими компонентами системы
  • Тестируемость — насколько легко будет тестировать код, использующий этот подход

Овладев механизмами работы с событиями в Python, вы получаете доступ к созданию гибких, расширяемых и отзывчивых приложений. От графических интерфейсов до высоконагруженных серверов — событийная модель позволяет структурировать код так, чтобы он оставался понятным и поддерживаемым независимо от сложности системы. Каждый из рассмотренных подходов имеет свои преимущества и области применения, но все они следуют одной фундаментальной идее: разделение генерации событий и их обработки. Это разделение делает код более модульным, тестируемым и адаптируемым к изменениям требований, что особенно важно в современной быстро меняющейся среде разработки.

Загрузка...