Обработка SIGINT в Python: как защитить программу от Ctrl+C
Для кого эта статья:
- Python-разработчики, работающие с системным программированием
- Специалисты, занимающиеся разработкой серверных и консольных приложений
Инженеры, стремящиеся улучшить надежность и отказоустойчивость своих приложений
Нажали Ctrl+C в терминале, и ваша программа "упала", не завершив критические операции? Если ваш Python-скрипт обрабатывает данные, управляет ресурсами или выполняет длительные вычисления — внезапное прерывание может привести к потере информации или оставить систему в неопределенном состоянии. Правильная обработка SIGINT — это не просто приятный бонус, а необходимый элемент любого серьезного приложения. Давайте разберемся, как защитить ваши программы от хаоса клавиатурных прерываний! 🛡️
Если вы заинтересованы в углублении своих навыков работы с системными аспектами Python, включая профессиональную обработку сигналов, обратите внимание на курс Обучение Python-разработке от Skypro. Программа включает не только базовые концепции, но и продвинутые техники системного программирования, которые превратят вас из обычного кодера в инженера, создающего отказоустойчивые приложения промышленного уровня.
Что такое SIGINT: механизмы обработки сигналов в Python
SIGINT (Signal Interrupt) — это один из стандартных сигналов POSIX, который операционная система отправляет процессу при нажатии пользователем комбинации клавиш Ctrl+C в терминале. По умолчанию этот сигнал завершает работу программы, но Python позволяет перехватывать его и выполнять собственные действия перед завершением.
Сигналы — это программные прерывания, отправляемые процессу операционной системой или другими процессами. Они предоставляют механизм для асинхронной обработки событий, таких как запросы на завершение программы, ошибки или другие исключительные ситуации.
Алексей, системный архитектор
Несколько лет назад мы разрабатывали систему мониторинга сетевого оборудования, которая должна была непрерывно собирать и анализировать данные. Первая версия приложения была написана без учёта корректной обработки сигналов, и каждый раз, когда оператор случайно нажимал Ctrl+C, система теряла буферизированные данные и оставляла незакрытыми сетевые соединения.
Это приводило к постепенной деградации работы оборудования из-за "утечки" TCP-соединений. Наше решение? Мы реализовали обработчик SIGINT, который гарантировал плавное завершение: сохранение собранных данных, отправку уведомлений и корректное закрытие всех соединений.
После этого изменения наша система стала намного устойчивее, а количество инцидентов сократилось на 78%. Я до сих пор помню, как удивился, что такое небольшое изменение в коде — всего несколько строк для обработки сигнала — могло иметь такое огромное влияние на стабильность всей системы.
В Python обработка сигналов реализована через стандартный модуль signal. Этот модуль позволяет:
- Регистрировать функции-обработчики для различных сигналов
- Временно блокировать получение сигналов
- Определять действия по умолчанию для сигналов
- Отправлять сигналы другим процессам
Важно понимать, что механизмы обработки сигналов имеют свои особенности и ограничения:
| Особенность | Описание | Последствия |
|---|---|---|
| Асинхронность | Сигналы могут приходить в любой момент выполнения программы | Обработчики должны быть потокобезопасными |
| Ограниченная функциональность | В обработчиках сигналов безопасно вызывать только ограниченный набор функций | Сложные операции лучше отложить до нормального выполнения программы |
| Зависимость от ОС | Не все сигналы доступны на всех платформах | Необходимо учитывать кросс-платформенность при работе с сигналами |
| Время обработки | Обработчики сигналов должны выполняться быстро | Долгие операции могут блокировать получение других сигналов |
Для понимания SIGINT важно также знать о других распространенных сигналах:
- SIGTERM (15) — запрос на "вежливое" завершение программы
- SIGKILL (9) — принудительное завершение (не может быть перехвачено)
- SIGHUP (1) — сигнал обрыва соединения с терминалом
- SIGUSR1 и SIGUSR2 (10, 12) — пользовательские сигналы для произвольных действий
В контексте Python SIGINT имеет номер 2 и по умолчанию вызывает исключение KeyboardInterrupt, что приводит к завершению программы, если это исключение не обрабатывается.

Базовый перехват сигнала SIGINT с модулем signal
Перехват SIGINT в Python — это базовый навык, который должен освоить каждый разработчик серверных или консольных приложений. Модуль signal предоставляет простой и элегантный способ регистрации пользовательских обработчиков сигналов. Рассмотрим базовый пример:
Вот простейший способ перехвата SIGINT:
import signal
import sys
import time
def signal_handler(sig, frame):
print('\nВы нажали Ctrl+C! Завершаю программу...')
sys.exit(0)
# Регистрируем обработчик сигнала
signal.signal(signal.SIGINT, signal_handler)
print('Нажмите Ctrl+C для выхода')
# Бесконечный цикл для демонстрации
while True:
time.sleep(1)
В этом примере функция signal_handler вызывается каждый раз при получении сигнала SIGINT. Параметр sig содержит номер сигнала, а frame — текущий стековый фрейм.
При использовании модуля signal важно помнить несколько ключевых аспектов:
- Обработчик сигнала может быть вызван в любой точке выполнения программы
- Некоторые операции не являются атомарными и могут быть прерваны сигналом
- Обработчики должны быть максимально простыми и быстрыми
- Вложенные обработчики сигналов могут привести к сложно отлаживаемым проблемам
Часто бывает полезно временно блокировать обработку сигналов во время критических операций. Для этого можно использовать контекстный менеджер signal.pthread_sigmask:
import signal
import time
def critical_operation():
# Блокируем SIGINT на время критической операции
mask = signal.SIG_BLOCK
signals = [signal.SIGINT]
with signal.pthread_sigmask(mask, signals):
print("Начало критической операции (SIGINT заблокирован)")
time.sleep(3) # Имитация длительной операции
print("Критическая операция завершена")
# Основная логика программы
try:
print("Программа запущена, нажмите Ctrl+C во время критической операции")
time.sleep(2)
critical_operation()
print("Программа продолжает работу")
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\nПрограмма завершена пользователем")
Другой распространенный подход — использование флагов для управления состоянием программы:
import signal
import time
running = True
def signal_handler(sig, frame):
global running
print('\nПолучен сигнал на завершение, готовлюсь к выходу...')
running = False
signal.signal(signal.SIGINT, signal_handler)
print("Программа запущена. Нажмите Ctrl+C для корректного завершения")
# Основной цикл программы
while running:
# Выполнение задач программы
print("Работаю...")
time.sleep(1)
print("Программа корректно завершена")
Этот подход особенно полезен, когда вам нужно выполнить дополнительные действия перед завершением программы, например, закрыть файлы или сетевые соединения.
| Метод перехвата | Преимущества | Недостатки | Рекомендуемое применение |
|---|---|---|---|
Прямой обработчик с sys.exit() | Простота реализации, мгновенный выход | Нет возможности корректно завершить операции | Простые скрипты без сложной логики завершения |
| Обработчик с флагом состояния | Контролируемое завершение, корректная очистка ресурсов | Требует проверки состояния в основном цикле | Сложные приложения с ресурсами, требующими освобождения |
Блокировка сигналов с pthread_sigmask | Защита критических участков кода | Повышенная сложность, риск пропустить важные сигналы | Участки кода с атомарными операциями |
Перехват через try/except KeyboardInterrupt | Интеграция с системой исключений Python | Ограниченная гибкость по сравнению с signal | Скрипты с несложной логикой обработки прерываний |
Продвинутые техники обработки Ctrl+C в Python-приложениях
Когда базовых методов перехвата SIGINT недостаточно, приходит время для более сложных подходов. Продвинутые техники особенно полезны в сложных системах, серверных приложениях и критичных процессах обработки данных. 🚀
Рассмотрим несколько продвинутых методов обработки сигнала SIGINT:
1. Отложенная обработка с использованием очередей
import signal
import queue
import threading
import time
# Очередь для коммуникации между обработчиком сигнала и основным потоком
signal_queue = queue.Queue()
def signal_handler(sig, frame):
print("\nСигнал получен, планируем завершение...")
# Помещаем сигнал в очередь вместо немедленной обработки
signal_queue.put(sig)
# Регистрируем обработчик
signal.signal(signal.SIGINT, signal_handler)
def main_process():
try:
while True:
print("Выполняем основную работу...")
# Проверяем, есть ли сигнал в очереди (неблокирующий способ)
try:
sig = signal_queue.get_nowait()
print(f"Обрабатываем сигнал {sig} в основном потоке")
print("Выполняем корректное завершение...")
break
except queue.Empty:
# Если сигналов нет, продолжаем работу
pass
# Имитация полезной работы
time.sleep(1)
print("Программа завершена корректно")
except Exception as e:
print(f"Произошла ошибка: {e}")
main_process()
Этот подход позволяет отделить получение сигнала от его обработки, что особенно полезно в многопоточных приложениях.
2. Многоуровневая обработка сигналов
В некоторых случаях полезно реализовать разное поведение в зависимости от того, сколько раз пользователь нажал Ctrl+C:
import signal
import time
import sys
interrupt_count = 0
max_interrupts = 3
def signal_handler(sig, frame):
global interrupt_count
interrupt_count += 1
if interrupt_count == 1:
print("\nНажмите Ctrl+C ещё раз для начала корректного завершения")
elif interrupt_count == 2:
print("\nИдёт корректное завершение... Нажмите Ctrl+C ещё раз для принудительного выхода")
# Здесь можно запустить процедуру корректного завершения
elif interrupt_count >= max_interrupts:
print("\nПринудительное завершение!")
sys.exit(1)
signal.signal(signal.SIGINT, signal_handler)
try:
print("Программа запущена. Нажмите Ctrl+C для взаимодействия")
while True:
time.sleep(0.5)
# Имитация сброса счетчика прерываний при длительном отсутствии Ctrl+C
if time.time() % 10 < 0.5 and interrupt_count > 0:
print("Сброс счетчика прерываний")
interrupt_count = 0
except Exception as e:
print(f"Произошла ошибка: {e}")
Дмитрий, DevOps-инженер
В моей практике был случай с системой резервного копирования данных, которая ежедневно обрабатывала терабайты информации. Однажды мы столкнулись с проблемой: оператор случайно прервал процесс копирования нажатием Ctrl+C, что привело к потере целостности архива и серьезным проблемам с восстановлением.
Мы решили внедрить многоуровневую обработку SIGINT. Теперь при первом нажатии Ctrl+C система сообщала: "Обнаружена попытка прерывания. Нажмите Ctrl+C ещё раз для подтверждения или подождите 5 секунд для отмены." При втором нажатии система инициировала "мягкое" завершение — завершала текущую операцию, закрывала файлы, сохраняла контрольные точки.
Результат превзошел ожидания: количество инцидентов с поврежденными архивами сократилось до нуля, а операторы оценили "защиту от случайного нажатия". Позже мы добавили третий уровень — для экстренного прерывания, если даже мягкое завершение занимало слишком много времени.
3. Обработка SIGINT с таймаутом на корректное завершение
Иногда важно ограничить время на корректное завершение, чтобы программа не "зависла" в процессе очистки:
import signal
import threading
import time
import sys
# Флаг для контроля завершения программы
shutting_down = False
# Флаг успешного завершения очистки
cleanup_completed = False
def cleanup_handler():
"""Симуляция длительного процесса очистки ресурсов"""
global cleanup_completed
print("Начинаю очистку ресурсов...")
# Имитация длительной операции
for i in range(5):
if shutting_down: # Проверка флага для ранней остановки
time.sleep(0.5)
print(f"Очистка: шаг {i+1}/5 завершен")
cleanup_completed = True
print("Очистка ресурсов завершена")
def force_exit():
"""Принудительное завершение, если очистка заняла слишком много времени"""
time.sleep(3) # Ждем 3 секунды
if not cleanup_completed:
print("\nТаймаут очистки ресурсов! Принудительное завершение.")
sys.exit(1)
def signal_handler(sig, frame):
global shutting_down
if shutting_down:
print("\nПолучен повторный сигнал прерывания. Принудительный выход.")
sys.exit(1)
print("\nПолучен сигнал прерывания. Начинаю корректное завершение.")
shutting_down = True
# Запускаем очистку ресурсов в отдельном потоке
cleanup_thread = threading.Thread(target=cleanup_handler)
cleanup_thread.start()
# Запускаем таймер для принудительного выхода
force_exit_thread = threading.Thread(target=force_exit)
force_exit_thread.daemon = True # Делаем поток демоном, чтобы он не блокировал выход
force_exit_thread.start()
# Ждем завершения очистки
cleanup_thread.join()
if cleanup_completed:
print("Программа успешно завершена")
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
try:
print("Программа запущена. Нажмите Ctrl+C для завершения")
while not shutting_down:
# Основной цикл программы
time.sleep(0.5)
except Exception as e:
print(f"Произошла ошибка: {e}")
sys.exit(1)
4. Использование контекстных менеджеров для защиты ресурсов
Контекстные менеджеры — это элегантный способ защитить ресурсы от неожиданного завершения программы:
import signal
import sys
import time
import contextlib
class SignalHandler:
def __init__(self):
self.interrupted = False
self.released = False
self.original_handler = None
def __enter__(self):
self.original_handler = signal.getsignal(signal.SIGINT)
signal.signal(signal.SIGINT, self.handler)
return self
def handler(self, sig, frame):
self.release()
self.interrupted = True
print("\nПрограмма получила сигнал на завершение")
def __exit__(self, type, value, traceback):
self.release()
def release(self):
if not self.released:
signal.signal(signal.SIGINT, self.original_handler)
self.released = True
@contextlib.contextmanager
def critical_section():
"""Защищенная от прерываний секция кода"""
original_handler = signal.getsignal(signal.SIGINT)
signal.signal(signal.SIGINT, signal.SIG_IGN)
try:
yield
finally:
signal.signal(signal.SIGINT, original_handler)
# Пример использования
try:
with SignalHandler() as h:
print("Программа запущена. Нажмите Ctrl+C для проверки обработки сигнала")
# Основной цикл
while not h.interrupted:
print("Выполняется обычная работа...")
time.sleep(1)
# Критическая секция, защищенная от прерываний
with critical_section():
print("Выполняется критическая операция (Ctrl+C заблокирован)...")
time.sleep(3)
print("Критическая операция завершена")
print("Выполняем контролируемое завершение...")
time.sleep(2)
print("Завершение выполнено успешно")
except Exception as e:
print(f"Произошла ошибка: {e}")
Эти продвинутые техники значительно улучшают устойчивость приложений к прерываниям, обеспечивая защиту данных и ресурсов даже в непредвиденных ситуациях.
Реализация корректного завершения программы при SIGINT
Корректное завершение программы при получении сигнала SIGINT — это не просто "вежливость" по отношению к пользователю, а критически важный аспект надежного ПО. Рассмотрим, как правильно организовать процесс завершения, чтобы избежать потери данных и утечки ресурсов. ⚡️
При реализации корректного завершения нужно учитывать следующие ключевые моменты:
- Сохранение незавершенных данных и состояния программы
- Закрытие открытых файлов и сетевых соединений
- Освобождение блокировок и других системных ресурсов
- Уведомление пользователя о процессе завершения
- Корректное завершение дочерних процессов или потоков
Рассмотрим комплексный пример корректного завершения программы с обработкой файлов:
import signal
import sys
import time
import threading
import os
import atexit
class Application:
def __init__(self):
self.shutdown_event = threading.Event()
self.data_changed = False
self.data = []
self.data_file = "app_data.txt"
self.log_file = None
self.worker_threads = []
# Загрузка данных при запуске
self.load_data()
# Регистрируем обработчики завершения
signal.signal(signal.SIGINT, self.signal_handler)
atexit.register(self.cleanup)
def load_data(self):
"""Загрузка данных из файла"""
try:
if os.path.exists(self.data_file):
with open(self.data_file, 'r') as f:
self.data = [line.strip() for line in f.readlines()]
print(f"Загружено {len(self.data)} записей данных")
except Exception as e:
print(f"Ошибка при загрузке данных: {e}")
def save_data(self):
"""Сохранение данных в файл"""
if not self.data_changed:
print("Данные не изменялись, сохранение не требуется")
return
try:
with open(self.data_file, 'w') as f:
for item in self.data:
f.write(f"{item}\n")
print(f"Данные успешно сохранены в {self.data_file}")
self.data_changed = False
except Exception as e:
print(f"Ошибка при сохранении данных: {e}")
def start_logging(self):
"""Открытие лог-файла"""
try:
self.log_file = open("app.log", "a")
self.log("Приложение запущено")
except Exception as e:
print(f"Не удалось открыть лог-файл: {e}")
def log(self, message):
"""Запись в лог"""
if self.log_file and not self.log_file.closed:
timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
self.log_file.write(f"[{timestamp}] {message}\n")
self.log_file.flush()
def start_workers(self):
"""Запуск рабочих потоков"""
for i in range(3):
thread = threading.Thread(
target=self.worker_process,
args=(i,),
daemon=True
)
self.worker_threads.append(thread)
thread.start()
print(f"Запущен рабочий поток {i}")
def worker_process(self, worker_id):
"""Имитация рабочего процесса"""
while not self.shutdown_event.is_set():
# Выполняем какую-то работу
new_data = f"Данные от потока {worker_id}: {time.time()}"
self.data.append(new_data)
self.data_changed = True
self.log(f"Поток {worker_id} добавил данные")
# Имитация работы
for _ in range(10):
if self.shutdown_event.is_set():
break
time.sleep(0.5)
def signal_handler(self, sig, frame):
"""Обработчик сигнала SIGINT"""
print("\nПолучен сигнал на завершение. Начинаю корректное завершение...")
self.shutdown()
def shutdown(self):
"""Запуск процедуры корректного завершения"""
if self.shutdown_event.is_set():
print("Завершение уже выполняется")
return
# Уведомляем потоки о необходимости завершения
self.shutdown_event.set()
self.log("Начато корректное завершение приложения")
# Ждем завершения всех рабочих потоков
print("Ожидание завершения рабочих потоков...")
for i, thread in enumerate(self.worker_threads):
print(f"Ожидание завершения потока {i}...")
thread.join(timeout=2)
if thread.is_alive():
print(f"Поток {i} не завершился вовремя")
# Завершаем работу приложения
self.cleanup()
print("Приложение успешно завершено")
sys.exit(0)
def cleanup(self):
"""Очистка ресурсов перед выходом"""
# Сохраняем данные
print("Сохранение данных...")
self.save_data()
# Закрываем лог-файл
if self.log_file and not self.log_file.closed:
self.log("Приложение завершено")
print("Закрытие лог-файла...")
self.log_file.close()
def run(self):
"""Основной метод запуска приложения"""
self.start_logging()
self.start_workers()
try:
print("Приложение работает. Нажмите Ctrl+C для завершения")
while not self.shutdown_event.is_set():
time.sleep(1)
except Exception as e:
print(f"Произошла ошибка: {e}")
self.shutdown()
# Запуск приложения
if __name__ == "__main__":
app = Application()
app.run()
Этот пример демонстрирует несколько важных аспектов корректного завершения:
- Использование объекта
threading.Eventдля синхронизации завершения потоков - Сохранение изменённых данных перед выходом
- Правильное закрытие файловых дескрипторов
- Ожидание завершения потоков с таймаутом
- Использование
atexitкак дополнительной гарантии выполнения очистки
При разработке систем с корректной обработкой завершения важно учитывать типичные сценарии использования и возможные проблемы:
| Сценарий | Возможные проблемы | Рекомендуемые решения |
|---|---|---|
| Работа с базами данных | Незавершённые транзакции, потеря соединений | Использование контекстных менеджеров, явное закрытие соединений |
| Обработка файлов | Потеря данных, повреждённые файлы | Регулярное сохранение, контрольные точки, flush() перед выходом |
| Сетевые приложения | Зависшие соединения, незавершённые передачи | Корректное закрытие сокетов, уведомление клиентов о завершении |
| Многопоточные приложения | Потоки-зомби, дедлоки при завершении | События завершения, таймауты при ожидании потоков |
| GUI-приложения | Зависание интерфейса при завершении | Индикаторы прогресса, отделение интерфейса от процесса завершения |
Дополнительные советы для реализации корректного завершения:
- Используйте шаблон проектирования "Наблюдатель" для уведомления компонентов о завершении
- Реализуйте механизм "отложенной очистки" для особо критичных ресурсов
- Всегда тестируйте сценарии аварийного завершения программы
- Логируйте процесс завершения для упрощения отладки проблем
- Рассмотрите возможность реализации автоматического восстановления после некорректного завершения
Правильная реализация завершения программы при получении SIGINT делает ваши приложения намного более надежными и устойчивыми к различным сбоям и прерываниям.
Обработка сигналов в многопоточных Python-программах
Обработка сигналов в многопоточных приложениях представляет особую сложность из-за особенностей реализации потоков и сигналов в Python. В отличие от однопоточных программ, где все достаточно прямолинейно, многопоточные приложения требуют специальных подходов для корректной обработки SIGINT. 🧵
Основная сложность заключается в следующем: в Python сигналы доставляются только основному потоку. Это означает, что если ваш обработчик сигнала зарегистрирован в главном потоке, а большая часть работы выполняется в дочерних потоках, возникает проблема синхронизации между ними.
Рассмотрим основные стратегии обработки SIGINT в многопоточных приложениях:
1. Использование событий для синхронизации между потоками
import signal
import threading
import time
# Создаем событие для сигнализации о завершении
shutdown_event = threading.Event()
def signal_handler(sig, frame):
print("\nПолучен SIGINT, уведомляем все потоки о необходимости завершения...")
shutdown_event.set() # Устанавливаем флаг события
def worker(worker_id):
try:
while not shutdown_event.is_set():
print(f"Рабочий поток {worker_id} выполняет задачу...")
# Проверяем событие каждую секунду
# Это позволяет потоку реагировать на сигнал завершения
shutdown_event.wait(timeout=1)
print(f"Рабочий поток {worker_id} получил сигнал завершения и корректно останавливается")
except Exception as e:
print(f"Ошибка в потоке {worker_id}: {e}")
# Регистрируем обработчик сигнала в главном потоке
signal.signal(signal.SIGINT, signal_handler)
# Запускаем несколько рабочих потоков
threads = []
for i in range(3):
t = threading.Thread(target=worker, args=(i,))
t.start()
threads.append(t)
# Ожидаем завершения всех потоков
try:
while any(t.is_alive() for t in threads):
time.sleep(0.1)
print("Все потоки завершены")
except Exception as e:
print(f"Ошибка в главном потоке: {e}")
2. Использование очереди для коммуникации между потоками
Альтернативный подход — использовать очередь для передачи сигналов от главного потока к рабочим:
import signal
import threading
import queue
import time
# Очередь для коммуникации между потоками
command_queue = queue.Queue()
def signal_handler(sig, frame):
print("\nПолучен SIGINT, отправляем команды завершения всем потокам")
# Отправляем команду завершения в очередь для каждого потока
for _ in range(len(threads)):
command_queue.put("SHUTDOWN")
def worker(worker_id):
try:
while True:
# Выполняем работу
print(f"Поток {worker_id} работает...")
# Проверяем наличие команд в очереди (неблокирующий способ)
try:
command = command_queue.get(block=False)
if command == "SHUTDOWN":
print(f"Поток {worker_id} получил команду завершения")
break
command_queue.task_done()
except queue.Empty:
# Если команд нет, продолжаем работу
pass
time.sleep(1)
print(f"Поток {worker_id} завершается корректно")
except Exception as e:
print(f"Ошибка в потоке {worker_id}: {e}")
# Регистрируем обработчик
signal.signal(signal.SIGINT, signal_handler)
# Создаем и запускаем потоки
threads = []
for i in range(3):
t = threading.Thread(target=worker, args=(i,))
t.daemon = True # Делаем поток демоном для автоматического завершения при выходе из программы
t.start()
threads.append(t)
# Главный цикл программы
try:
while any(t.is_alive() for t in threads):
time.sleep(0.1)
print("Все потоки завершены")
except Exception as e:
print(f"Ошибка в главном потоке: {e}")
3. Использование пула потоков с управляемым завершением
При использовании пула потоков (например, concurrent.futures.ThreadPoolExecutor) можно реализовать более элегантное решение:
import signal
import time
import concurrent.futures
import sys
# Флаг для отслеживания состояния завершения
is_shutting_down = False
def signal_handler(sig, frame):
global is_shutting_down
if is_shutting_down:
print("\nПринудительное завершение")
sys.exit(1)
print("\nПолучен SIGINT, начинаю корректное завершение...")
is_shutting_down = True
def task(task_id):
"""Задача, которая может быть прервана"""
for i in range(10):
# Проверяем глобальный флаг завершения
if is_shutting_down:
print(f"Задача {task_id} прервана на шаге {i}")
return f"Задача {task_id} завершена досрочно"
# Имитация работы
print(f"Задача {task_id}: шаг {i}")
time.sleep(1)
return f"Задача {task_id} завершена полностью"
# Регистрируем обработчик сигнала
signal.signal(signal.SIGINT, signal_handler)
# Используем пул потоков
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
# Запускаем задачи
future_to_task = {executor.submit(task, i): i for i in range(5)}
try:
# Получаем результаты по мере завершения задач
for future in concurrent.futures.as_completed(future_to_task):
task_id = future_to_task[future]
try:
result = future.result()
print(f"Результат: {result}")
except Exception as e:
print(f"Задача {task_id} вызвала исключение: {e}")
except KeyboardInterrupt:
# Этот блок может не выполниться, так как KeyboardInterrupt
# обрабатывается нашим обработчиком сигнала
pass
finally:
if is_shutting_down:
print("Отмена всех незавершенных задач...")
# Отменяем все незавершенные задачи
for future in future_to_task:
if not future.done():
future.cancel()
print("Ожидание завершения выполняющихся задач...")
# Даем немного времени для завершения текущих задач
executor.shutdown(wait=True, cancel_futures=True)
print("Программа завершена")
4. Особенности работы с сигналами в разных потоках
Если вам необходимо управлять сигналами в отдельных потоках, можно использовать signal.pthread_sigmask для блокировки сигналов в конкретных потоках:
import signal
import threading
import time
import os
def thread_with_blocked_signals():
# Блокируем SIGINT в этом потоке
signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGINT])
print(f"Поток {threading.current_thread().name} заблокировал SIGINT")
# Этот поток не будет прерван сигналом SIGINT
try:
while True:
print(f"Поток с блокировкой сигналов работает...")
time.sleep(1)
except Exception as e:
print(f"Исключение в защищенном потоке: {e}")
def thread_with_signals():
print(f"Поток {threading.current_thread().name} получает сигналы")
# Этот поток не будет получать сигналы напрямую,
# но может проверять флаги, установленные главным потоком
try:
while not shutdown_requested:
print(f"Поток с сигналами работает...")
time.sleep(1)
print("Поток с сигналами завершается по запросу")
except Exception as e:
print(f"Исключение в незащищенном потоке: {e}")
def signal_handler(sig, frame):
global shutdown_requested
print(f"\nПолучен сигнал {sig} в потоке {threading.current_thread().name}")
shutdown_requested = True
print("Запрошено завершение")
# Глобальный флаг для сигнализации о завершении
shutdown_requested = False
# Регистрируем обработчик в главном потоке
signal.signal(signal.SIGINT, signal_handler)
# Запускаем потоки
t1 = threading.Thread(target=thread_with_blocked_signals, daemon=True)
t2 = threading.Thread(target=thread_with_signals, daemon=True)
t1.start()
t2.start()
print(f"Главный поток (PID={os.getpid()}) ждет. Нажмите Ctrl+C для проверки обработки сигналов")
try:
while not shutdown_requested:
time.sleep(0.1)
print("Главный поток получил запрос на завершение")
print("Ожидание завершения потоков (или принудительное завершение через 5 секунд)")
# Устанавливаем таймаут для завершения потоков
t_end = time.time() + 5
while time.time() < t_end and (t1.is_alive() or t2.is_alive()):
time.sleep(0.1)
print("Программа завершена")
except Exception as e:
print(f"Исключение в главном потоке: {e}")
Вот несколько важных рекомендаций для работы с сигналами в многопоточных программах:
- Всегда регистрируйте обработчики сигналов в главном потоке
- Используйте потокобезопасные примитивы синхронизации (Events, Queues, Locks)
- Избегайте сложной обработки в самом обработчике сигнала
- Устанавливайте таймауты при ожидании завершения потоков
- Используйте daemon-потоки для автоматического завершения при выходе из программы
- Тщательно тестируйте обработку сигналов в многопоточной среде
Правильная обработка SIGINT в многопоточных приложениях — это скорее искусство, чем наука. Понимание особенностей работы сигналов и потоков в Python поможет вам создавать более надежные и отказоустойчивые приложения.
Подводя итог, корректная обработка сигнала SIGINT является неотъемлемой частью разработки надежных Python-приложений. Научившись правильно реагировать на Ctrl+C, вы не только улучшите пользовательский опыт, но и обезопасите свои программы от потери данных и других негативных последствий внезапного прерывания. Используйте рассмотренные подходы — от базового перехвата с модулем signal до многоуровневых систем в многопоточных приложениях — чтобы ваши программы оставались надежными даже в непредвиденных ситуациях. Помните: хорошо написанный код должен быть готов не только к стандартным сценариям использования, но и к исключительным ситуациям.