Сообщить об ошибке.

Работа с сетевыми соединениями модуля asyncio в Python

Сетевые сокет соединения и сервера с поддержкой async/await

Сетевые потоковые (stream) соединения - это высокоуровневые примитивы с поддержкой синтаксиса async/await для работы с сетевыми подключениями. Потоки позволяют отправлять и получать данные без использования обратных вызовов или низкоуровневых протоколов и транспортов.

Вот пример клиента TCP эхо-клиента, написанного с использованием потоков модуля asyncio:

import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())
    await writer.drain()

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()
    await writer.wait_closed()

asyncio.run(tcp_echo_client('Hello World!'))

Содержание:


Функции STREAM соединений:

asyncio.open_connection(host=None, port=None, *, loop=None, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None):

Функция asyncio.open_connection() устанавливает сетевое соединение и возвращает пару объектов (reader, writer).

Возвращаемые объекты reader и writer являются экземплярами классов asyncio.StreamReader и asyncio.StreamWriter.

Аргумент цикла loop является необязательным и всегда может быть определен автоматически, когда эта функция ожидает выполнения от сопрограммы.

Аргумент limit определяет предел размера буфера, используемый возвращенным экземпляром asyncio.StreamReader. По умолчанию установлено значение 64 КБ.

Аргументы передаются непосредственно в низкоуровневый метод цикла событий loop.create_connection().

asyncio.start_server(client_connected_cb, host=None, port=None, *, loop=None, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True):

Функция asyncio.start_server() запускает сервер сокетов.

Обратный вызов client_connected_cb вызывается всякий раз, когда устанавливается новое клиентское соединение. Вызов получает пару (reader, writer) в качестве двух аргументов, которые являются экземплярами классов asyncio.StreamReader и asyncio.StreamWriter`.

Обратный вызов client_connected_cb может быть простой вызываемой функцией или функцией сопрограммы. Если это функция сопрограммы, то она будет автоматически запланирована как задача.

Аргумент цикла loop является необязательным и всегда может быть определен автоматически, когда эта функция ожидает выполнения от сопрограммы.

Аргумент limit определяет предел размера буфера, используемый возвращенным экземпляром asyncio.StreamReader. По умолчанию установлено значение 64 КБ.

Аргументы передаются непосредственно в низкоуровневый метод цикла событий [loop.create_server()].

Функции UNIX сокетов.

asyncio.open_unix_connection(path=None, *, loop=None, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None):

Функция asyncio.open_unix_connection() устанавливает соединение с Unix сокетом и возвращает парный кортеж (reader, writer).

Функция asyncio.open_unix_connection() работает подобно asyncio.open_connection(), но работает с сокетами Unix.

Смотрите также документацию по низкоуровневому методу цикла событий loop.create_unix_connection().

Доступность: Unix.

asyncio.start_unix_server(client_connected_cb, path=None, *, loop=None, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True):

Функция asyncio.open_unix_connection() запускает сервер сокетов Unix.

Функция asyncio.start_unix_server() работает подобно asyncio.start_server(), но работает с сокетами Unix.

Смотрите также документацию по низкоуровневому методу цикла событий loop.create_unix_server().

Доступность: Unix.

Объекты StreamReader и StreamWriter.

asyncio.StreamReader:

Представляет объект чтения, который предоставляет API для чтения данных из потока ввода-вывода.

Не рекомендуется создавать экземпляры объектов StreamReader напрямую. Для создания экземпляра asyncio.StreamReader используйте функции asyncio.open_connection() и asyncio.start_server().

Объект asyncio.StreamReader предоставляет следующие методы:

  • StreamReader.read(n=-1): читает до n байт. Если n не указано, то читает до EOF.
  • StreamReader.readline(): читает одну байтовую строку, оканчивающаяся на \n.
  • StreamReader.readexactly(n): читает ровно n байт. Если EOF достигается раньше, то вызывается ошибка IncompleteReadError.
  • StreamReader.readuntil(separator=b'\n'): читает данные из потока до тех пор, пока не будет найден разделитель separator.
  • StreamReader.at_eof(): возвращает True, если буфер пуст и была вызвана функция feed_eof().

asyncio.StreamWriter:

Представляет объект записи, который предоставляет API для записи данных в поток ввода-вывода.

Не рекомендуется создавать экземпляры объектов StreamWriter напрямую. Для создания экземпляра asyncio.StreamWriter используйте функции asyncio.open_connection() и asyncio.start_server().

Объект asyncio.StreamWriter предоставляет следующие методы:

  • StreamWriter.write(data): метод пытается немедленно записать данные в базовый сокет. Если это не удается, данные помещаются в очередь во внутреннем буфере записи до тех пор, пока не будут отправлены. Этот метод следует использовать вместе с методом StreamWriter.drain():

    Writer.write(data)
    await Writer.drain()
    
  • StreamWriter.writelines(data): метод немедленно записывает список байтов в базовый сокет. Если это не удается, данные помещаются в очередь во внутренний буфер записи до тех пор, пока они не будут отправлены. Метод следует использовать вместе с методом StreamWriter.drain().

  • StreamWriter.close(): метод закрывает поток и базовый сокет. Этот метод следует использовать вместе с методом StreamWriter.wait_closed():

    Writer.close(data)
    await Writer.wait_closed()
    
  • StreamWriter.can_write_eof(): возвращает True, если базовый транспорт поддерживает метод StreamWriter.write_eof(), в противном случае - False.

  • StreamWriter.write_eof(): закрывает конец записи потока после того, как буферизованные данные записи будут сброшены.

  • StreamWriter.transport(): возвращает базовый транспорт asyncio.

  • StreamWriter.get_extra_info(): доступ к дополнительной информации о транспорте

  • StreamWriter.drain(): ждет, пока не будет уместно возобновить запись в поток.

  • StreamWriter.wait_closed(): ждет, пока поток закроется.

  • StreamWriter.is_closing(): возвращает True, если поток закрыт или находится в процессе закрытия.


Примеры создания серверных и клиентских stream соединений:

TCP эхо-клиент с использованием потоков.

TCP эхо-клиент, использующий функцию asyncio.open_connection():

import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()

asyncio.run(tcp_echo_client('Hello World!'))

TCP эхо-сервер с использованием потоков.

TCP эхо-сервер с использованием функции asyncio.start_server():

import asyncio

async def handle_echo(reader, writer):
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')

    print(f"Received {message!r} from {addr!r}")

    print(f"Send: {message!r}")
    writer.write(data)
    await writer.drain()

    print("Close the connection")
    writer.close()

async def main():
    server = await asyncio.start_server(
        handle_echo, '127.0.0.1', 8888)

    addr = server.sockets[0].getsockname()
    print(f'Serving on {addr}')

    async with server:
        await server.serve_forever()

asyncio.run(main())

Получение HTTP заголовков.

Простой пример запроса HTTP-заголовков URL-адреса, переданного в командной строке:

import urllib.parse
import asyncio, sys

async def print_http_headers(url):
    url = urllib.parse.urlsplit(url)
    if url.scheme == 'https':
        reader, writer = await asyncio.open_connection(
            url.hostname, 443, ssl=True)
    else:
        reader, writer = await asyncio.open_connection(
            url.hostname, 80)

    query = (
        f"HEAD {url.path or '/'} HTTP/1.0\r\n"
        f"Host: {url.hostname}\r\n"
        f"\r\n"
    )

    writer.write(query.encode('latin-1'))
    while True:
        line = await reader.readline()
        if not line:
            break

        line = line.decode('latin1').rstrip()
        if line:
            print(f'HTTP header> {line}')

    # Ignore the body, close the socket
    writer.close()

url = sys.argv[1]
asyncio.run(print_http_headers(url))

Использование:

python example.py http://example.com/path/page.html

# или с HTTPS:

python example.py https://example.com/path/page.html

Регистрация открытого сокета для ожидания данных с использованием потоков.

Сопрограмма ждет, пока сокет не получит данные с помощью функции asyncio.open_connection():

import asyncio
import socket

async def wait_for_data():
    # Получаем ссылку на текущий цикл событий, потому что 
    # нужно получить доступ к низкоуровневым API.
    loop = asyncio.get_running_loop()

    # Создаем пару подключенных сокетов.
    rsock, wsock = socket.socketpair()

    # Регистрируем открытый сокет для ожидания данных.
    reader, writer = await asyncio.open_connection(sock=rsock)

    # Имитация приема данных из сети
    loop.call_soon(wsock.send, 'abc'.encode())

    # Ждем данные
    data = await reader.read(100)

    # Есть данные, закрываем сокет
    print("Received:", data.decode())
    writer.close()

    # закрываем второй сокет
    wsock.close()

asyncio.run(wait_for_data())