Сообщить об ошибке.

Планировщик сообщений модуля python-telegram-bot

Периодическая отправка сообщений, с заданным интервалом

Класс расширения telegram.ext.JobQueue позволяет выполнять задачи с задержкой или даже периодически, с заданным интервалом. Помимо прочего, его можно использовать для отправки регулярных сообщений своим подписчикам Telegram канала.

Внимание! Пакеты python-telegram-bot версии 13.x будут придерживаться многопоточной парадигмы программирования (*на данный момент актуальна версия 13.15). Пакеты версий 20.x и новее предоставляют чистый асинхронный Python интерфейс для Telegram Bot API. Дополнительно смотрите основные изменения в пакете python-telegram-bot версии 20.x.

Что необходимо учитывать при использовании планировщика JobQueue:

  • Класс JobQueue обеспечивает простой и готовый к использованию способ планирования задач в соответствии с архитектурой библиотеки python-telegram-bot.
  • Логика управления расписанием не является основной задачей python-telegram-bot, поэтому с версии 13 используется сторонняя библиотека.
  • Если необходимы настраиваемые штуки для планирования, то можно использовать расширенные функции стороннего модуля APScheduler.
  • Разработчики python-telegram-bot не гарантируют, что серверная часть останется неизменной. Например, если поддержка стороннего модуля APScheduler будет прекращена, то придется искать альтернативы.

Содержание:

Использование планировщика JobQueue.

Класс JobQueue тесно интегрирован с другими классами расширений telegram.ext.

Чтобы использовать JobQueue, не нужно много делать. Когда создается экземпляр средства обновления Updater, он автоматически создаст планировщик JobQueue, подобно диспетчеру Dispatcher:

from telegram.ext import Updater

upd = Updater('TOKEN', use_context=True)
dp = upd.dispatcher
jq = upd.job_queue
...

Для многопоточной версии 13.x, созданная очередь заданий jq уже связана с диспетчером. По этому, если нет веской причины, то не следует создавать экземпляр класса JobQueue самостоятельно.

Для асинхронной версии python-telegram-bot версии 20.x, планировщик JobQueue автоматически создается, при создании приложения Telegram:

from telegram.ext import Application

application = Application.builder().token('TOKEN').build()
job_queue = application.job_queue

В асинхронной версии 20.x созданная очередь jq также уже связана, но с приложением с application.

Задачи, в очереди заданий, инкапсулируются классом Job. Он принимает в качестве параметра функцию обратного вызова, которая будет выполнена, когда придет время. Функция обратного вызова всегда принимает один параметр: context возвращаемый telegram.ext.CallbackContext. Как и в случае функций, которые передаются обработчикам сообщений, через объект context можно получить доступ к context.bot - экземпляру telegram.Bot. В этом конкретном случае, получить доступ к context.job, который является экземпляром задачи Job, которая инициировала обратный вызов (подробнее об этом позже).

Можно использовать следующие методы для создания заданий с разной частотой и временем:

  • job_queue.run_once,
  • job_queue.run_repeating,
  • job_queue.run_daily
  • job_queue.run_monthly.

Обратите внимание еще раз, что не нужно создавать экземпляр класса Job напрямую.

Внимание! В коде, который приводится ниже необходимо заменить канал @examplechannel - каналом, администратором которого является ваш бот, или вашим идентификатором пользователя. Для того, чтобы узнать свой идентификатор пользователя, можно использовать @userinfobot.

Добавление в очередь и планирование задания (многопоточная версия 13.x).

Добавим первое задание в очередь, для этого определим функцию обратного вызова и добавим ее в очередь заданий.

def callback_minute(context: telegram.ext.CallbackContext):
    context.bot.send_message(chat_id='@examplechannel', 
                             text='One message every minute')

job_minute = jq.run_repeating(callback_minute, interval=60, first=10)

Функция callback_minute будет выполняться каждые 60,0 секунд, первый раз через 10 секунд (аргумент first=10). Аргументы interval и first указываются в секундах (могут иметь тип int, float или datetime.timedelta()). А аргумент first кроме того может принимать значения datetime.date() и datetime.time().

Возвращаемое значение функций обратного вызова - это создаваемые объекты Job. Не нужно сохранять результат run_repeating (который является новым экземпляром задания), воспользуемся им позже.

Также можно добавить задание, которое будет выполняться только один раз, с задержкой:

def callback_30(context: telegram.ext.CallbackContext):
    context.bot.send_message(chat_id='@examplechannel', 
                             text='A single message with 30s delay')

# Через тридцать секунд будет получено 
# сообщение от функции `callback_30`.
jq.run_once(callback_30, 30)

Для того, что бы временно отключить задание или даже полностью удалить его из очереди, сделайте следующее:

# Временно отключает задание `job_minute`
job_minute.enabled = False  
# Полностью удаляет работу `job_minute`
job_minute.schedule_removal()

Примечание: метод .schedule_removal() не удаляет задание сразу из очереди. Задание помечается для удаления и будет удалено, как только истечет его текущий интервал (задание не будет снова запускаться после того, как будет помечено для удаления).

Иногда необходимо сделать "напоминалку", т.е. добавить задания в ответ на определенный пользовательский ввод, и есть удобный способ сделать это. Аргумент контекста обратных вызовов Handler имеет JobQueue, прикрепленный как context.job_queue, готовый к использованию. Еще одна возможность, которую можно использовать - это ключевой аргумент context для задания Job. То есть можно передать любой объект в качестве параметра контекста при запуске задания и получить его на более позднем этапе, пока задание существует.

Смотрим, как это выглядит в коде:

from telegram.ext import CommandHandler

def callback_alarm(context):
    context.bot.send_message(chat_id=context.job.context, text='BEEP')

def callback_timer(update, context):
    context.bot.send_message(chat_id=update.message.chat_id,
                             text='Setting a timer for 1 minute!')

    context.job_queue.run_once(callback_alarm, 60, context=update.message.chat_id)

# сработает при команде `/timer`
timer_handler = CommandHandler('timer', callback_timer)
dp.add_handler(timer_handler)

Помещая chat_id в объект Job, функция обратного вызова знает, куда она должна отправить сообщение.

Если остановить Updater методом upd.stop(), то соответствующая очередь заданий также будет остановлена. Очередь заданий может быть остановлена сама по себе методом jq.stop(). (переменные upd и jq были определены в начале материала)

Добавление в очередь и планирование задания (асинхронная версия 20.x).

Добавим первое задание в очередь. Определим функцию обратного вызова и добавим ее в очередь заданий.

from telegram.ext import ContextTypes, Application

async def callback_minute(context):
    await context.bot.send_message(chat_id='@examplechannel', text='One message every minute')

application = Application.builder().token('TOKEN').build()
job_queue = application.job_queue

job_minute = job_queue.run_repeating(callback_minute, interval=60, first=10)

application.run_polling()

Функция callback_minute() будет выполняться каждые 60,0 секунд, первый раз через 10 секунд (поскольку first=10). Аргументы interval и first указываются в секундах, если они являются int или float. Они также могут быть объектами datetime. Возвращаемое значение этих функций - создаваемые объекты Job. Не нужно сохранять результат метода job_queue.run_repeating() (который является новым экземпляром задания), если он не нужен (в этом примере он будет использоваться позже).

Можно добавить задание, которое будет выполняться только один раз с задержкой:

from telegram.ext import ContextTypes, Application

async def callback_30(context):
    await context.bot.send_message(chat_id='@examplechannel', text='A single message with 30s delay')

application = Application.builder().token('TOKEN').build()
job_queue = application.job_queue

job_queue.run_once(callback_30, 30)

application.run_polling()

Через тридцать секунд должны получить сообщение от функции callback_30().

Если надоело получать сообщение каждую минуту, то можно временно отключить задание или даже полностью удалить его из очереди:

# Временно отключить это задание
job_minute.enabled = False
# Полностью удалите это задание
job_minute.schedule_removal()

Примечание: метод .schedule_removal не сразу удаляет задание из очереди. Он помечается для удаления и будет удален, как только закончится его текущий интервал (он не будет запускаться после того, как будет помечен для удаления).

Иногда встает необходимость добавить задания в ответ на определенные действия пользователя. Для этого есть удобный способ. Аргумент context функций обратных вызовов имеет уже готовую к использованию очередь context.job_queue. Еще одна функция, которую можно здесь использовать - это ключевые аргументы объекта Job: data, chat_id или user_id. Можно передать любой объект в качестве параметра данных при запуске задания и получить его на более позднем этапе, пока задание существует. Аргументы chat_id/user_id позволяет легко сообщить заданию, о каком чате идет речь. Таким образом, в обратном вызове задания можно получить доступ к context.chat_data/context.user_data.

Смотрим, как это выглядит в коде:

from telegram import Update
from telegram.ext import CommandHandler, Application, ContextTypes
 
async def callback_alarm(context):
    # Отправляем `BEEP` человеку, который запустил команду `\timer`
    await context.bot.send_message(chat_id=context.job.chat_id, text=f'BEEP {context.job.data}!')
  
async def callback_timer(update, context):
    chat_id = update.message.chat_id
    name = update.effective_chat.full_name
    await context.bot.send_message(chat_id=chat_id, text='Setting a timer for 1 minute!')
    # Ставим будильник для функции `callback_alarm()`
    context.job_queue.run_once(callback_alarm, 60, data=name, chat_id=chat_id)
 
application = Application.builder().token('TOKEN').build()
timer_handler = CommandHandler('timer', callback_timer)
application.add_handler(timer_handler)
application.run_polling()

Поместив chat_id в объект Job, функция обратного вызова callback_alarm() знает, куда она должна отправить сообщение.

Если остановить приложение, то соответствующая очередь заданий также будет остановлена.

Различия в работе объекта JobQueue версий 20.x и 13.x.

Новые аргументы chat_id и user_id.

Все методы JobQueue.run_* имеют два новых аргумента chat_id и user_id, что позволяет легко связать пользователя/чат с заданием. При указании этих аргументов соответствующий идентификатор будет доступен в обратном вызове задания через context.job.chat_id и context.job.user_id.

Кроме того, будут доступны context.job.chat_data и context.job.user_data. Это имеет некоторые тонкие преимущества по сравнению с предыдущим обходным решением job_queue.run_*(..., context=context.chat_data), и вместо этого рекомендуется использовать эту новую функциональность.

Переименован аргумент context в data.

Чтобы устранить частую путаницу между context и context.job.context, версия 20.0 переименовала аргумент context всех методов JobQueue.run_* в аргумент data. Это также относится к соответствующему атрибуту Job.

Изменения в методе JobQueue.run_daily().

Начиная с версии 20.0 поведение этого метода согласовано с cron, т. е. 0 - это воскресенье, а 6 - суббота.

Изменения в методе JobQueue.run_monthly()

К сожалению, аргумент day_is_strict работал некорректно и поэтому был удален. Вместо него теперь можно передать day='last', чтобы задание выполнялось в последний день месяца.

Краткая справка по классу telegram.ext.JobQueue.

Класс JobQueue позволяет периодически выполнять задачи с ботом. Это удобная обертка модуля APScheduler.

Атрибуты и методы объекта JobQueue:

JobQueue.scheduler

Возвращает планировщик APScheduler,

JobQueue.get_jobs_by_name(name):

Возвращает кортеж всех ожидающих/запланированных заданий с заданным именем name, которые в настоящее время находятся в JobQueue.

JobQueue.jobs():

Возвращает кортеж всех ожидающих / запланированных заданий, которые в настоящее время находятся в JobQueue.

JobQueue.run_custom(callback, job_kwargs=None, data=None, name=None, chat_id=None, user_id=None):

Создает новое настраиваемое задание.

Аргументы:

  • callback - функция обратного вызова.
  • job_kwargs=None - словарь: произвольные ключевого аргументы для передачи в scheduler.add_job().
  • data=None - дополнительные данные, необходимые для функции обратного вызова. Доступ к Job.data можно получить в функции обратного вызова через context.job.data. По умолчанию значение равно None. (Изменено в версии 20.0: аргумент context переименован в data.)
  • name=None - имя задания. По умолчанию callback.__name__.
  • chat_id - идентификатор чата, связанного с этим заданием. Если передано, то соответствующие данные chat_data будут доступны в обратном вызове. (Новое в версии 20.0.)
  • user_id - идентификатор пользователя, связанного с этим заданием. Если передано, соответствующие user_data будут доступны в обратном вызове. (Новое в версии 20.0.)
  • job_kwargs=None - произвольные ключевые аргументы для передачи в apscheduler.schedulers.base.BaseScheduler.add_job().
JobQueue.run_daily(callback, time, days=(0, 1, 2, 3, 4, 5, 6), data=None, name=None, chat_id=None, user_id=None, job_kwargs=None):

Создает новое задание, которое запускается ежедневно, и добавляет его в очередь.

Аргументы:

  • time - datetime.time: время дня, в которое должно выполняться задание. Если часовой пояс time.tzinfo=None, то будет использоваться часовой пояс бота по умолчанию.
  • days=(0, 1, 2, 3, 4, 5, 6) - кортеж: определяет, в какие дни недели должно выполняться задание (где 0-6 соответствуют понедельник - воскресенье). По умолчанию каждый день.

Значение и поведение остальных аргументов, такое же как в JobQueue.run_custom().

JobQueue.run_monthly(callback, when, day, data=None, name=None, chat_id=None, user_id=None, job_kwargs=None):

Создает новое задание, которое запускается ежемесячно, и добавляет его в очередь.

Аргументы:

  • when - время дня, в которое должно выполняться задание. Если часовой пояс time.tzinfo=None, то будет использоваться часовой пояс бота по умолчанию.
  • day - int: определяет день месяца, в который будет запускаться задание. Оно должно быть в пределах от 1 до 31 включительно.
  • day_is_strict=True - bool: если False и day > month.days, то будет выбран последний день месяца. (Удален в версии 20.x)

С версии 20.x, для того, что-бы задание выполнилось в последний день месяца нужно передать day = 'last'.

Значение и поведение остальных аргументов, такое же как в JobQueue.run_custom().

JobQueue.run_once(callback, when, data=None, name=None, chat_id=None, user_id=None, job_kwargs=None):

Создает новое задание, которое запускается один раз, и добавляет его в очередь.

Аргументы:

  • when - int или float или datetime.timedelta или datetime.datetime или datetime.time. Определяет время в которое должно выполняться задание. Этот параметр будет интерпретироваться в зависимости от его типа:
    • int или float будут интерпретироваться как "секунды с этого момента", в которые должно выполняться задание.
    • datetime.timedelta будет интерпретироваться как "время с этого момента", в течение которого должно выполняться задание.
    • datetime.datetime будет интерпретироваться как конкретная дата и время, в которое должно выполняться задание. Если часовой пояс datetime.tzinfo=None, то будет использоваться часовой пояс бота по умолчанию.
    • datetime.time будет интерпретироваться как определенное время суток, в которое должно выполняться задание. Это может быть либо сегодня, либо, если время уже прошло - завтра. Если часовой пояс time.tzinfo=None, то будет использоваться часовой пояс бота по умолчанию.

Значение и поведение остальных аргументов, такое же как в JobQueue.run_custom().

JobQueue.run_repeating(callback, interval, first=None, last=None, data=None, name=None, chat_id=None, user_id=None, job_kwargs=None):

Создает новое задание, которое запускается с заданными интервалами, и добавляет его в очередь.

  • interval - int или float или datetime.timedelta: интервал, в котором будет выполняться задание. Если это int или float, то они будут интерпретироваться как секунды.
  • first=None - Определяет время, в которое должно выполняться задание. Этот аргумент интерпретируется как аргумент when в JobQueue.run_once().
  • last=None - Самое позднее возможное время для выполнения задания. Этот аргумент интерпретируется как аргумент when в JobQueue.run_once().

Если last - это тип datetime.datetime или datetime.time, а last.tzinfo=None, то будет принят часовой пояс бота по умолчанию.

Значение и поведение остальных аргументов, такое же как в JobQueue.run_custom().

JobQueue.set_dispatcher(dispatcher):

Доступен только в версии 13.x. Устанавливает диспетчер dispatcher, который будет использоваться JobQueue.

JobQueue.set_application(application):

Доступен только в версии 20.x. Устанавливает диспетчер dispatcher, который будет использоваться JobQueue.

JobQueue.start():

Запускает поток job_queue. В версии 20.x этот метод является сопрограммой.

JobQueue.stop():

Останавливает поток job_queue. В версии 20.x этот метод является сопрограммой.


Пример бота Telegram c использованием планировщика:

Простой бот для отправки синхронизированных сообщений Telegram. Этот бот использует класс JobQueue для отправки синхронизированных сообщений. Сначала определяются несколько функций обработчика. Затем эти функции передаются Диспетчеру и регистрируются в соответствующих местах. Затем бот запускается и работает до тех пор, пока не нажать Ctrl+C.

Пример для многопоточной версии 13.x.

import logging

from telegram import Update
from telegram.ext import Updater, CommandHandler, CallbackContext

# Включаем ведение журнала
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO
)

logger = logging.getLogger(__name__)

def remove_job_if_exists(name, context):
    """
       Удаляет задание с заданным именем. 
       Возвращает, было ли задание удалено
    """
    current_jobs = context.job_queue.get_jobs_by_name(name)
    if not current_jobs:
        return False
    for job in current_jobs:
        job.schedule_removal()
    return True

# Определяем несколько обработчиков команд. 
# Они обычно принимают два аргумента `update` 
# и `context`. Обработчики ошибок также получают
# поднятый объект `TelegramError`.
def start(update, _): # объект `context` не используется, заменяем его на `_`
    update.message.reply_text('Используйте "/set <seconds>" для установки напоминания.')

# объект `update` в функции не используется, заменяем его на `_`
def alarm(_, context):
    """Отправляет сообщение-напоминание ."""
    job = context.job
    context.bot.send_message(job.context, text='Beep!')

def set_timer(update, context):
    """Добавляет задание в очередь."""
    chat_id = update.message.chat_id
    try:
        # args[0] содержит время для таймера в секундах
        due = int(context.args[0])
        if due < 0:
            update.message.reply_text('Нельзя вернуться в прошлое!')
            return

        job_removed = remove_job_if_exists(str(chat_id), context)
        context.job_queue.run_once(alarm, due, context=chat_id, name=str(chat_id))

        text = 'Таймер успешно установлен!'
        if job_removed:
            text += ' Old one was removed.'
        update.message.reply_text(text)

    except (IndexError, ValueError):
        update.message.reply_text('Usage: /set <seconds>')

def unset(update, context):
    """Удаляет задание, если пользователь передумал."""
    chat_id = update.message.chat_id
    job_removed = remove_job_if_exists(str(chat_id), context)
    text = 'Таймер успешно отменен!' if job_removed else 'Нет активных таймеров.'
    update.message.reply_text(text)

def main():
    """Запуск бота."""
    # передайте токен вашего бота.
    updater = Updater("TOKEN")

    # диспетчер для регистрации обработчиков
    dispatcher = updater.dispatcher

    # ​​ответ в Telegram по разным командам 
    dispatcher.add_handler(CommandHandler("start", start))
    dispatcher.add_handler(CommandHandler("help", start))
    dispatcher.add_handler(CommandHandler("set", set_timer))
    dispatcher.add_handler(CommandHandler("unset", unset))

    # Старт бота
    updater.start_polling()

    # Блокирует до тех пор, пока не нажать Ctrl-C или
    # процесс не получит SIGINT, SIGTERM или SIGABRT.
    updater.idle()

if __name__ == '__main__':
    main()

Пример для асинхронной версии 20.x:

import logging
from telegram import Update
from telegram.ext import Application, CommandHandler, ContextTypes

logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO
)

def remove_job_if_exists(name: str, context: ContextTypes.DEFAULT_TYPE) -> bool:
    """Удаляет задание с указанным именем. Возвращает, было ли задание удалено."""
    current_jobs = context.job_queue.get_jobs_by_name(name)
    if not current_jobs:
        return False
    for job in current_jobs:
        job.schedule_removal()
    return True

async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Объясняет, как пользоваться ботом."""
    await update.message.reply_text("Используйте "/set <seconds>" для установки напоминания")

async def alarm(context: ContextTypes.DEFAULT_TYPE) -> None:
    """Отправляет сообщение-напоминание ."""
    job = context.job
    await context.bot.send_message(job.chat_id, text=f"Beep! {job.data} секунд закончились!")

async def set_timer(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Добавляет задание в очередь."""
    chat_id = update.effective_message.chat_id
    try:
        # args[0] должен содержать время таймера в секундах.
        due = float(context.args[0])
        if due < 0:
            await update.effective_message.reply_text("Извините, нельзя вернуться в будущее!")
            return

        job_removed = remove_job_if_exists(str(chat_id), context)
        context.job_queue.run_once(alarm, due, chat_id=chat_id, name=str(chat_id), data=due)

        text = "Timer successfully set!"
        if job_removed:
            text += " Old one was removed."
        await update.effective_message.reply_text(text)

    except (IndexError, ValueError):
        await update.effective_message.reply_text("Используйте: /set <seconds>")

async def unset(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Удаляет задание, если пользователь передумал."""
    chat_id = update.message.chat_id
    job_removed = remove_job_if_exists(str(chat_id), context)
    text = "Таймер успешно отменен!" if job_removed else "Нет активного таймера."
    await update.message.reply_text(text)

def main() -> None:
    application = Application.builder().token("TOKEN").build()
    application.add_handler(CommandHandler(["start", "help"], start))
    application.add_handler(CommandHandler("set", set_timer))
    application.add_handler(CommandHandler("unset", unset))
    application.run_polling()

if __name__ == "__main__":
    main()