WebSocket в Python: реализация двусторонней связи в реальном времени
Для кого эта статья:
- Python-разработчики, заинтересованные в реализации технологий реального времени
- Специалисты по веб-разработке, желающие улучшить производительность своих приложений
Студенты и начинающие программисты, изучающие асинхронное программирование и сети
WebSocket — это технология, которая взрывным образом меняет характер взаимодействия между клиентом и сервером. В отличие от традиционного HTTP, WebSocket предлагает постоянное соединение, обеспечивающее двустороннюю связь в реальном времени. Именно это делает его незаменимым для чатов, игр, торговых платформ и систем мониторинга. Python с его элегантным синтаксисом и мощными библиотеками предоставляет идеальную почву для реализации WebSocket. В этом руководстве мы разберём протокол от основ до продвинутых техник, вооружив вас практическими примерами для решения реальных задач. 🚀
Хотите стать профессионалом в реализации высоконагруженных систем реального времени с Python? На курсе Обучение Python-разработке от Skypro вы изучите не только WebSocket, но и полный спектр технологий для создания современных веб-приложений. Наши студенты создают рабочие проекты уже с первого месяца обучения, а к выпуску собирают портфолио, достаточное для трудоустройства. Инвестируйте в свои навыки сегодня!
Основы протокола WebSocket и его преимущества в Python
WebSocket представляет собой протокол, обеспечивающий полнодуплексную связь через одно TCP-соединение. Он был стандартизирован IETF как RFC 6455 в 2011 году и с тех пор стал неотъемлемой частью веб-разработки.
Основное отличие WebSocket от HTTP заключается в характере взаимодействия. HTTP работает по модели "запрос-ответ", что значит, что клиент всегда инициирует коммуникацию, а сервер только отвечает. WebSocket, напротив, устанавливает постоянное соединение, позволяя обоим участникам обмениваться сообщениями в любой момент без дополнительных накладных расходов на установление новых соединений.
Антон Сергеев, Senior Backend Developer
Два года назад мы столкнулись с задачей создания торговой платформы, требующей обновления котировок в реальном времени. Изначально использовали Long Polling — клиент отправлял запросы каждые 2 секунды для получения обновлений. Система работала, но была неэффективна: высокая нагрузка на сервер, задержки до 2 секунд и потери соединения при нестабильном интернете.
Переход на WebSocket изменил всё. Нагрузка на сервер снизилась на 70%, задержки уменьшились до миллисекунд, а клиентское приложение стало более отзывчивым. Для Python-разработки выбрали библиотеку websockets с asyncio, что позволило обрабатывать тысячи одновременных подключений на одном сервере.
Преимущества использования WebSocket в Python-приложениях:
- Производительность: отсутствие необходимости устанавливать новое TCP-соединение для каждого сообщения сокращает задержки и накладные расходы.
- Двунаправленная связь: сервер может отправлять данные клиенту без его запроса, что критически важно для приложений реального времени.
- Уменьшение объема данных: заголовки WebSocket значительно меньше HTTP-заголовков, что снижает трафик, особенно при частом обмене сообщениями.
- Естественная интеграция с асинхронностью: Python's asyncio и WebSocket идеально дополняют друг друга для создания масштабируемых приложений.
В экосистеме Python существует несколько библиотек для работы с WebSocket:
| Библиотека | Особенности | Лучшие сценарии использования | |
|---|---|---|---|
| websockets | Асинхронная, основана на asyncio, чистый Python | Высоконагруженные серверы, микросервисы | |
| Flask-SocketIO | Интеграция с Flask, поддержка комнат и пространств имен | Веб-приложения на Flask, требующие WebSocket | |
| Django Channels | Интеграция с Django, ASGI-совместимость | Django-проекты с функциональностью реального времени | |
| Autobahn | Python | Поддержка WebSocket и WAMP протоколов | Распределенные приложения, IoT |
Процесс установления WebSocket-соединения начинается с "рукопожатия" (handshake), при котором клиент отправляет обычный HTTP-запрос с специальными заголовками, указывающими на желание перейти на WebSocket. Если сервер поддерживает протокол, он отвечает с кодом 101 Switching Protocols, и соединение повышается до WebSocket.

Настройка и создание WebSocket сервера на Python
Для создания WebSocket сервера на Python мы будем использовать библиотеку websockets, которая предлагает элегантный API, основанный на асинхронном программировании. Начнем с установки необходимых компонентов:
pip install websockets
Теперь создадим простой эхо-сервер, который будет возвращать клиенту его сообщения:
import asyncio
import websockets
async def echo(websocket, path):
async for message in websocket:
print(f"Получено сообщение: {message}")
await websocket.send(f"Эхо: {message}")
async def main():
server = await websockets.serve(echo, "localhost", 8765)
print("Сервер запущен на ws://localhost:8765")
await server.wait_closed()
if __name__ == "__main__":
asyncio.run(main())
Рассмотрим ключевые моменты этого кода:
- Функция
echoявляется обработчиком соединений. Она получает два параметра: объект WebSocket и путь запроса. - Конструкция
async for message in websocketпозволяет асинхронно итерировать по сообщениям от клиента. websockets.serveзапускает WebSocket-сервер, связывая его с нашим обработчиком.await server.wait_closed()удерживает сервер запущенным до его явного закрытия.
Для создания более сложного сервера, например, простого чата, нам понадобится хранить активные соединения и рассылать сообщения всем участникам:
import asyncio
import json
import websockets
# Хранилище активных соединений
connected = set()
async def chat(websocket, path):
# Регистрируем нового клиента
connected.add(websocket)
try:
# Приветственное сообщение
await websocket.send(json.dumps({"type": "system", "message": "Добро пожаловать в чат!"}))
# Оповещаем остальных о новом участнике
if len(connected) > 1:
await notify_users(f"Новый пользователь присоединился к чату. Всего участников: {len(connected)}", websocket)
# Обрабатываем сообщения от клиента
async for message in websocket:
data = json.loads(message)
if data["type"] == "message":
await broadcast(data["message"], websocket)
except websockets.exceptions.ConnectionClosed:
print("Соединение закрыто клиентом")
finally:
# Удаляем клиента из списка при отключении
connected.remove(websocket)
if connected: # Если остались другие клиенты
await notify_users(f"Пользователь вышел из чата. Осталось участников: {len(connected)}")
async def notify_users(message, exclude=None):
"""Отправляет системное сообщение всем клиентам"""
system_message = json.dumps({"type": "system", "message": message})
if connected:
await asyncio.gather(
*[ws.send(system_message) for ws in connected if ws != exclude]
)
async def broadcast(message, sender):
"""Рассылает сообщение от одного клиента всем остальным"""
broadcast_message = json.dumps({"type": "chat", "message": message})
if connected:
await asyncio.gather(
*[ws.send(broadcast_message) for ws in connected if ws != sender]
)
async def main():
server = await websockets.serve(chat, "localhost", 8765)
print("Чат-сервер запущен на ws://localhost:8765")
await server.wait_closed()
if __name__ == "__main__":
asyncio.run(main())
Для обеспечения безопасности WebSocket-сервера необходимо реализовать аутентификацию и авторизацию. Вот пример с простой аутентификацией по токену:
import asyncio
import json
import websockets
# Простая имитация базы данных с токенами
USERS = {
"token1": {"username": "user1", "role": "admin"},
"token2": {"username": "user2", "role": "user"}
}
async def authenticate(websocket, path):
# Ждем первое сообщение с токеном
auth_message = await websocket.recv()
try:
data = json.loads(auth_message)
if "token" in data:
token = data["token"]
if token in USERS:
# Аутентификация успешна
user_info = USERS[token]
await websocket.send(json.dumps({
"type": "auth_success",
"username": user_info["username"]
}))
# Запускаем основную логику для аутентифицированного пользователя
await handle_messages(websocket, user_info)
return
except json.JSONDecodeError:
pass
# Если код дошел до сюда, аутентификация не удалась
await websocket.send(json.dumps({
"type": "auth_error",
"message": "Неверный токен аутентификации"
}))
await websocket.close(1008, "authentication failed")
async def handle_messages(websocket, user_info):
try:
async for message in websocket:
# Здесь логика обработки сообщений от аутентифицированного пользователя
print(f"Сообщение от {user_info['username']}: {message}")
await websocket.send(json.dumps({
"type": "response",
"message": f"Вы отправили: {message}"
}))
except websockets.exceptions.ConnectionClosed:
print(f"Соединение с {user_info['username']} закрыто")
async def main():
server = await websockets.serve(authenticate, "localhost", 8765)
print("Защищенный сервер запущен на ws://localhost:8765")
await server.wait_closed()
if __name__ == "__main__":
asyncio.run(main())
| Аспект безопасности | Решение в Python | Примечания |
|---|---|---|
| Аутентификация | JWT-токены, сессии | Токены предпочтительнее для масштабируемых решений |
| Защита от DoS | Rate limiting, ограничение размера сообщений | Используйте библиотеки типа aiolimiter |
| Валидация данных | Schema validation (Pydantic) | Критически важна для предотвращения инъекций |
| Шифрование | WSS (WebSocket Secure) | Использует TLS/SSL, аналогично HTTPS |
Разработка WebSocket клиента: пошаговый код и объяснение
Теперь разработаем WebSocket клиент, который будет взаимодействовать с нашим сервером. Для этого также воспользуемся библиотекой websockets.
Создадим простой клиент, который отправляет сообщения и получает ответы от эхо-сервера:
import asyncio
import websockets
async def send_and_receive():
uri = "ws://localhost:8765"
async with websockets.connect(uri) as websocket:
# Отправляем сообщение
message = "Привет, WebSocket!"
await websocket.send(message)
print(f"Отправлено: {message}")
# Получаем ответ
response = await websocket.recv()
print(f"Получено: {response}")
async def main():
# Запускаем клиент
await send_and_receive()
if __name__ == "__main__":
asyncio.run(main())
Давайте разберем основные компоненты этого кода:
websockets.connect(uri)– устанавливает соединение с WebSocket-сервером по указанному URI.- Конструкция
async withобеспечивает правильное закрытие соединения даже в случае ошибок. await websocket.send(message)– асинхронно отправляет сообщение на сервер.await websocket.recv()– асинхронно ожидает и получает ответ от сервера.
Для более интерактивного клиента, который может отправлять сообщения из консоли и одновременно получать ответы, нам потребуется использовать несколько асинхронных задач:
import asyncio
import websockets
import json
import sys
async def receive_messages(websocket):
"""Асинхронно получает сообщения от сервера"""
try:
while True:
message = await websocket.recv()
try:
data = json.loads(message)
if data["type"] == "chat":
print(f"\nНовое сообщение: {data['message']}")
elif data["type"] == "system":
print(f"\nСистема: {data['message']}")
else:
print(f"\nПолучено: {message}")
except json.JSONDecodeError:
print(f"\nПолучено: {message}")
# Повторно отображаем приглашение ввода
print("> ", end="", flush=True)
except websockets.exceptions.ConnectionClosed:
print("\nСоединение с сервером закрыто")
except Exception as e:
print(f"\nОшибка при получении сообщений: {e}")
async def send_messages(websocket):
"""Асинхронно отправляет сообщения на сервер"""
try:
# Отправляем токен для аутентификации
await websocket.send(json.dumps({"token": "token1"}))
while True:
print("> ", end="", flush=True)
message = await asyncio.get_event_loop().run_in_executor(None, sys.stdin.readline)
message = message.strip()
if message.lower() == "exit":
print("Завершение работы...")
break
await websocket.send(json.dumps({"type": "message", "message": message}))
except websockets.exceptions.ConnectionClosed:
print("\nСоединение с сервером закрыто")
except Exception as e:
print(f"\nОшибка при отправке сообщения: {e}")
async def main():
uri = "ws://localhost:8765"
try:
async with websockets.connect(uri) as websocket:
print(f"Подключено к {uri}")
print("Введите сообщения для отправки. Для выхода введите 'exit'")
# Запускаем обе задачи параллельно
receiver_task = asyncio.create_task(receive_messages(websocket))
sender_task = asyncio.create_task(send_messages(websocket))
# Ждем завершения любой из задач
done, pending = await asyncio.wait(
[receiver_task, sender_task],
return_when=asyncio.FIRST_COMPLETED
)
# Отменяем оставшуюся задачу
for task in pending:
task.cancel()
except websockets.exceptions.ConnectionError:
print(f"Не удалось подключиться к {uri}")
except Exception as e:
print(f"Ошибка: {e}")
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\nКлиент остановлен пользователем")
Этот клиент демонстрирует несколько важных аспектов работы с WebSocket:
- Параллельная обработка: Две асинхронные задачи работают одновременно — одна отвечает за получение сообщений, другая за их отправку.
- Интеграция с IO: Использование
run_in_executorдля неблокирующего ожидания ввода пользователя. - Обработка отключений: Обработка исключений
ConnectionClosedдля корректной реакции на разрыв соединения. - Структурированные сообщения: Использование JSON для передачи структурированных сообщений между клиентом и сервером.
Для работы с защищенными WebSocket-серверами (WSS) необходимо настроить SSL-контекст:
import asyncio
import websockets
import ssl
import pathlib
async def secure_client():
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
localhost_pem = pathlib.Path(__file__).with_name("localhost.pem")
ssl_context.load_verify_locations(localhost_pem)
uri = "wss://localhost:8765"
async with websockets.connect(uri, ssl=ssl_context) as websocket:
await websocket.send("Безопасное соединение установлено!")
response = await websocket.recv()
print(f"Получено: {response}")
async def main():
await secure_client()
if __name__ == "__main__":
asyncio.run(main())
Мария Полякова, Lead Full Stack Developer
Работая над проектом для крупного медиахолдинга, мы столкнулись с проблемой модерации комментариев в режиме реального времени. Традиционное решение с периодической проверкой новых комментариев через REST API приводило к задержкам в отображении одобренных комментариев и создавало дополнительную нагрузку на сервер.
Мы реализовали систему на WebSocket, которая немедленно уведомляла модераторов о новых комментариях и мгновенно отображала одобренные комментарии для всех пользователей. В Python использовали комбинацию Django Channels для интеграции с существующим бэкендом и библиотеку websockets для отдельных микросервисов.
Самым сложным оказалось правильно организовать масштабирование. Пришлось использовать Redis в качестве брокера сообщений между разными экземплярами WebSocket-серверов. Но результат стоил усилий: время от публикации до модерации сократилось с минут до секунд, а нагрузка на серверы снизилась на 40%.
Практические кейсы использования WebSocket в Python-приложениях
WebSocket особенно эффективен в определенных сценариях, где критично передавать данные в реальном времени с минимальными задержками. Рассмотрим несколько практических кейсов и их реализацию.
Чат-приложение на основе Flask-SocketIO
Flask-SocketIO — это популярная библиотека, упрощающая интеграцию WebSocket с Flask-приложениями. Она предоставляет высокоуровневый API и дополнительные возможности, такие как "комнаты" для организации групповых коммуникаций.
from flask import Flask, render_template
from flask_socketio import SocketIO, emit, join_room, leave_room
app = Flask(__name__)
app.config['SECRET_KEY'] = 'секретный-ключ'
socketio = SocketIO(app, cors_allowed_origins="*")
# Словарь для хранения пользователей и их комнат
users = {}
@app.route('/')
def index():
return render_template('index.html')
@socketio.on('connect')
def handle_connect():
print('Клиент подключился')
@socketio.on('disconnect')
def handle_disconnect():
if request.sid in users:
room = users[request.sid]["room"]
username = users[request.sid]["username"]
leave_room(room)
del users[request.sid]
emit('status', {'msg': f'{username} покинул чат'}, room=room)
@socketio.on('join')
def handle_join(data):
username = data['username']
room = data['room']
join_room(room)
users[request.sid] = {"username": username, "room": room}
emit('status', {'msg': f'{username} присоединился к чату'}, room=room)
@socketio.on('message')
def handle_message(data):
room = users[request.sid]["room"]
username = users[request.sid]["username"]
emit('message', {'msg': data['msg'], 'username': username}, room=room)
if __name__ == '__main__':
socketio.run(app, debug=True)
Соответствующий HTML/JavaScript-код для клиентской части:
<!-- index.html -->
<!DOCTYPE html>
<html>
<head>
<title>Flask-SocketIO Чат</title>
<script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/4.0.1/socket.io.js"></script>
<script>
document.addEventListener('DOMContentLoaded', function() {
const socket = io();
// Присоединение к комнате
document.getElementById('join-form').addEventListener('submit', function(e) {
e.preventDefault();
const username = document.getElementById('username').value;
const room = document.getElementById('room').value;
socket.emit('join', {username: username, room: room});
document.getElementById('join-container').style.display = 'none';
document.getElementById('chat-container').style.display = 'block';
});
// Отправка сообщения
document.getElementById('message-form').addEventListener('submit', function(e) {
e.preventDefault();
const messageInput = document.getElementById('message');
socket.emit('message', {msg: messageInput.value});
messageInput.value = '';
});
// Получение сообщений
socket.on('message', function(data) {
addMessage(`<strong>${data.username}</strong>: ${data.msg}`);
});
// Получение статусных сообщений
socket.on('status', function(data) {
addMessage(`<i>${data.msg}</i>`);
});
function addMessage(html) {
const messageDiv = document.createElement('div');
messageDiv.innerHTML = html;
document.getElementById('messages').appendChild(messageDiv);
// Прокрутка вниз
document.getElementById('messages').scrollTop = document.getElementById('messages').scrollHeight;
}
});
</script>
</head>
<body>
<div id="join-container">
<h2>Присоединиться к чату</h2>
<form id="join-form">
<input id="username" placeholder="Ваше имя" required>
<input id="room" placeholder="Название комнаты" required>
<button type="submit">Присоединиться</button>
</form>
</div>
<div id="chat-container" style="display: none;">
<h2>Чат</h2>
<div id="messages" style="height: 300px; overflow-y: scroll; border: 1px solid #ccc;"></div>
<form id="message-form">
<input id="message" placeholder="Введите сообщение" autocomplete="off" required>
<button type="submit">Отправить</button>
</form>
</div>
</body>
</html>
Второй практический кейс — это система мониторинга ресурсов сервера в реальном времени.
# server.py
import asyncio
import json
import psutil
import websockets
import datetime
# Хранилище активных клиентских соединений
connected = set()
async def send_system_stats():
"""Отправляет статистику всем подключенным клиентам"""
while True:
if connected: # Проверяем, есть ли подключенные клиенты
# Собираем статистику
stats = {
'timestamp': datetime.datetime.now().strftime('%H:%M:%S'),
'cpu_percent': psutil.cpu_percent(interval=None),
'memory_percent': psutil.virtual_memory().percent,
'disk_percent': psutil.disk_usage('/').percent,
'network': {
'bytes_sent': psutil.net_io_counters().bytes_sent,
'bytes_recv': psutil.net_io_counters().bytes_recv
}
}
# Преобразуем в JSON
message = json.dumps(stats)
# Отправляем всем клиентам
websockets_to_remove = set()
for websocket in connected:
try:
await websocket.send(message)
except websockets.exceptions.ConnectionClosed:
# Отмечаем закрытые соединения для последующего удаления
websockets_to_remove.add(websocket)
# Удаляем закрытые соединения
connected -= websockets_to_remove
# Ждем 1 секунду перед следующим обновлением
await asyncio.sleep(1)
async def handler(websocket, path):
"""Обработчик подключений"""
# Добавляем новое соединение в множество
connected.add(websocket)
print(f"Новое подключение: {len(connected)} активных клиентов")
try:
# Отправляем приветственное сообщение
await websocket.send(json.dumps({
'type': 'init',
'message': 'Соединение установлено. Начинаем передачу данных.'
}))
# Просто держим соединение открытым
await websocket.wait_closed()
finally:
# Удаляем соединение при отключении
connected.remove(websocket)
print(f"Клиент отключился: {len(connected)} активных клиентов")
async def main():
# Запускаем сервер
server = await websockets.serve(handler, "0.0.0.0", 8765)
print("Сервер мониторинга запущен на ws://0.0.0.0:8765")
# Запускаем задачу отправки статистики
asyncio.create_task(send_system_stats())
# Держим сервер запущенным
await server.wait_closed()
if __name__ == "__main__":
asyncio.run(main())
Сравнение традиционных подходов с WebSocket для различных сценариев:
| Сценарий | Традиционный подход | WebSocket подход | Преимущества WebSocket |
|---|---|---|---|
| Чат | Long Polling: клиент периодически опрашивает сервер | Постоянное двунаправленное соединение | Меньше задержки, меньше нагрузка на сервер |
| Мониторинг | Периодические AJAX-запросы | Сервер отправляет данные при их изменении | Актуальность данных, меньший трафик |
| Онлайн-игры | Регулярные запросы состояния игры | Мгновенная синхронизация состояния | Лучший игровой опыт, низкая задержка |
| Уведомления | Периодическая проверка или отправка через Push API | Немедленная доставка через WebSocket | Более надежная доставка, кроссплатформенность |
Продвинутые техники работы с WebSocket библиотеками Python
Для создания производительных и масштабируемых WebSocket приложений необходимо освоить несколько продвинутых техник. 🧠
1. Использование асинхронного контекстного менеджера для обработки ошибок
class WebSocketConnection:
def __init__(self, uri):
self.uri = uri
self.websocket = None
async def __aenter__(self):
try:
self.websocket = await websockets.connect(self.uri)
return self.websocket
except Exception as e:
print(f"Ошибка подключения: {e}")
raise
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.websocket:
await self.websocket.close()
if exc_type is not None:
print(f"Произошла ошибка: {exc_val}")
# Возвращаем False, чтобы исключение было поднято дальше
return False
2. Обработка потери соединения и автоматическое переподключение
async def connect_with_retry(uri, max_retries=5, backoff_factor=1.5):
"""Подключается к WebSocket с автоматическими повторными попытками"""
retry_count = 0
delay = 1 # начальная задержка в секундах
while retry_count < max_retries:
try:
return await websockets.connect(uri)
except (websockets.exceptions.ConnectionClosed,
websockets.exceptions.InvalidStatusCode,
ConnectionRefusedError) as e:
retry_count += 1
if retry_count >= max_retries:
raise Exception(f"Не удалось подключиться после {max_retries} попыток: {e}")
print(f"Попытка {retry_count}/{max_retries} не удалась: {e}")
print(f"Повторная попытка через {delay} секунд...")
await asyncio.sleep(delay)
delay *= backoff_factor # увеличиваем задержку с каждой попыткой
3. Использование heartbeat для проверки активности соединения
async def heartbeat(websocket, interval=30):
"""Отправляет ping-сообщения для поддержания соединения"""
try:
while True:
await asyncio.sleep(interval)
try:
pong = await websocket.ping()
await asyncio.wait_for(pong, timeout=10)
print("Получен pong от сервера")
except asyncio.TimeoutError:
raise websockets.exceptions.ConnectionClosed(
1011, "Сервер не ответил на ping"
)
except websockets.exceptions.ConnectionClosed:
print("Соединение закрыто во время heartbeat")
# Здесь можно добавить логику переподключения
4. Оптимизация для высоконагруженных систем
При работе с большим количеством одновременных соединений важно оптимизировать использование ресурсов:
- Ограничение количества одновременных подключений с использованием семафоров
- Буферизация сообщений перед отправкой для снижения накладных расходов
- Использование сжатия данных для уменьшения объема передаваемой информации
import asyncio
import zlib
import json
# Ограничиваем количество одновременных соединений
connection_semaphore = asyncio.Semaphore(100)
# Буфер для сообщений
message_buffer = {}
BUFFER_SIZE = 10
async def send_with_compression(websocket, message):
"""Отправляет сжатые данные через WebSocket"""
json_data = json.dumps(message).encode('utf-8')
compressed_data = zlib.compress(json_data)
await websocket.send(compressed_data)
async def buffer_message(websocket, message):
"""Буферизует сообщения и отправляет их пакетами"""
client_id = id(websocket)
if client_id not in message_buffer:
message_buffer[client_id] = []
message_buffer[client_id].append(message)
# Если буфер полон, отправляем пакет сообщений
if len(message_buffer[client_id]) >= BUFFER_SIZE:
await send_messages_batch(websocket, client_id)
async def send_messages_batch(websocket, client_id):
"""Отправляет накопленные сообщения одним пакетом"""
if client_id in message_buffer and message_buffer[client_id]:
messages = message_buffer[client_id]
await send_with_compression(websocket, {"type": "batch", "messages": messages})
message_buffer[client_id] = []
5. Интеграция с распределенными системами
Для масштабирования WebSocket-серверов на несколько экземпляров требуется система обмена сообщениями между ними:
import asyncio
import websockets
import redis.asyncio as redis
import json
# Подключение к Redis
redis_client = None
async def setup_redis():
global redis_client
redis_client = await redis.Redis(host='localhost', port=6379, db=0)
# Подписка на канал для получения сообщений от других экземпляров сервера
pubsub = redis_client.pubsub()
await pubsub.subscribe("websocket_broadcast")
# Задача для прослушивания сообщений
asyncio.create_task(listen_redis_messages(pubsub))
async def listen_redis_messages(pubsub):
"""Слушает сообщения от Redis и рассылает их подключенным клиентам"""
while True:
message = await pubsub.get_message(ignore_subscribe_messages=True)
if message:
data = json.loads(message['data'].decode('utf-8'))
await broadcast_to_clients(data)
async def broadcast_to_clients(message, exclude_websocket=None):
"""Отправляет сообщение всем подключенным клиентам"""
# Логика отправки сообщений клиентам
pass
async def broadcast_via_redis(message):
"""Отправляет сообщение через Redis для всех экземпляров сервера"""
if redis_client:
await redis_client.publish("websocket_broadcast", json.dumps(message))
6. Профилирование и оптимизация производительности
Для мониторинга производительности WebSocket-серверов можно использовать специальные декораторы:
import time
import functools
import asyncio
def async_timeit(func):
@functools.wraps(func)
async def async_wrapper(*args, **kwargs):
start_time = time.time()
result = await func(*args, **kwargs)
end_time = time.time()
print(f"{func.__name__} выполнилась за {end_time – start_time:.4f} секунд")
return result
return async_wrapper
@async_timeit
async def process_message(message):
"""Обрабатывает сообщение с измерением времени выполнения"""
await asyncio.sleep(0.1) # Имитация работы
return {"status": "processed"}
Использование этих продвинутых техник позволяет создавать надежные, масштабируемые и эффективные WebSocket-приложения на Python, способные обслуживать тысячи одновременных подключений.
Освоив WebSocket в Python, вы получили мощный инструмент для создания приложений реального времени. Протокол открывает возможности, недоступные традиционному HTTP, и Python с его асинхронными возможностями делает работу с WebSocket интуитивно понятной и производительной. От простых чатов до сложных систем мониторинга — теперь вы можете реализовать двунаправленную связь с минимальными задержками. Помните: правильное использование продвинутых техник — обработка ошибок, масштабирование, оптимизация — ключ к созданию действительно надёжных решений на базе WebSocket.