Сообщить об ошибке.

Советы по программированию сокетов в Python

Во первых этот материал для тех, кто уже пробовал программировать сокеты, а во вторых здесь будет говорится только о сокетах INET (то есть IPv4) STREAM (т.е. TCP), так как они составляют не менее 99% используемых сокетов. От сокета STREAM можно получить лучшую производительность, чем от какого-то другого. Так же приоткроем тайну того, что такое сокет и дадим несколько советов, как работать с блокирующими и неблокирующими сокетами. Начнем разбираться с блокирующих сокетов, т.к. необходимо знать, как они работают, прежде чем работать с неблокирующими сокетами,

Содержание.

Проблема с пониманием работы сокетов заключается в том, что сокет может означать несколько разных тонких вещей в зависимости от контекста. Итак, сначала проведем различие между клиентским сокетом - конечной точкой диалога и серверным сокетом, который больше похож на оператора коммутатора. Клиентское приложение, например браузер, использует исключительно клиентские сокеты, а веб-сервер, с которым он разговаривает, использует как серверные, так и клиентские сокеты.

Что такое сокет и как он создается?

Грубо говоря, когда происходит переход по ссылке на сайте, браузер делает что-то вроде следующего:

# создал INET, STREAM сокет
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# подключился к веб-серверу по обычному https-порту
sock.connect((`docs-python.ru`, 443))

Когда устанавливается соединение sock.connect(), то сокет sock можно использовать для отправки текста страницы. Тот же сокет прочитает ответ, а затем будет уничтожен. Клиентские сокеты обычно используются только для разового обмена данными или небольшого набора последовательных обменов данными.

То, что происходит на веб-сервере, немного сложнее. Сначала веб-сервер создает серверный сокет:

# создает INET, STREAM сокет
serv_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# связывает сокет с общедоступным хостом и хорошо известным портом
serv_sock.bind((socket.gethostname(), 443))
# и начинает ждать подключений
serv_sock.listen(5)

Следует отметить пару вещей: в коде выше использовалась функция socket.gethostname(), чтобы сокет был виден внешнему миру. Если использовать вызов serv_sock.bind(('localhost', 443)) или serv_sock.bind(('127.0.0.1', 443)), то серверный сокет был бы виден только локальной машине. Вызов serv_sock.bind(('', 443)) указывает, что сокет доступен по любому адресу.

Второе замечание: порты с небольшим числом (обычно до 3-х цифр) зарезервированы для "хорошо известных" служб `(HTTPS, SNMP и т. д.), по этому, в качестве номера порта сокета всегда необходимо использовать числа их 4-х цифр, например 8000.

Наконец, аргумент 5 вызова serv_sock.listen(5), говорит модулю socket, чтобы сервер поставил в очередь до 5 клиентов на подключение (нормальный максимум), прежде чем отклонять остальные запросы. Если остальная часть кода написана правильно, то этого должно быть достаточно.

Теперь, когда есть серверный сокет, прослушивающий 443 порт, можно войти в основной цикл веб-сервера:

while True:
    # принимаем подключение
    (clientsocket, address) = serv_sock.accept()
    # здесь что-то делают с `serv_sock`... 
    # сделаем вид, что это потоковый сервер
    client = client_thread(client_sock)
    client.run()

Существует 3 основных способа, которыми этот цикл может работать:

  • диспетчеризация потока для работы с клиентским сокетом,
  • создание нового процесса для работы с клиентским сокетом,
  • реструктуризация всего приложения для использования неблокирующих сокетов и мультиплексирование между серверным сокетом и любыми активными клиентскими сокетами используя модуль select.

Подробнее об этом позже. Сейчас важно понять, что это все, что делает серверный сокет. Он не отправляет и не получает никаких данных. Он просто воспроизводит/создает клиентские сокеты. Каждый клиентский сокет создается в ответ на то, что какой-то новый клиентский сокет выполняет подключение sock.connect() к серверу на определенный хост и порт. На этот запрос, сервер создает новый клиентский сокет, и как только он это сделает то сразу возвращается к прослушиванию следующих подключений. Два клиента могут свободно общаться, например на каком-нибудь динамически выделенном порту, который будет закрыт после общения.

Межпроцессорное взаимодействие (IPC).

Если необходим быстрый IPC между двумя процессами на одной машине, то следует изучить каналы Pipe() или общую память (объекты Value() и Array()). Если все же решите использовать socket.AF_INET сокеты, то необходимо привязать серверный сокет к localhost. На большинстве платформ это позволит сократить несколько уровней сетевого кода и будет работать немного быстрее.

Смотрите также модуль multiprocessing, который интегрирует межплатформенный IPC в API более высокого уровня.

Использование сокета.

Первое, на что следует обратить внимание, это то, что клиентский сокет браузера и клиентский сокет веб-сервера - полностью идентичны. То есть, это диалог одноранговый. Обычно сокет, который подключается к серверу начинает диалог, отправляя запрос или возможно, вход в систему. Но это уже решение программиста, а не сокета, как построить диалог.

Для приема/передачи данных можно использовать методы объекта сокета Socket.send() и Socket.recv(), а можно превратить клиентский сокет в файловый объект и использовать чтение/запись. По поводу использования сокета, как файлового объекта, необходимо сделать предупреждение, что в сокетах, при выполнении записи, нужно использовать вызов file.flush(). Сокеты имеют дело с буферизованными "файлами" и распространенной ошибкой является - записать что-то и не вызвать file.flush(), а затем перейти в режим чтения ответа. При этом можно бесконечно долго ждать ответа, т. к. записанные данные все еще могут оставаться в выходном буфере.

Теперь о главном в программировании сокетов, методы Socket.send() и Socket.recv() основное внимание уделяют обработке сетевых буферов. Эти методы не обязательно обработают все байты, которые им передаются или считываются из них, за один вызов. Методы возвращают количество обработанных байтов, когда сетевой буфер заполняется Socket.send() или считывается/извлекается Socket.recv(). Программист несет ответственность за то, чтобы вызывать эти методы снова и снова, пока сообщение не будет полностью обработано (передано или считано).

Когда метод Socket.recv() возвращает 0 байтов, это означает, что другая сторона закрыла или находится в процессе закрытия соединения. Следовательно данные из этого подключении больше не будут поступать. Например протокол HTTP/HTTPS использует сокет только для одной передачи. Клиент отправляет запрос, затем читает ответ, сокет закрывается, это означает, что клиент (браузер) может определить конец ответа, получив 0 байтов.

Но если в планах повторно использовать открытый сокет для каких-то задач, то нужно понимать, что в сокетах нет EOT - "end of trensfer" (конец файла). ЕЩЕ РАЗ: если сокет после вызова методов Socket.send() или Socket.recv() возвращает 0 байтов, то соединение было разорвано. Если соединение не было разорвано, то можно вечно ждать получения данных вызовом Socket.recv(), т. к. сокет не может сказать, что читать больше нечего.

Теперь должно прийти понимание фундаментальной истины сокетов: сообщения должны быть либо определенной фиксированной длины, либо разделены на несколько частей маркерами, либо с указанием длины сообщения (что намного лучше), либо заканчиваться отключением соединения. Выбор полностью за программистом.

В общем, самым простым решением являются сообщения фиксированной длины:

class MySocket:
    """демонстрационный класс -  этот код
       представлен для понимания процесса, 
       а не для эффективности
    """

    def __init__(self, sock=None):
        if sock is None:
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        else:
            self.sock = sock

    def connect(self, host, port):
        self.sock.connect((host, port))

    def mysend(self, msg):
        totalsent = 0
        while totalsent < MSGLEN:
            sent = self.sock.send(msg[totalsent:])
            if sent == 0:
                raise RuntimeError("Соединение сокета нарушено")
            totalsent = totalsent + sent

    def myreceive(self):
        chunks = []
        bytes_recd = 0
        while bytes_recd < MSGLEN:
            chunk = self.sock.recv(min(MSGLEN - bytes_recd, 2048))
            if chunk == b'':
                raise RuntimeError("Соединение сокета нарушено")
            chunks.append(chunk)
            bytes_recd = bytes_recd + len(chunk)
        return b''.join(chunks)

Например можно использовать какой то определенный код отправки практически для любой схемы обмена сообщениями. В Python сокетах ведь отправляются строки, следовательно можно использовать функцию len() для определения их длины. В основном, усложняется код приема.

Самое простое решение, это сделать первый символ сообщения индикатором "типа" сообщения и в этом "типе" передавать длину сообщения. Теперь, когда в начале сообщения передается его длина, то необходимо вызвать метод Socket.recv() два раза: первый - чтобы получить длину сообщения, и второй в цикле, для получения частями самого сообщения (если оно большое), ориентируясь на эту длину.

Если идти по пути сообщений с маркерами, то необходимо получать фрагменты произвольного размера и сканировать полученное сообщение в поисках этих маркеров. Фрагменты по 4096 или 8192 часто хорошо подходят для размеров сетевого буфера. При этом следует помнить об одной сложности: если используемый протокол позволяет отправлять несколько сообщений друг за другом без какого-либо ответа, а вызов Socket.recv() читает произвольный размер фрагмента, то можно в конечном итоге, не чайно, прочитать начало следующего сообщения. В этом случае нужно его запомнить и держать, пока оно не понадобится.

Добавление в начало сообщения его длины больше 4-х цифр, скажем 5 цифровых символов становится более сложным, потому что (хотите верьте, хотите нет) нельзя получить все 5 символов за один вызов, следовательно количество циклов Socket.recv() увеличивается до 3-х, а то и больше. При высоких нагрузках код, не использующий два цикла вызова Socket.recv() - первый для определения длины, второй для получения сообщения с данными, очень быстро сломается. В этот момент еще обнаружите, что методу Socket.send() не всегда удается отослать все сообщение за один вызов. И несмотря на то, что здесь про это написано, в конечном итоге ВЫ все равно наступите на эти грабли!

В интересах создания первого приложения с использованием сокетов, вышесказанные улучшения оставлены как упражнение для читателя.

Прием/передача двоичных данных.

Вполне возможно отправлять двоичные данные через сокет. Основная проблема заключается в том, что не все машины используют одни и те же форматы для двоичных данных. Например, чип Motorola будет представлять 16-битное целое число, например 1 в виде двух шестнадцатеричных байтов - 00 01. Intel и DEC переворачивают байты - то же самое число 1 здесь будет выглядеть как 01 00. Модуль socket имеет функции для преобразования 16 и 32-битных целых чисел - socket.ntohl(), socket.htonl(), socket.ntohs() и socket.htons(), где в названиях первая буква означает n - сеть, а h - хост, а последняя s - короткий, а l - длинный. Там, где сетевой порядок является порядком хоста, функции ничего делать не будут, но там, где машина перевернула байты, они соответствующим образом все поменяют.

Ascii-представление двоичных данных часто меньше по размеру, чем двоичное представление этих же данных. Строка '0' будет состоять из двух байтов, а двоичная - из четырех. Конечно, это не очень хорошо сочетается с сообщениями фиксированной длины.

Закрытие соединения сокета.

Строго говоря, сначала необходимо использовать вызов объекта сокета shutdown(), прежде чем закрыть его командой Socket.close(). Вызов Socket.shutdown() - это предупреждение для сокета на другом конце. В зависимости от аргумента, который передавать, это может означать "Я больше не буду отправлять, но я все равно буду слушать" или "Я не слушаю, мне по барабану". Разработчики библиотек сокетов настолько привыкли к тому, что программисты пренебрегают этим элементом этикета, что у некоторых, обычный вызов Socket.close() - означает последовательность вызовов: Socket.shutdown(); Socket.close(). Поэтому в большинстве ситуаций явный вызов Socket.shutdown() не требуется.

Один из способов эффективного использования shutdown(), это обмен данными, подобный HTTP. Клиент отправляет запрос и затем завершает работу вызовом Socket.shutdown(1). Это сообщает серверу: "Этот клиент завершил отправку, но все еще может получать". Сервер читая запрос, в конце получает 0 байтов, это сигнализирует о том, что от клиента весь запрос получен и надо готовить и отправлять ответ. Если отправка ответа завершилась успешно, то клиент действительно слушал и получил все отправленные данные.

Python делает еще один шаг к автоматическому завершению соединения, это постоянный мониторинг открытых сокетов сборщиком мусора. Сборщик мусора автоматически закрывает соединение, если это необходимо. Но полагаться на это - очень плохая привычка. Если сокет просто исчезнет без закрытия, то сокет на другом конце может зависнуть бесконечно думая, что сервер просто медленно работает и когда закончит, то закроет сокет.

Когда умирают сокеты.

Худшее в использовании блокирующих сокетов - это то, что происходит, когда одна из сторон соединения резко падает (без закрытия). Сокет скорее всего зависнет. TCP - надежный протокол и он будет долго ждать, прежде чем отказаться от соединения. Если использовать потоки, то весь поток практически умрет. С этим ничего не поделаешь и если не делать глупостей, таких как держать блокировку при выполнении чтения, то поток не съест много ресурсов.

Не пытайтесь убить поток - одна из причин того, что потоки более эффективны, чем процессы, заключается в том, что у них нет накладных расходов, связанных с автоматическим повторным использованием ресурсов. Другими словами, если удастся убить поток, то вся программа, скорее всего, упадет.

Неблокирующие сокеты

Чтобы сделать сокет неблокирующим, в Python используют вызов Socket.setblocking(False). Вызов делается после создания сокета, но перед его использованием. Если программист не очень умный, то скорее всего попытается переключаться туда и обратно (с блокирующего сокета на неблокирующий).

Основное механическое отличие состоит в том, что вызовы Socket.send(), Socket.recv(), Socket.connect() и Socket.accept() могут возвращать результат без каких-либо действий. И здесь есть несколько вариантов. Можно проверить ответ, который вернул соответствующий вызов и код ошибки и вообще свести себя с ума. Если не верите, попробуйте как-нибудь. Приложение будет разрастаться, глючить и загружать процессор. Так что давайте пропустим безумные решения и сделаем все правильно.

Одним из возможных решений является делегирование работы с клиентами отдельным потокам. Однако создание потоков и переключение контекстов между ними на самом деле не является дешевой операцией. Для решения этой проблемы существует так называемый асинхронный способ работы с сокетами. Основная идея состоит в том, чтобы делегировать поддержание состояния сокета операционной системе и позволить ей уведомлять программу, когда есть что-то для чтения из сокета или когда он готов к записи. Для этого можно использовать вызов операционной системы select, подробнее о нем можно посмотреть командой терминала Unix $ man select

В Python такой вызов сделать совсем несложно, для этой цели используйте встроенный модуль select, в частности вызов select.select():

ready_to_read, ready_to_write, in_error = \
    select.select(
                  potential_readers,
                  potential_writers,
                  potential_errs,
                  timeout
                  )

Здесь передается в select.select() три списка:

  • potential_readers содержит все сокеты, которые нужно прочитать,
  • potential_writers содержит все сокеты, в которые надо что-то записать,
  • potential_errs - которые нужно проверять на наличие ошибок (обычно оставляют пустым).

Следует отметить, что один сокет может входить в разные списки. Вызов select.select() блокируется, но можно задать ему таймаут. Как правило, это разумный поступок - дайте ему длительный таймаут (скажем, минуту), если нет веских причин поступить иначе.

Взамен получаем три списка. Они содержат сокеты, которые действительно доступны для чтения, записи и содержат ошибки. Каждый из этих списков является подмножеством, возможно пустым, соответствующего переданного списка в select.select().

Если сокет находится в выходном списке ready_to_read, то вызов метода Socket.recv() на этом сокете может что-то вернуть. Та же идея для списка сокетов ready_to_write - можно что-нибудь отправить. Может быть, не все, но что-то лучше, чем ничего. На самом деле, любой достаточно здоровый сокет будет возвращен как доступный для записи - это означает, что доступно исходящее сетевое буферное пространство.

Если есть "серверный" сокет, то поместим его в список potential_readers. Если он появится в списке ready_to_read, то вызов .accept почти наверняка сработает. Если сервер создал новый сокет для подключения, то поместим его в список potential_writers. Если он отображается в списке ready_to_write, то есть неплохие шансы, что он подключился.

Предупреждение о переносимости: в Unix, модуль select работает как с сокетами, так и с файлами. Не пытайтесь использовать это в Windows. В Windows, модуль select работает только с сокетами. Также обратите внимание, что в языке C многие из более продвинутых параметров сокетов в Windows выполняются иначе. Фактически, в Windows обычно используют потоки, которые работают очень и очень хорошо с сокетами.

Пример асинхронного сервера с вызовом select.select().

В примере вызывается select.select(), для опроса сокетов операционной системой, готовы ли они к записи, чтению или есть ли какое-то ошибки в сокетах. Этот вызов блокирует программу (если не передан аргумент тайм-аута) до тех пор, пока не будут готовы какие нибудь из переданных сокетов. По готовности хоты-бы одного из сокетов, вызов select.select() вернет три списка с сокетами для указанных операций. Затем программа последовательно перебирает эти списки и выполняет соответствующие операции.

import select, socket, queue

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(False)
server.bind(('localhost', 50000))
server.listen(5)
inputs = [server]
outputs = []
message_queues = {}

while inputs:
    readable, writable, exceptional = select.select(inputs, outputs, inputs)
    for s in readable:
        if s is server:
            connection, client_address = s.accept()
            connection.setblocking(False)
            inputs.append(connection)
            message_queues[connection] = queue.Queue()
        else:
            data = s.recv(1024)
            if data:
                message_queues[s].put(data)
                if s not in outputs:
                    outputs.append(s)
            else:
                if s in outputs:
                    outputs.remove(s)
                inputs.remove(s)
                s.close()
                del message_queues[s]

    for s in writable:
        try:
            next_msg = message_queues[s].get_nowait()
        except queue.Empty:
            outputs.remove(s)
        else:
            s.send(next_msg)

    for s in exceptional:
        inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()
        del message_queues[s]

Когда в списке readable присутствует сокет сервера, это означает, что пришел новый клиент. Поэтому программа вызывает метод Socket.accept() и добавляет возвращаемый методом клиентский сокет, в список inputs, а так же создает для него очередь входящих сообщений message_queues. Если в списке readable есть НЕ серверный сокет, то это означает, что от клиента прибыли некоторые сообщения и готовы к чтению. В этом случае производится чтение сообщения и добавления его в очередь с ключом этого сокета.

Для сокетов из списка writable программа извлекает сообщение из соответствующей очереди message_queues (если есть) и записывает их в сокет. Если в сокете есть ошибка, он удаляет сокет из всех списков опроса (inputs и outputs).

Так работают сокеты на низком уровне. В большинстве случаев нет необходимости реализовывать логику на таком низком уровне. Рекомендуется использовать некоторые абстракции более высокого уровня, такие как Twisted, Tornado или ZeroMQ, в зависимости от ситуации.