Сообщить об ошибке.

Объекты Transport и Protocol в цикле событий asyncio в Python

Сетевой интерфейс ввода-вывода низкоуровнего кода asyncio

Объекты asyncio.Transport и asyncio.Protocol модуля asyncio используются низкоуровневыми API-интерфейсами цикла событий, такими как loop.create_connection().

Транспорт и протоколы используют стиль программирования на основе обратного вызова и обеспечивают высокопроизводительную реализацию сетевых протоколов или протоколов IPC (например HTTP).

По сути, транспорты и протоколы модуля asyncio следует использовать только в библиотеках и фреймворках, а не в высокоуровневых асинхронных приложениях.

На самом высоком уровне транспорт (asyncio.Transport) связан с тем, как передаются байты, в то время как протокол (asyncio.Protocol) определяет, какие байты передать и в некоторой степени, когда нужно передавать.

То же самое можно сказать и по-другому: транспорт - это абстракция для сокета (или аналогичной конечной точки ввода-вывода), а протокол - это абстракция для приложения с точки зрения транспорта.

Еще одна точка зрения состоит в том, что вместе интерфейсы транспорта и протокола определяют абстрактный интерфейс для использования сетевого ввода-вывода и межпроцессного ввода-вывода.

Между объектами транспорта (transport) и объектами протокола (protocol) всегда существует соотношение 1:1. Протокол вызывает методы транспорта для отправки данных, а транспорт в свою очередь, вызывает методы протокола для передачи полученных данных.

Большинство методов цикла событий, ориентированных на соединения (таких как loop.create_connection()), обычно принимают аргумент protocol_factory, используемый для создания объекта протокола для принятого соединения, представленного объектом транспорта. Такие методы обычно возвращают кортеж (transport, protocol).

Содержание:


Транспорт.

Транспорт - это классы, предоставляемые модулем asyncio для абстрагирования различных типов каналов связи.

Объекты транспорта всегда создаются с помощью цикла событий asyncio.

Модуль asyncio реализует транспорт для TCP, UDP, SSL и каналов подпроцесса. Доступные для транспорта методы зависят от его типа.

Классы транспортов не являются потокобезопасными.

asyncio.BaseTransport:

Класс asyncio.BaseTransport представляет собой базовый класс для всех транспортов. Содержит методы, общие для всех транспортов asyncio.

BaseTransport.close():

Метод BaseTransport.close() закрывает транспорт.

Если транспорт имеет буфер для исходящих данных, то буферные данные будут сброшены асинхронно. Больше никаких данных не будет получено. После того, как все буферные данные будут сброшены, метод Protocol.connection_lost() будет вызываться с None в качестве аргумента.

BaseTransport.is_closing():

Метод BaseTransport.is_closing() возвращает True, если транспорт закрывается или уже закрыт.

BaseTransport.get_extra_info(name, default=None):

Метод BaseTransport.get_extra_info() возвращает информации о транспорте или базовых ресурсах, которые он использует.

Аргумент name - это строка, представляющая часть информации о транспорте.

по умолчанию значение для возврата, если информация недоступна, или если транспорт не поддерживает запрос с данной третьей стороной реализации цикла событий или на текущей платформе.

Например, следующий код пытается получить базовый объект транспорта сокета:

sock = transport.get_extra_info('socket')
if sock is not None:
    print(sock.getsockopt(...))

Категории информации, которую можно запросить для некоторых транспортов:

  • socket:
    • 'peername': удаленный адрес, к которому подключен сокет, результат socket.socket.getpeername() или None;
    • 'socket': экземпляр socket.socket();
    • 'sockname': собственный адрес сокета, результат socket.socket.getsockname()]socket.obj.
  • SSL socket:
    • 'compression': алгоритм сжатия используется как строка или None, если соединение не сжато; результат ssl.SSLSocket.compression();
    • 'cipher': кортеж из трех значений, содержащий имя используемого шифра, версию протокола SSL, определяющую его использование, и количество используемых секретных битов; результат ssl.SSLSocket.cipher();
    • 'peercert': одноранговый сертификат; результат ssl.SSLSocket.getpeercert();
    • 'sslcontext': экземпляр ssl.SSLContext;
    • 'ssl_object': ssl.SSLObject или ssl.SSLSocket экземпляр.
  • pipe:
    • 'pipe': объект канала pipe.
  • subprocess:

BaseTransport.set_protocol(protocol):

Метод BaseTransport.set_protocol() устанавливает новый протокол protocol.

Переключение протокола должно выполняться только тогда, когда оба протокола задокументированы и поддерживают этот транспорт.

BaseTransport.get_protocol():

Метод BaseTransport.get_protocol() возвращает текущий протокол.

asyncio.WriteTransport(BaseTransport):

Класс asyncio.WriteTransport() представляет собой базовый транспорт для подключений, только для записи.

Экземпляры класса WriteTransport() возвращаются из метода цикла событий loop.connect_write_pipe(), а также используются методами, связанными с подпроцессами, такими как loop.subprocess_exec().

WriteTransport.abort():

Метод WriteTransport.abort() немедленно закрывает транспорт, не дожидаясь завершения незавершенных операций. Буферизованные данные будут потеряны. Больше никаких данных не будет.

В конечном итоге метод протокола Protocol.connection_lost() будет вызываться с параметром None в качестве аргумента.

WriteTransport.can_write_eof():

Метод WriteTransport.can_write_eof() возвращает True, если транспорт поддерживает Transport.write_eof() и False, если нет.

WriteTransport.get_write_buffer_size():

Метод WriteTransport.get_write_buffer_size() возвращает текущий размер выходного буфера, используемого транспортом.

WriteTransport.get_write_buffer_limits():

Метод WriteTransport.get_write_buffer_limits() получает маркеры пределов low и high для управления потоком записи. Возвращает кортеж (low, high), где low и high - положительное количество байтов.

Чтобы установить эти предельные значения используйте метод Transport.set_write_buffer_limits().

WriteTransport.set_write_buffer_limits(high=None, low=None):

Метод WriteTransport.set_write_buffer_limits() устанавливает верхний high и нижний low маркеры пределов для управления потоком записи.

Эти два значения измеряются в байтах и определяют, когда вызываются методы протокола protocol.pause_writing() и protocol.resume_writing(). Если указан, нижний low предел, то он должен быть меньше или равен верхнему пределу high. Оба предела не могут быть отрицательными.

Метод протокола protocol.pause_writing() вызывается, когда размер буфера становится больше или равен максимальному значению high. Если запись была приостановлена, то вызывается функция protocol.resume_writing(), когда размер буфера становится меньше или равен нижнему low значению.

Значения по умолчанию зависят от реализации. Если указан только high, то low по умолчанию имеет значение, зависящее от реализации, меньшее или равное максимальному high пределу.

Установка high=0 также приводит к автоматическому выставлению low=0 и вызывает вызов метод protocol.pause_writing() всякий раз, когда буфер становится непустым.

Установка low=0 приводит к тому, что protocol.resume_writing() вызывается только после того, как буфер пуст. Использование нуля для любого предела обычно неоптимально, так как уменьшает возможности одновременного выполнения операций ввода-вывода и вычислений.

Используйте метод Transport.get_write_buffer_limits(), чтобы получить эти ограничения.

WriteTransport.write(data):

Метод WriteTransport.write() записывает несколько байтов данных в транспорт.

Этот метод не блокирует, он буферизует данные и организует их асинхронную отправку.

WriteTransport.writelines(list_of_data):

Метод WriteTransport.writelines() записывает список (или итерируемую последовательность) байтов данных в транспорт. Это функционально эквивалентно вызову Transport.write() для каждого элемента, полученного итерацией, но может быть реализовано более эффективно.

WriteTransport.write_eof():

Метод WriteTransport.write_eof() закрывает записывающую сторону транспорта, после очистки всех буферизованных данных. Данные все еще могут быть получены.

Этот метод может вызвать исключение NotImplementedError, если транспорт (например, SSL) не поддерживает полузакрытые соединения.

asyncio.ReadTransport(BaseTransport):

Класс asyncio.ReadTransport() представляет собой базовый транспорт для подключений, только для чтения.

Экземпляры класса ReadTransport возвращаются из метода цикла событий loop.connect_read_pipe(), а также используются методами, связанными с подпроцессами, такими как loop.subprocess_exec().

ReadTransport.is_reading():

Метод ReadTransport.is_reading() возвращает True, если транспорт получает новые данные.

Новое в Python 3.7.

ReadTransport.pause_reading():

Метод ReadTransport.pause_reading() приостанавливает читающую сторону транспорта. Никакие данные не будут передаваться в метод протокола Protocol.data_received() до тех пор, пока не будет вызван Protocol.resume_reading().

Изменено в Python 3.7: метод идемпотентен, т.е. его можно вызывать, когда транспорт уже приостановлен или закрыт.

ReadTransport.resume_reading():

Метод ReadTransport.resume_reading() возобновляет читающую сторону транспорта. Метод протокола Protocol.data_received() будет вызван еще раз, если какие-то данные стали доступны для чтения.

Изменено в Python 3.7: метод идемпотентен, т.е. его можно вызывать, когда транспорт уже что-то читает.

asyncio.Transport(WriteTransport, ReadTransport):

Класс asyncio.Transport() представляет собой интерфейс, представляющий двунаправленный транспорт, например TCP-соединение.

Пользователь не должен создавать экземпляр транспорта напрямую. Пользователи должны вызывать служебную функцию, передавая ей фабрику протокола и другую информацию, необходимую для создания транспорта и протокола.

Экземпляры класса Transport возвращаются или используются такими методами цикла событий, как loop.create_connection(), loop.create_unix_connection(), loop.create_server(), loop.sendfile() и т. Д.

asyncio.DatagramTransport(BaseTransport):

Класс asyncio.DatagramTransport() представляет собой транспорт для датаграммных (UDP) соединений.

Экземпляры класса DatagramTransport возвращаются из метода цикла событий loop.create_datagram_endpoint().

DatagramTransport.sendto(data, addr=None):

Метод DatagramTransport.sendto() отправляет байты данных data удаленному хосту, заданному аргументом addr (целевой адрес, зависящий от транспорта). Если addr=None, то данные отправляются на целевой адрес, указанный при создании транспорта.

Этот метод не блокирует, он буферизует данные и организует их асинхронную отправку.

DatagramTransport.abort():

Метод DatagramTransport.abort() немедленно закрывает транспорт, не дожидаясь завершения незавершенных операций. Буферизованные данные будут потеряны. Больше никаких данных не будет.

В конечном итоге метод протокола protocol.connection_lost() будет вызываться с аргументом None.

asyncio.SubprocessTransport(BaseTransport):

Абстрактный класс asyncio.SubprocessTransport() представляет связь между родительским и дочерним процессом ОС.

Экземпляры класса SubprocessTransport возвращаются из методов цикла событий loop.subprocess_shell() и loop.subprocess_exec().

Подробнее о предоставляемых методах смотрите в разделе "Создание субпроцесса из цикла событий asyncio".


Протоколы.

Модуль asyncio предоставляет набор абстрактных базовых классов, которые следует использовать для реализации сетевых протоколов. Эти классы предназначены для использования вместе с транспортами.

Подклассы абстрактных базовых классов протокола могут реализовывать некоторые или все методы. Все эти методы являются обратными вызовами: они вызываются транспортом при определенных событиях, например, при получении некоторых данных. Метод базового протокола должен вызываться соответствующим транспортом.

asyncio.BaseProtocol:

Атрибут asyncio.BaseProtocol представляет собой базовый протокол с методами, общими для всех протоколов.

Обратные вызовы соединения.

Обратные вызовы соединения вызываются по всем протоколам ровно один раз за успешное соединение. Все остальные обратные вызовы протокола могут быть вызваны только между этими двумя методами.

BaseProtocol.connection_made(transport):

Метод BaseProtocol.connection_made() вызывается при установлении соединения.

Аргумент transport - это транспорт, представляющий соединение. Протокол отвечает за хранение ссылки на свой транспорт.

BaseProtocol.connection_lost(exc):

Метод BaseProtocol.connection_lost() вызывается, когда соединение потеряно или закрыто.

Аргумент exc является либо объектом исключения, либо None. Последнее означает, что получен обычный EOF, или соединение было прервано или закрыто этой стороной соединения.

Обратные вызовы управления потоком.

Обратные вызовы управления потоком могут быть вызваны транспортом для приостановки или возобновления записи, выполняемой протоколом.

Смотрите документацию метода Transport.set_write_buffer_limits() для получения более подробной информации.

BaseProtocol.pause_writing():

Метод BaseProtocol.pause_writing() вызывается, когда буфер транспорта выходит за верхний разрешенный предел.

BaseProtocol.resume_writing():

Метод BaseProtocol.resume_writing() вызывается, когда буфер транспорта опускается ниже нижнего разрешенного предела.

Если размер буфера равен верхнему значению предела, то метод Protocol.pause_writing() не вызывается: размер буфера должен строго превышать.

И наоборот, Protocol.resume_writing() вызывается, когда размер буфера равен или меньше нижнего разрешенного предела. Эти конечные условия важны для обеспечения того, чтобы все шло так, как ожидалось, когда любая из оценок равна нулю.

asyncio.Protocol(BaseProtocol):

Класс asyncio.Protocol() представляет собой базовый класс для реализации потоковых протоколов (TCP, Unix sockets и т. д.).

Все протоколы asyncio могут реализовывать обратные вызовы базового протокола.

Методы событий, такие как loop.create_server(), loop.create_unix_server(), loop.create_connection(), loop.create_unix_connection(), loop.connect_accepted_socket(), loop.connect_read_pipe() и loop.connect_write_pipe(), принимают фабрики, которые возвращают протоколы потоковой передачи.

Protocol.data_received(data):

Метод Protocol.data_received() вызывается при получении данных.

Аргумент data - это непустой байтовый объект, содержащий входящие данные.

Будут ли данные буферизированы, разделены на части или повторно собраны, зависит от транспорта. В общем, не надо полагаться на конкретную семантику и делайте синтаксический анализ универсальным и гибким. Данные всегда поступают в правильном порядке.

Метод можно вызывать произвольное количество раз, пока открыто соединение.

НО метод Protocol.eof_received() вызывается не более одного раза. После вызова Protocol.eof_received(), метод Protocol.data_received() больше не вызывается.

Protocol.eof_received():

Метод Protocol.eof_received() вызывается, когда на другом конце сообщают, что больше не будет отправлять данные например, путем вызова Transport.write_eof(), если конечно другой конец также использует asyncio.

Этот метод может вернуть ложное значение (включая None), и в этом случае транспорт закроется. И наоборот, если этот метод возвращает истинное значение, то используемый протокол определяет, следует ли закрыть транспорт. Так как реализация по умолчанию возвращает None, то она неявно закрывает соединение.

Некоторые транспорты, включая SSL, не поддерживают полузакрытые соединения, и в этом случае возврат значения True из этого метода приведет к закрытию соединения.

Механизм состояний:

start -> connection_made
    [-> data_received]*
    [-> eof_received]?
-> connection_lost -> end

asyncio.BufferedProtocol(BaseProtocol):

Класс asyncio.BufferedProtocol() представляет собой базовый класс для реализации потоковых протоколов с ручным управлением приемным буфером.

Новое в Python 3.7.

Буферизованные протоколы можно использовать с любым методом цикла событий, который поддерживает протоколы потоковой передачи.

Реализации BufferedProtocol допускают явное ручное выделение буфера приема и управление им. Затем, циклы событий могут использовать буфер, предоставленный протоколом, чтобы избежать ненужных копий данных. Это может привести к заметному повышению производительности протоколов, которые получают большие объемы данных. Сложные реализации протокола могут значительно сократить количество выделяемых буферов.

Для экземпляров BufferedProtocol вызываются следующие обратные вызовы:

BufferedProtocol.get_buffer(sizehint):

Метод BufferedProtocol.get_buffer() вызывается для выделения нового буфера приема.

Аргумент sizehint - рекомендуемый минимальный размер возвращаемого буфера. Допустимо использовать буферы меньшего или большего размера, чем предлагает sizehint. При значении sizehint=-1 размер буфера может быть произвольным. Использование буфера нулевого размера является ошибкой.

Метод BufferedProtocol.get_buffer() должен возвращать объект, реализующий протокол буфера.

BufferedProtocol.buffer_updated(nbytes):

Метод BufferedProtocol.buffer_updated() вызывается, когда буфер был обновлен полученными данными.

Аргумент nbytes - это общее количество байтов, записанных в буфер.

BufferedProtocol.eof_received():

Метод BufferedProtocol.eof_received() смотрите документацию метода Protocol.eof_received().

Protocol.get_buffer() можно вызывать произвольное количество раз во время соединения. Однако protocol.eof_received() вызывается не более одного раза, и если вызывается, то методы Protocol.get_buffer() и Protocol.buffer_updated() не будут вызываться после него.

Механизм состояний:

start -> connection_made
    [-> get_buffer
        [-> buffer_updated]?
    ]*
    [-> eof_received]?
-> connection_lost -> end

asyncio.DatagramProtocol(BaseProtocol):

Класс asyncio.DatagramProtocol() представляет собой базовый класс для реализации протоколов дейтаграмм (UDP).

Экземпляры UDP протокола должны быть построены фабриками протоколов, переданными в метод loop.create_datagram_endpoint().

DatagramProtocol.datagram_received(data, addr):

Метод DatagramProtocol.datagram_received() вызывается при получении UDP соединением входящих данных.

  • data - это байтовый объект, содержащий входящие данные.
  • addr - адрес хоста, отправляющего данные, точный формат зависит от транспорта.

DatagramProtocol.error_received(exc):

Метод DatagramProtocol.error_received() вызывается, когда предыдущая операция отправки или получения вызывает ошибку OSError.

Аргумент exc - это экземпляр OSError.

Этот метод вызывается в редких случаях, когда UDP транспорт обнаруживает, что дейтаграмма не может быть доставлена ​​получателю. Во многих случаях недоставленные дейтаграммы автоматически отбрасываются.

Примечание. В системах BSD (macOS, FreeBSD и т. д.) управление потоком не поддерживается для протоколов дейтаграмм, т.к. нет надежного способа обнаружить сбоя отправки, вызванные записью слишком большого количества пакетов.

Сокет всегда выглядит "готовым", и лишние пакеты отбрасываются. Исключение OSError с errno, установленным в errno.ENOBUFS может быть вызвано или нет. Если исключение будет вызвано, то будет сообщен DatagramProtocol.error_received(), но в противном случае проигнорирован.

asyncio.SubprocessProtocol(BaseProtocol):

Класс asyncio.SubprocessProtocol() представляет собой базовый класс для реализации протоколов, взаимодействующих с дочерними процессами (однонаправленные каналы).

Подробнее о предоставляемых методах смотрите в разделе "Создание субпроцесса из цикла событий asyncio".


Примеры использования Transport и Protocol модуля asyncio.


Пример UDP эхо-сервера.

UDP эхо-сервер используя метод loop.create_datagram_endpoint(), отправляет обратно полученные данные:

import asyncio

class EchoServerProtocol:
    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        message = data.decode()
        print('Received %r from %s' % (message, addr))
        print('Send %r to %s' % (message, addr))
        self.transport.sendto(data, addr)

async def main():
    print("Starting UDP server")

    # Получаем ссылку на цикл событий, т.к. планируем
    # использовать низкоуровневый API.
    loop = asyncio.get_running_loop()

    # Для обслуживания всех клиентских запросов 
    # будет создан один экземпляр протокола.
    transport, protocol = await loop.create_datagram_endpoint(
        lambda: EchoServerProtocol(),
        local_addr=('127.0.0.1', 9999))

    try:
        await asyncio.sleep(3600)  # Serve for 1 hour.
    finally:
        transport.close()

asyncio.run(main())

Пример UDP эхо-клиента.

UDP эхо-клиент, используя метод loop.create_datagram_endpoint(), отправляет данные, а когда получает ответ - закрывает транспорт:

import asyncio

class EchoClientProtocol:
    def __init__(self, message, on_con_lost):
        self.message = message
        self.on_con_lost = on_con_lost
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport
        print('Send:', self.message)
        self.transport.sendto(self.message.encode())

    def datagram_received(self, data, addr):
        print("Received:", data.decode())

        print("Close the socket")
        self.transport.close()

    def error_received(self, exc):
        print('Error received:', exc)

    def connection_lost(self, exc):
        print("Connection closed")
        self.on_con_lost.set_result(True)

async def main():
    # Получаем ссылку на цикл событий, т.к. 
    # планируем использовать низкоуровневый API.
    loop = asyncio.get_running_loop()

    on_con_lost = loop.create_future()
    message = "Hello World!"

    transport, protocol = await loop.create_datagram_endpoint(
        lambda: EchoClientProtocol(message, on_con_lost),
        remote_addr=('127.0.0.1', 9999))

    try:
        await on_con_lost
    finally:
        transport.close()

asyncio.run(main())

Пример подключения к существующему сокету.

В примере будем ждать, пока сокет получит данные, используя метод loop.create_connection() с протоколом:

import asyncio
import socket

class MyProtocol(asyncio.Protocol):

    def __init__(self, on_con_lost):
        self.transport = None
        self.on_con_lost = on_con_lost

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        print("Received:", data.decode())

        # Готово: закрываем транспорт;
        # connection_lost() будет вызываться автоматически.
        self.transport.close()

    def connection_lost(self, exc):
        # The socket has been closed
        self.on_con_lost.set_result(True)

async def main():
    # Получаем ссылку на цикл событий, т.к. 
    # планируем использовать низкоуровневый API.
    loop = asyncio.get_running_loop()
    on_con_lost = loop.create_future()

    # Создаем пару подключенных сокетов
    rsock, wsock = socket.socketpair()

    # Регистрируем сокет для получения данных
    transport, protocol = await loop.create_connection(
        lambda: MyProtocol(on_con_lost), sock=rsock)

    # Имитация приема данных из сети.
    loop.call_soon(wsock.send, 'abc'.encode())

    try:
        await protocol.on_con_lost
    finally:
        transport.close()
        wsock.close()

asyncio.run(main())

Смотрите пример слежения за файловым дескриптором на предмет событий чтения в разделе "Наблюдение за дескрипторами файлов из цикла событий", в нем используется низкоуровневый метод loop.add_reader() для регистрации fd.

Также смотрите пример регистрации открытого сокета для ожидания данных с использованием потоков stream. Пример использует высокоуровневый API потоков stream модуля asyncio, созданные функцией asyncio.open_connection() в сопрограмме.