Сетевые потоковые (stream) соединения - это высокоуровневые примитивы с поддержкой синтаксиса async
/await
для работы с сетевыми подключениями. Потоки позволяют отправлять и получать данные без использования обратных вызовов или низкоуровневых протоколов и транспортов.
Вот пример клиента TCP эхо-клиента, написанного с использованием потоков модуля asyncio
:
import asyncio async def tcp_echo_client(message): reader, writer = await asyncio.open_connection( '127.0.0.1', 8888) print(f'Send: {message!r}') writer.write(message.encode()) await writer.drain() data = await reader.read(100) print(f'Received: {data.decode()!r}') print('Close the connection') writer.close() await writer.wait_closed() asyncio.run(tcp_echo_client('Hello World!'))
StreamReader
и StreamWriter
;asyncio.open_connection(host=None, port=None, *, loop=None, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None)
:Функция asyncio.open_connection()
устанавливает сетевое соединение и возвращает пару объектов (reader, writer)
.
Возвращаемые объекты reader
и writer
являются экземплярами классов asyncio.StreamReader
и asyncio.StreamWriter
.
Аргумент цикла loop
является необязательным и всегда может быть определен автоматически, когда эта функция ожидает выполнения от сопрограммы.
Аргумент limit
определяет предел размера буфера, используемый возвращенным экземпляром asyncio.StreamReader
. По умолчанию установлено значение 64 КБ.
Аргументы передаются непосредственно в низкоуровневый метод цикла событий loop.create_connection()
.
asyncio.start_server(client_connected_cb, host=None, port=None, *, loop=None, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)
:Функция asyncio.start_server()
запускает сервер сокетов.
Обратный вызов client_connected_cb
вызывается всякий раз, когда устанавливается новое клиентское соединение. Вызов получает пару (reader, writer)
в качестве двух аргументов, которые являются экземплярами классов asyncio.StreamReader
и asyncio.StreamWriter
`.
Обратный вызов client_connected_cb
может быть простой вызываемой функцией или функцией сопрограммы. Если это функция сопрограммы, то она будет автоматически запланирована как задача.
Аргумент цикла loop
является необязательным и всегда может быть определен автоматически, когда эта функция ожидает выполнения от сопрограммы.
Аргумент limit
определяет предел размера буфера, используемый возвращенным экземпляром asyncio.StreamReader
. По умолчанию установлено значение 64 КБ.
Аргументы передаются непосредственно в низкоуровневый метод цикла событий [loop.create_server()
].
asyncio.open_unix_connection(path=None, *, loop=None, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None)
:Функция asyncio.open_unix_connection()
устанавливает соединение с Unix сокетом и возвращает парный кортеж (reader, writer)
.
Функция asyncio.open_unix_connection()
работает подобно asyncio.open_connection()
, но работает с сокетами Unix.
Смотрите также документацию по низкоуровневому методу цикла событий loop.create_unix_connection()
.
Доступность: Unix.
asyncio.start_unix_server(client_connected_cb, path=None, *, loop=None, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)
:Функция asyncio.open_unix_connection()
запускает сервер сокетов Unix.
Функция asyncio.start_unix_server()
работает подобно asyncio.start_server()
, но работает с сокетами Unix.
Смотрите также документацию по низкоуровневому методу цикла событий loop.create_unix_server()
.
Доступность: Unix.
StreamReader
и StreamWriter
.asyncio.StreamReader
:Представляет объект чтения, который предоставляет API для чтения данных из потока ввода-вывода.
Не рекомендуется создавать экземпляры объектов StreamReader
напрямую. Для создания экземпляра asyncio.StreamReader
используйте функции asyncio.open_connection()
и asyncio.start_server()
.
Объект asyncio.StreamReader
предоставляет следующие методы:
StreamReader.read(n=-1)
: читает до n
байт. Если n
не указано, то читает до EOF.StreamReader.readline()
: читает одну байтовую строку, оканчивающаяся на \n
.StreamReader.readexactly(n)
: читает ровно n
байт. Если EOF достигается раньше, то вызывается ошибка IncompleteReadError
. StreamReader.readuntil(separator=b'\n')
: читает данные из потока до тех пор, пока не будет найден разделитель separator
.StreamReader.at_eof()
: возвращает True
, если буфер пуст и была вызвана функция feed_eof()
.asyncio.StreamWriter
:Представляет объект записи, который предоставляет API для записи данных в поток ввода-вывода.
Не рекомендуется создавать экземпляры объектов StreamWriter
напрямую. Для создания экземпляра asyncio.StreamWriter
используйте функции asyncio.open_connection()
и asyncio.start_server()
.
Объект asyncio.StreamWriter
предоставляет следующие методы:
StreamWriter.write(data)
: метод пытается немедленно записать данные в базовый сокет. Если это не удается, данные помещаются в очередь во внутреннем буфере записи до тех пор, пока не будут отправлены. Этот метод следует использовать вместе с методом StreamWriter.drain()
:
Writer.write(data) await Writer.drain()
StreamWriter.writelines(data)
: метод немедленно записывает список байтов в базовый сокет. Если это не удается, данные помещаются в очередь во внутренний буфер записи до тех пор, пока они не будут отправлены. Метод следует использовать вместе с методом StreamWriter.drain()
.
StreamWriter.close()
: метод закрывает поток и базовый сокет. Этот метод следует использовать вместе с методом StreamWriter.wait_closed()
:
Writer.close(data) await Writer.wait_closed()
StreamWriter.can_write_eof()
: возвращает True
, если базовый транспорт поддерживает метод StreamWriter.write_eof()
, в противном случае - False
.
StreamWriter.write_eof()
: закрывает конец записи потока после того, как буферизованные данные записи будут сброшены.
StreamWriter.transport()
: возвращает базовый транспорт asyncio.
StreamWriter.get_extra_info()
: доступ к дополнительной информации о транспорте
StreamWriter.drain()
: ждет, пока не будет уместно возобновить запись в поток.
StreamWriter.wait_closed()
: ждет, пока поток закроется.
StreamWriter.is_closing()
: возвращает True
, если поток закрыт или находится в процессе закрытия.
TCP эхо-клиент, использующий функцию asyncio.open_connection()
:
import asyncio async def tcp_echo_client(message): reader, writer = await asyncio.open_connection( '127.0.0.1', 8888) print(f'Send: {message!r}') writer.write(message.encode()) data = await reader.read(100) print(f'Received: {data.decode()!r}') print('Close the connection') writer.close() asyncio.run(tcp_echo_client('Hello World!'))
TCP эхо-сервер с использованием функции asyncio.start_server()
:
import asyncio async def handle_echo(reader, writer): data = await reader.read(100) message = data.decode() addr = writer.get_extra_info('peername') print(f"Received {message!r} from {addr!r}") print(f"Send: {message!r}") writer.write(data) await writer.drain() print("Close the connection") writer.close() async def main(): server = await asyncio.start_server( handle_echo, '127.0.0.1', 8888) addr = server.sockets[0].getsockname() print(f'Serving on {addr}') async with server: await server.serve_forever() asyncio.run(main())
Простой пример запроса HTTP-заголовков URL-адреса, переданного в командной строке:
import urllib.parse import asyncio, sys async def print_http_headers(url): url = urllib.parse.urlsplit(url) if url.scheme == 'https': reader, writer = await asyncio.open_connection( url.hostname, 443, ssl=True) else: reader, writer = await asyncio.open_connection( url.hostname, 80) query = ( f"HEAD {url.path or '/'} HTTP/1.0\r\n" f"Host: {url.hostname}\r\n" f"\r\n" ) writer.write(query.encode('latin-1')) while True: line = await reader.readline() if not line: break line = line.decode('latin1').rstrip() if line: print(f'HTTP header> {line}') # Ignore the body, close the socket writer.close() url = sys.argv[1] asyncio.run(print_http_headers(url))
Использование:
python example.py http://example.com/path/page.html # или с HTTPS: python example.py https://example.com/path/page.html
Сопрограмма ждет, пока сокет не получит данные с помощью функции asyncio.open_connection()
:
import asyncio import socket async def wait_for_data(): # Получаем ссылку на текущий цикл событий, потому что # нужно получить доступ к низкоуровневым API. loop = asyncio.get_running_loop() # Создаем пару подключенных сокетов. rsock, wsock = socket.socketpair() # Регистрируем открытый сокет для ожидания данных. reader, writer = await asyncio.open_connection(sock=rsock) # Имитация приема данных из сети loop.call_soon(wsock.send, 'abc'.encode()) # Ждем данные data = await reader.read(100) # Есть данные, закрываем сокет print("Received:", data.decode()) writer.close() # закрываем второй сокет wsock.close() asyncio.run(wait_for_data())