Сообщить об ошибке.

Сигналы Unix в циклах событий asyncio в Python

В разделе рассмотрены низкоуровневые методы модуля asyncio, при помощи которых можно устанавливать и распространять сигналы Unix в цикла событий основного потока.

Прежде чем что-то делать с циклом событий, его необходимо создать или получить функциями, описанными в разделе "Создание, запуск и получение цикла событий".

Смотрите так-же модуль signal.

Содержание:


loop.add_signal_handler(signum, callback, *args):

Метод loop.add_signal_handler() устанавливает обратный вызов callback в качестве обработчика сигнала signum.

Обратный вызов callback будет вызываться циклом событий вместе с другими обратными вызовами в очереди и запускаемыми сопрограммами этого цикла событий.

В отличие от обработчиков сигналов, зарегистрированных с помощью signal.signal(), обратный вызов, зарегистрированный с помощью этой функции, может взаимодействовать с циклом событий.

Для передачи аргументов ключевого слова в обратный вызов используйте вызов functools.partial().

Как и signal.signal(), Метод loop.add_signal_handler() должен вызываться в основном потоке.

loop.remove_signal_handler(sig):

Метод loop.remove_signal_handler() удаляет обработчик сигнала sig.

Возвращает True, если обработчик сигнала был удален, или False, если обработчик не был установлен для сигнала sig.

Доступность: Unix.

Пример установки обработчиков сигналов SIGINT и SIGTERM.

В примере регистрируются обработчики сигналов signal.SIGINT и signal.SIGTERM с помощью метода loop.add_signal_handler().

Этот пример работает только в Unix.

import asyncio
import functools
import os
import signal

def ask_exit(signame, loop):
    print(f'Получен сигнал {signame}: выход')
    loop.stop()

async def main():
    loop = asyncio.get_running_loop()

    for signame in {'SIGINT', 'SIGTERM'}:
        loop.add_signal_handler(
            getattr(signal, signame),
            functools.partial(ask_exit, signame, loop))
    
    # цикл событий работает в течение 1 часа
    await asyncio.sleep(3600)

print('Нажмите Ctrl+C, чтобы прервать цикл событий.')
print(f'PID {os.getpid()}: послан сигнал `SIGINT` или `SIGTERM`.')

asyncio.run(main())