Класс расширения telegram.ext.JobQueue
позволяет выполнять задачи с задержкой или даже периодически, с заданным интервалом. Помимо прочего, его можно использовать для отправки регулярных сообщений своим подписчикам Telegram канала.
Внимание! Пакеты
python-telegram-bot
версии 13.x будут придерживаться многопоточной парадигмы программирования (на данный момент актуальна версия 13.14). Пакеты версий 20.x и новее будут полностью асинхронными и на октябрь 2022 года, первый из них находится в предрелизе. Дополнительно смотрите основные изменения в пакетеpython-telegram-bot
версии 20.x.
Что необходимо учитывать при использовании планировщика JobQueue
:
JobQueue
обеспечивает простой и готовый к использованию способ планирования задач в соответствии с архитектурой библиотеки python-telegram-bot
.python-telegram-bot
, поэтому с версии 13 используется сторонняя библиотека.APScheduler
.python-telegram-bot
не гарантируют, что серверная часть останется неизменной. Например, если поддержка стороннего модуля APScheduler
будет прекращена, то придется искать альтернативы.JobQueue
;JobQueue
версий 20.x и 13.x;telegram.ext.JobQueue
;JobQueue
.Класс JobQueue
тесно интегрирован с другими классами расширений telegram.ext
.
Чтобы использовать JobQueue
, не нужно много делать. Когда создается экземпляр средства обновления Updater
, он автоматически создаст планировщик JobQueue
, подобно диспетчеру Dispatcher
:
from telegram.ext import Updater upd = Updater('TOKEN', use_context=True) dp = upd.dispatcher jq = upd.job_queue ...
Для многопоточной версии 13.x, созданная очередь заданий jq
уже связана с диспетчером. По этому, если нет веской причины, то не следует создавать экземпляр класса JobQueue
самостоятельно.
Для асинхронной версии python-telegram-bot
версии 20.x, планировщик JobQueue
автоматически создается, при создании приложения Telegram:
from telegram.ext import Application application = Application.builder().token('TOKEN').build() job_queue = application.job_queue
В асинхронной версии 20.x созданная очередь jq
также уже связана, но с приложением с application
.
Задачи, в очереди заданий, инкапсулируются классом Job
. Он принимает в качестве параметра функцию обратного вызова, которая будет выполнена, когда придет время. Функция обратного вызова всегда принимает один параметр: context
возвращаемый telegram.ext.CallbackContext
. Как и в случае функций, которые передаются обработчикам сообщений, через объект context
можно получить доступ к context.bot
- экземпляру telegram.Bot
. В этом конкретном случае, получить доступ к context.job
, который является экземпляром задачи Job
, которая инициировала обратный вызов (подробнее об этом позже).
Можно использовать следующие методы для создания заданий с разной частотой и временем:
job_queue.run_once
, job_queue.run_repeating
, job_queue.run_daily
job_queue.run_monthly
. Обратите внимание еще раз, что е нужно создавать экземпляр класса Job
напрямую.
Внимание! В коде, который приводится иже необходимо заменить канал
@examplechannel
- каналом, администратором которого является ваш бот, или вашим идентификатором пользователя. Для того, чтобы узнать свой идентификатор пользователя, можно использовать@userinfobot
.
Добавим первое задание в очередь, для этого определим функцию обратного вызова и добавим ее в очередь заданий.
def callback_minute(context: telegram.ext.CallbackContext): context.bot.send_message(chat_id='@examplechannel', text='One message every minute') job_minute = jq.run_repeating(callback_minute, interval=60, first=10)
Функция callback_minute
будет выполняться каждые 60,0 секунд, первый раз через 10 секунд (аргумент first=10
). Аргументы interval
и first
указываются в секундах (могут иметь тип int
, float
или datetime.timedelta()
). А аргумент first
кроме того может принимать значения datetime.date()
и datetime.time()
.
Возвращаемое значение функций обратного вызова - это создаваемые объекты Job
. Не нужно сохранять результат run_repeating
(который является новым экземпляром задания), воспользуемся им позже.
Также можно добавить задание, которое будет выполняться только один раз, с задержкой:
def callback_30(context: telegram.ext.CallbackContext): context.bot.send_message(chat_id='@examplechannel', text='A single message with 30s delay') # Через тридцать секунд будет получено # сообщение от функции `callback_30`. jq.run_once(callback_30, 30)
Для того, что бы временно отключить задание или даже полностью удалить его из очереди, сделайте следующее:
# Временно отключает задание `job_minute` job_minute.enabled = False # Полностью удаляет работу `job_minute` job_minute.schedule_removal()
Примечание: метод .schedule_removal()
не удаляет задание сразу из очереди. Задание помечается для удаления и будет удалено, как только истечет его текущий интервал (задание не будет снова запускаться после того, как будет помечено для удаления).
Иногда необходимо сделать "напоминалку", т.е. добавить задания в ответ на определенный пользовательский ввод, и есть удобный способ сделать это. Аргумент контекста обратных вызовов Handler
имеет JobQueue
, прикрепленный как context.job_queue
, готовый к использованию. Еще одна возможность, которую можно использовать - это ключевой аргумент context
для задания Job
. То есть можно передать любой объект в качестве параметра контекста при запуске задания и получить его на более позднем этапе, пока задание существует.
Смотрим, как это выглядит в коде:
from telegram.ext import CommandHandler def callback_alarm(context): context.bot.send_message(chat_id=context.job.context, text='BEEP') def callback_timer(update, context): context.bot.send_message(chat_id=update.message.chat_id, text='Setting a timer for 1 minute!') context.job_queue.run_once(callback_alarm, 60, context=update.message.chat_id) # сработает при команде `/timer` timer_handler = CommandHandler('timer', callback_timer) dp.add_handler(timer_handler)
Помещая chat_id
в объект Job
, функция обратного вызова знает, куда она должна отправить сообщение.
Если остановить Updater
методом upd.stop()
, то соответствующая очередь заданий также будет остановлена. Очередь заданий можете быть остановлена сама по себе методом jq.stop()
. (переменные upd
и jq
были определены в начале материала)
Добавим первое задание в очередь. Определим функцию обратного вызова и добавим ее в очередь заданий.
from telegram.ext import ContextTypes, Application async def callback_minute(context): await context.bot.send_message(chat_id='@examplechannel', text='One message every minute') application = Application.builder().token('TOKEN').build() job_queue = application.job_queue job_minute = job_queue.run_repeating(callback_minute, interval=60, first=10) application.run_polling()
Функция callback_minute()
будет выполняться каждые 60,0 секунд, первый раз через 10 секунд (поскольку first=10
). Аргументы interval
и first
указываются в секундах, если они являются int
или float
. Они также могут быть объектами datetime
. Возвращаемое значение этих функций - создаваемые объекты Job
. Не нужно сохранять результат метода job_queue.run_repeating()
(который является новым экземпляром задания), если он не нужен (в этом примере он будет использоваться позже).
Можно добавить задание, которое будет выполняться только один раз с задержкой:
from telegram.ext import ContextTypes, Application async def callback_30(context): await context.bot.send_message(chat_id='@examplechannel', text='A single message with 30s delay') application = Application.builder().token('TOKEN').build() job_queue = application.job_queue job_queue.run_once(callback_30, 30) application.run_polling()
Через тридцать секунд должны получить сообщение от функции callback_30()
.
Если надоело получать сообщение каждую минуту, то можно временно отключить задание или даже полностью удалить его из очереди:
# Временно отключить это задание job_minute.enabled = False # Полностью удалите это задание job_minute.schedule_removal()
Примечание: метод
.schedule_removal
не сразу удаляет задание из очереди. Он помечается для удаления и будет удален, как только закончится его текущий интервал (он не будет запускаться после того, как будет помечен для удаления).
Иногда встает необходимость добавить задания в ответ на определенные действия пользователя. Для этого есть удобный способ. Аргумент context
функций обратных вызовов имеет уже готовую к использованию очередь context.job_queue
. Еще одна функция, которую можно здесь использовать - это ключевые аргументы объекта Job: data
, chat_id
или user_id
. Можно передать любой объект в качестве параметра данных при запуске задания и получить его на более позднем этапе, пока задание существует. Аргументы chat_id
/user_id
позволяет легко сообщить заданию, о каком чате идет речь. Таким образом, в обратном вызове задания можно получить доступ к context.chat_data
/context.user_data
.
Смотрим, как это выглядит в коде:
from telegram import Update from telegram.ext import CommandHandler, Application, ContextTypes async def callback_alarm(context): # Отправляем `BEEP` человеку, который запустил команду `\timer` await context.bot.send_message(chat_id=context.job.chat_id, text=f'BEEP {context.job.data}!') async def callback_timer(update, context): chat_id = update.message.chat_id name = update.effective_chat.full_name await context.bot.send_message(chat_id=chat_id, text='Setting a timer for 1 minute!') # Ставим будильник для функции `callback_alarm()` context.job_queue.run_once(callback_alarm, 60, data=name, chat_id=chat_id) application = Application.builder().token('TOKEN').build() timer_handler = CommandHandler('timer', callback_timer) application.add_handler(timer_handler) application.run_polling()
Поместив chat_id
в объект Job
, функция обратного вызова callback_alarm()
знает, куда она должна отправить сообщение.
Если остановить приложение, то соответствующая очередь заданий также будет остановлена.
JobQueue
версий 20.x и 13.x.chat_id
и user_id
.Все методы JobQueue.run_*
имеют два новых аргумента chat_id
и user_id
, что позволяет легко связать пользователя/чат с заданием. При указании этих аргументов соответствующий идентификатор будет доступен в обратном вызове задания через context.job.chat_id
и context.job.user_id
.
Кроме того, будут доступны context.job.chat_data
и context.job.user_data
. Это имеет некоторые тонкие преимущества по сравнению с предыдущим обходным решением job_queue.run_*(..., context=context.chat_data)
, и вместо этого рекомендуется использовать эту новую функциональность.
context
в data
.Чтобы устранить частую путаницу между context
и context.job.context
, версия 20.0 переименовала аргумент context
всех методов JobQueue.run_*
в аргумент data
. Это также относится к соответствующему атрибуту Job
.
JobQueue.run_daily()
.Начиная с версии 20.0 поведение этого метода согласовано с cron
, т. е. 0 - это воскресенье, а 6 - суббота.
JobQueue.run_monthly()
К сожалению, аргумент day_is_strict
работал некорректно и поэтому был удален. Вместо него теперь можно передать day='last'
, чтобы задание выполнялось в последний день месяца.
telegram.ext.JobQueue
.Класс JobQueue
позволяет периодически выполнять задачи с ботом. Это удобная обертка модуля APScheduler
.
JobQueue
:JobQueue.scheduler
Возвращает планировщик APScheduler
,
JobQueue.get_jobs_by_name(name)
:Возвращает кортеж всех ожидающих/запланированных заданий с заданным именем name
, которые в настоящее время находятся в JobQueue
.
JobQueue.jobs()
:Возвращает кортеж всех ожидающих / запланированных заданий, которые в настоящее время находятся в JobQueue
.
JobQueue.run_custom(callback, job_kwargs=None, data=None, name=None, chat_id=None, user_id=None)
:Создает новое настраиваемое задание.
Аргументы:
callback
- функция обратного вызова.job_kwargs=None
- словарь: произвольные ключевого аргументы для передачи в scheduler.add_job()
.data=None
- дополнительные данные, необходимые для функции обратного вызова. Доступ к Job.data
можно получить в функции обратного вызова через context.job.data
. По умолчанию значение равно None
. (Изменено в версии 20.0: аргумент context
переименован в data
.)name=None
- имя задания. По умолчанию callback.__name__
.chat_id
- идентификатор чата, связанного с этим заданием. Если передано, то соответствующие данные chat_data
будут доступны в обратном вызове. (Новое в версии 20.0.)user_id
- идентификатор пользователя, связанного с этим заданием. Если передано, соответствующие user_data
будут доступны в обратном вызове. (Новое в версии 20.0.)job_kwargs=None
- произвольные ключевые аргументы для передачи в apscheduler.schedulers.base.BaseScheduler.add_job()
.JobQueue.run_daily(callback, time, days=(0, 1, 2, 3, 4, 5, 6), data=None, name=None, chat_id=None, user_id=None, job_kwargs=None)
:Создает новое задание, которое запускается ежедневно, и добавляет его в очередь.
Аргументы:
time
- datetime.time
: время дня, в которое должно выполняться задание. Если часовой пояс time.tzinfo=None
, то будет использоваться часовой пояс бота по умолчанию.days=(0, 1, 2, 3, 4, 5, 6)
- кортеж: определяет, в какие дни недели должно выполняться задание (где 0-6 соответствуют понедельник - воскресенье). По умолчанию каждый день.Значение и поведение остальных аргументов, такое же как в JobQueue.run_custom()
.
JobQueue.run_monthly(callback, when, day, data=None, name=None, chat_id=None, user_id=None, job_kwargs=None)
:Создает новое задание, которое запускается ежемесячно, и добавляет его в очередь.
Аргументы:
when
- время дня, в которое должно выполняться задание. Если часовой пояс time.tzinfo=None
, то будет использоваться часовой пояс бота по умолчанию.day
- int
: определяет день месяца, в который будет запускаться задание. Оно должно быть в пределах от 1 до 31 включительно.day_is_strict=True
- bool
: если False
и day > month.days
, то будет выбран последний день месяца. (Удален в версии 20.x)С версии 20.x, для того, что-бы задание выполнилось в последний день месяца нужно передать day = 'last'
.
Значение и поведение остальных аргументов, такое же как в JobQueue.run_custom()
.
JobQueue.run_once(callback, when, data=None, name=None, chat_id=None, user_id=None, job_kwargs=None)
:Создает новое задание, которое запускается один раз, и добавляет его в очередь.
Аргументы:
when
- int
или float
или datetime.timedelta
или datetime.datetime
или datetime.time
. Определяет время в которое должно выполняться задание. Этот параметр будет интерпретироваться в зависимости от его типа:int
или float
будут интерпретироваться как "секунды с этого момента", в которые должно выполняться задание.datetime.timedelta
будет интерпретироваться как "время с этого момента", в течение которого должно выполняться задание.datetime.datetime
будет интерпретироваться как конкретная дата и время, в которое должно выполняться задание. Если часовой пояс datetime.tzinfo=None
, то будет использоваться часовой пояс бота по умолчанию.datetime.time
будет интерпретироваться как определенное время суток, в которое должно выполняться задание. Это может быть либо сегодня, либо, если время уже прошло - завтра. Если часовой пояс time.tzinfo=None
, то будет использоваться часовой пояс бота по умолчанию.Значение и поведение остальных аргументов, такое же как в JobQueue.run_custom()
.
JobQueue.run_repeating(callback, interval, first=None, last=None, data=None, name=None, chat_id=None, user_id=None, job_kwargs=None)
:Создает новое задание, которое запускается с заданными интервалами, и добавляет его в очередь.
interval
- int
или float
или datetime.timedelta
: интервал, в котором будет выполняться задание. Если это int
или float
, то они будут интерпретироваться как секунды.first=None
- Определяет время, в которое должно выполняться задание. Этот аргумент интерпретируется как аргумент when
в JobQueue.run_once()
.last=None
- Самое позднее возможное время для выполнения задания. Этот аргумент интерпретируется как аргумент when
в JobQueue.run_once()
. Если last
- это тип datetime.datetime
или datetime.time
, а last.tzinfo=None
, то будет принят часовой пояс бота по умолчанию.
Значение и поведение остальных аргументов, такое же как в JobQueue.run_custom()
.
JobQueue.set_dispatcher(dispatcher)
:Доступен только в версии 13.x. Устанавливает диспетчер dispatcher
, который будет использоваться JobQueue
.
JobQueue.set_application(application)
:Доступен только в версии 20.x. Устанавливает диспетчер dispatcher
, который будет использоваться JobQueue
.
JobQueue.start()
:Запускает поток job_queue
. В версии 20.x этот метод является сопрограммой.
JobQueue.stop()
:Останавливает поток job_queue
. В версии 20.x этот метод является сопрограммой.
Простой бот для отправки синхронизированных сообщений Telegram. Этот бот использует класс JobQueue
для отправки синхронизированных сообщений. Сначала определяются несколько функций обработчика. Затем эти функции передаются Диспетчеру и регистрируются в соответствующих местах. Затем бот запускается и работает до тех пор, пока не нажать Ctrl+C.
Пример для многопоточной версии 13.x.
import logging from telegram import Update from telegram.ext import Updater, CommandHandler, CallbackContext # Включаем ведение журнала logging.basicConfig( format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO ) logger = logging.getLogger(__name__) def remove_job_if_exists(name, context): """ Удаляет задание с заданным именем. Возвращает, было ли задание удалено """ current_jobs = context.job_queue.get_jobs_by_name(name) if not current_jobs: return False for job in current_jobs: job.schedule_removal() return True # Определяем несколько обработчиков команд. # Они обычно принимают два аргумента `update` # и `context`. Обработчики ошибок также получают # поднятый объект `TelegramError`. def start(update, _): # объект `context` не используется, заменяем его на `_` update.message.reply_text('Используйте "/set <seconds>" для установки напоминания.') # объект `update` в функции не используется, заменяем его на `_` def alarm(_, context): """Отправляет сообщение-напоминание .""" job = context.job context.bot.send_message(job.context, text='Beep!') def set_timer(update, context): """Добавляет задание в очередь.""" chat_id = update.message.chat_id try: # args[0] содержит время для таймера в секундах due = int(context.args[0]) if due < 0: update.message.reply_text('Нельзя вернуться в прошлое!') return job_removed = remove_job_if_exists(str(chat_id), context) context.job_queue.run_once(alarm, due, context=chat_id, name=str(chat_id)) text = 'Таймер успешно установлен!' if job_removed: text += ' Old one was removed.' update.message.reply_text(text) except (IndexError, ValueError): update.message.reply_text('Usage: /set <seconds>') def unset(update, context): """Удаляет задание, если пользователь передумал.""" chat_id = update.message.chat_id job_removed = remove_job_if_exists(str(chat_id), context) text = 'Таймер успешно отменен!' if job_removed else 'Нет активных таймеров.' update.message.reply_text(text) def main(): """Запуск бота.""" # передайте токен вашего бота. updater = Updater("TOKEN") # диспетчер для регистрации обработчиков dispatcher = updater.dispatcher # ответ в Telegram по разным командам dispatcher.add_handler(CommandHandler("start", start)) dispatcher.add_handler(CommandHandler("help", start)) dispatcher.add_handler(CommandHandler("set", set_timer)) dispatcher.add_handler(CommandHandler("unset", unset)) # Старт бота updater.start_polling() # Блокирует до тех пор, пока не нажать Ctrl-C или # процесс не получит SIGINT, SIGTERM или SIGABRT. updater.idle() if __name__ == '__main__': main()
Пример для асинхронной версии 20.x:
import logging from telegram import Update from telegram.ext import Application, CommandHandler, ContextTypes logging.basicConfig( format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO ) def remove_job_if_exists(name: str, context: ContextTypes.DEFAULT_TYPE) -> bool: """Удаляет задание с указанным именем. Возвращает, было ли задание удалено.""" current_jobs = context.job_queue.get_jobs_by_name(name) if not current_jobs: return False for job in current_jobs: job.schedule_removal() return True async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Объясняет, как пользоваться ботом.""" await update.message.reply_text("Используйте "/set <seconds>" для установки напоминания") async def alarm(context: ContextTypes.DEFAULT_TYPE) -> None: """Отправляет сообщение-напоминание .""" job = context.job await context.bot.send_message(job.chat_id, text=f"Beep! {job.data} секунд закончились!") async def set_timer(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Добавляет задание в очередь.""" chat_id = update.effective_message.chat_id try: # args[0] должен содержать время таймера в секундах. due = float(context.args[0]) if due < 0: await update.effective_message.reply_text("Извините, нельзя вернуться в будущее!") return job_removed = remove_job_if_exists(str(chat_id), context) context.job_queue.run_once(alarm, due, chat_id=chat_id, name=str(chat_id), data=due) text = "Timer successfully set!" if job_removed: text += " Old one was removed." await update.effective_message.reply_text(text) except (IndexError, ValueError): await update.effective_message.reply_text("Используйте: /set <seconds>") async def unset(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Удаляет задание, если пользователь передумал.""" chat_id = update.message.chat_id job_removed = remove_job_if_exists(str(chat_id), context) text = "Таймер успешно отменен!" if job_removed else "Нет активного таймера." await update.message.reply_text(text) def main() -> None: application = Application.builder().token("TOKEN").build() application.add_handler(CommandHandler(["start", "help"], start)) application.add_handler(CommandHandler("set", set_timer)) application.add_handler(CommandHandler("unset", unset)) application.run_polling() if __name__ == "__main__": main()