Сообщить об ошибке.

Паттерны использования io.BufferedIOBase() в Python

Содержание:

io.BufferedIOBase - это базовый класс для буферизованных бинарных потоков. Обычно вы используете его наследников (BufferedReader, BufferedWriter, BufferedRandom), но можно и наследоваться от него, если хотите:

  • добавить своё поведение поверх RawIOBase;
  • сделать нестандартную буферизацию;
  • реализовать "фильтр"/декоратор над другим бинарным потоком.

Ниже, основные паттерны использования io.BufferedIOBase() в Python.

Где BufferedIOBase стоит в иерархии

Типовая цепочка:

RawIOBase (низкоуровневый байтовый драйвер)
    ↓
BufferedIOBase (буферизация, read(), readline(), write())
    ↓
TextIOBase (TextIOWrapper - str-слой)

Т.е. ваш RawIOBase => ваш или стандартный BufferedIOBase => TextIOWrapper (если нужен текст).

Наследование от BufferedIOBase как "декоратора" над RawIOBase

Паттерн:

  • В конструктор даём raw: RawIOBase.
  • Внутри держим свой буфер (например, bytearray()).
  • Реализуем read, read1, readinto, write, flush, readable, writable.

Минималистичный только читающий буфер:

import io

class SimpleBufferedReader(io.BufferedIOBase):
    def __init__(self, raw: io.RawIOBase, buffer_size: int = 8192):
        if not raw.readable():
            raise ValueError("raw stream must be readable")
        self._raw = raw
        self._buffer = bytearray()
        self._buffer_size = buffer_size
        self._closed = False

    # --- служебный метод ---

    def _fill_buffer(self, min_bytes=1):
        """Дочитываем в буфер из raw, пока там не будет хотя бы min_bytes
        (или пока не достигнем EOF)."""
        while len(self._buffer) < min_bytes:
            chunk = self._raw.read(self._buffer_size)
            if not chunk:
                break
            self._buffer.extend(chunk)

    # --- интерфейс BufferedIOBase ---

    def readable(self):
        return True

    def read(self, size=-1):
        if self._closed:
            raise ValueError("I/O operation on closed file")

        if size is None or size < 0:
            # читаем всё до EOF
            chunks = [bytes(self._buffer)]
            self._buffer.clear()
            while True:
                chunk = self._raw.read(self._buffer_size)
                if not chunk:
                    break
                chunks.append(chunk)
            return b"".join(chunks)

        # читаем size байт
        self._fill_buffer(size)
        data = self._buffer[:size]
        del self._buffer[:size]
        return bytes(data)

    def read1(self, size=-1):
        """read1 обычно читает максимум один "пакет" из нижележащего raw."""
        if self._closed:
            raise ValueError("closed")

        if size is None or size < 0:
            size = self._buffer_size

        if not self._buffer:
            chunk = self._raw.read(size)
            if not chunk:
                return b""
            return chunk
        else:
            data = self._buffer[:size]
            del self._buffer[:size]
            return bytes(data)

    def close(self):
        if not self._closed:
            self._closed = True
            try:
                self._raw.close()
            finally:
                super().close()

    @property
    def closed(self):
        return self._closed

Использование:

raw = io.FileIO("some.bin", "rb")  # RawIOBase
buf = SimpleBufferedReader(raw)

data = buf.read(100)  # читаем через наш буфер

Буферизующий фильтр над другим бинарным потоком

Частый паттерн: декоратор на BufferedIOBase, который:

  • добавляет логирование,
  • ограничивает объём,
  • считывает/добавляет заголовки,
  • сжимает/расжимает, шифрует/дешифрует и т.п.

Пример: CountingBuffered - считает байты на чтении/записи

class CountingBuffered(io.BufferedIOBase):
    def __init__(self, base: io.BufferedIOBase):
        if not (base.readable() or base.writable()):
            raise ValueError("base must be readable or writable")
        self._base = base
        self.bytes_read = 0
        self.bytes_written = 0

    def readable(self):
        return self._base.readable()

    def writable(self):
        return self._base.writable()

    # --- чтение ---

    def read(self, size=-1):
        data = self._base.read(size)
        if data is not None:
            self.bytes_read += len(data)
        return data

    def read1(self, size=-1):
        if hasattr(self._base, "read1"):
            data = self._base.read1(size)
        else:
            data = self._base.read(size)
        if data is not None:
            self.bytes_read += len(data)
        return data

    # --- запись ---

    def write(self, b):
        n = self._base.write(b)
        if n:
            self.bytes_written += n
        return n

    def flush(self):
        self._base.flush()

    def close(self):
        self._base.close()
        super().close()

Использование:

raw = io.FileIO("log.bin", "wb")
buf = io.BufferedWriter(raw)
cnt = CountingBuffered(buf)

cnt.write(b"hello")
cnt.write(b"world")
cnt.flush()
print(cnt.bytes_written)  # 10

Ограниченный буфер (лимит размера записи/чтения)

Похожий на пример с RawIOBase, но уже на уровне буферизованного API (read, write).

Пример: лимитирующий буфер для чтения

class LimitedBufferedReader(io.BufferedIOBase):
    def __init__(self, base: io.BufferedIOBase, limit: int):
        if not base.readable():
            raise ValueError("base must be readable")
        self._base = base
        self._remaining = limit

    def readable(self):
        return True

    def read(self, size=-1):
        if self._remaining <= 0:
            return b""

        if size is None or size < 0 or size > self._remaining:
            size = self._remaining

        data = self._base.read(size)
        if not data:
            return data
        self._remaining -= len(data)
        return data

    def read1(self, size=-1):
        # просто делегируем на read() с тем же ограничением
        return self.read(size)

Использование:

f = open("bigfile.bin", "rb")
buf = io.BufferedReader(f)
limited = LimitedBufferedReader(buf, limit=1024)

chunk = limited.read()  # максимум 1024 байта

Линейно-ориентированный буфер (работа с readline)

Буферизованные потоки часто используются для построчного чтения (протоколы, логи).

Можно сделать фильтр строк, например, который:

  • добавляет префиксы,
  • удаляет лишние символы,
  • режет по max длине и т.д.

Пример: фильтр, отрезающий строки длиннее N

class TruncatingLineReader(io.BufferedIOBase):
    def __init__(self, base: io.BufferedIOBase, max_line_len: int):
        if not base.readable():
            raise ValueError("base must be readable")
        self._base = base
        self._max_line_len = max_line_len

    def readable(self):
        return True

    def readline(self, size=-1):
        line = self._base.readline(size)
        if len(line) > self._max_line_len:
            return line[:self._max_line_len] + b"...\n"
        return line

    def read(self, size=-1):
        # Можно просто делегировать, если фильтрация только в readline
        return self._base.read(size)

Использование:

f = open("log.txt", "rb")
buf = io.BufferedReader(f)
trunc = TruncatingLineReader(buf, max_line_len=80)

for _ in range(5):
    print(trunc.readline())

Использование peek / read1 для протоколов

BufferedReader (и его наследники) умеют peek(size) и read1(size):

  • peek - можно заглянуть в данные, не "вынимая" их из буфера;
  • read1 - прочитать один "кусок" (обычно одно чтение из нижележащего потока).

Паттерн: протокол с заголовком фиксированного размера.

class FramedReader(io.BufferedIOBase):
    """
    Читает сообщения вида:
      [4 байта длины big-endian][payload ...]
    поверх буферизованного потока.
    """
    def __init__(self, base: io.BufferedReader):
        if not base.readable():
            raise ValueError("base must be readable")
        self._base = base

    def readable(self):
        return True

    def read_frame(self) -> bytes:
        # читаем заголовок из 4 байт
        hdr = self._base.read(4)
        if not hdr:
            return b""
        if len(hdr) < 4:
            raise EOFError("incomplete frame header")
        length = int.from_bytes(hdr, "big")
        # теперь читаем payload нужной длины
        data = self._base.read(length)
        if len(data) < length:
            raise EOFError("incomplete frame body")
        return data

    # остальные методы можно не реализовывать, если используем только read_frame()

Смешивание с TextIOWrapper (типовой конвейер)

Типовой паттерн использования (даже без наследования):

import io

raw = io.FileIO("file.txt", "rb")         # RawIOBase
buf = io.BufferedReader(raw)              # BufferedIOBase
text = io.TextIOWrapper(buf, encoding="utf-8")  # TextIOBase

for line in text:
    ...

Если вы написали свой BufferedIOBase (фильтр, логгер, лимитер и т.п.), дальше он спокойно используется как нижний слой для TextIOWrapper, главное:

  • он должен быть readable() или writable();
  • корректно реализовать read/write/readinto/flush.
raw = io.FileIO("file.txt", "rb")
buf = io.BufferedReader(raw)
cnt = CountingBuffered(buf)
text = io.TextIOWrapper(cnt, encoding="utf-8")

text.read()
print(cnt.bytes_read)

Когда именно имеет смысл наследоваться от BufferedIOBase

Имеет смысл, если:

  • у вас уже есть RawIOBase, но вы хотите:

    • нестандартную буферизацию (особые read1, peek, line buffering);
    • фильтровать/инспектировать данные именно на уровне байтов, но уже с буфером;
  • хотите свой "умный" буферизованный слой между RawIOBase и TextIOWrapper.

Чаще НЕ нужно наследоваться, а достаточно:

  • использовать стандартные BufferedReader/BufferedWriter/BufferedRandom;
  • сделать декоратор, который просто оборачивает эти стандартные классы и переопределяет только нужные методы (read, write, readline, flush).

Пример socket => RawIOBase => BufferedReader/Writer

Главная идея: сокет => сырой поток (RawIOBase) => буферизованный (BufferedIOBase) => при желании текст (TextIOWrapper).

Сначала делаем простой RawIOBase над сокетом (чтобы с ним могли работать все I/O-классы):

import io
import socket

class SocketRaw(io.RawIOBase):
    def __init__(self, sock: socket.socket):
        self.sock = sock
        self._closed = False

    def readable(self):
        return True

    def writable(self):
        return True

    def readinto(self, b):
        if self._closed:
            return 0
        # recv_into читает прямо в буфер b, zero-copy
        n = self.sock.recv_into(b)
        return n  # 0 = EOF

    def write(self, b):
        if self._closed:
            raise ValueError("I/O operation on closed socket")
        # send может отправить не всё, поэтому цикл
        mv = memoryview(b)
        total = 0
        while total < len(mv):
            sent = self.sock.send(mv[total:])
            if sent == 0:
                raise RuntimeError("socket connection broken")
            total += sent
        return total

    def close(self):
        if not self._closed:
            self._closed = True
            try:
                self.sock.close()
            finally:
                super().close()

    @property
    def closed(self):
        return self._closed

Теперь - буферизованный уровень (BufferedIOBase-наследники из stdlib):

raw = SocketRaw(sock)

## отдельные буферы на чтение и запись
buf_r = io.BufferedReader(raw)
buf_w = io.BufferedWriter(raw)

## или двунаправленный буфер
buf = io.BufferedRWPair(buf_r, buf_w)

Паттерн 1: всё общение через BufferedReader/Writer, а не напрямую через send/recv.

Линейный текстовый протокол (SMTP, HTTP, свой текстовый)

Частый случай: поверх TCP-стрима идёт текст построчно.Паттерн: SocketRaw => BufferedRWPair => TextIOWrapper.

raw = SocketRaw(sock)
buf_r = io.BufferedReader(raw)
buf_w = io.BufferedWriter(raw)
buf_rw = io.BufferedRWPair(buf_r, buf_w)

text = io.TextIOWrapper(
    buf_rw,
    encoding="utf-8",
    newline="\n",      # как хотим нормализовать
    line_buffering=True,  # flush по '\n'
)

## запись
text.write("HELLO server\n")
text.flush()  # на всякий случай

## чтение
line = text.readline()
print("ответ:", line)

Здесь BufferedRWPair - как раз наследник BufferedIOBase, а TextIOWrapper сверху даёт удобно работать со строками.

Фреймированный бинарный протокол поверх буферизованного сокета

Типичный binary-протокол: [4 байта длины][payload].Здесь удобно писать свой класс, который композицией использует BufferedReader/Writer (они уже BufferedIOBase).

class FramedSocket:
    """
    Поверх буферизованного бинарного стрима:
      send_frame(data: bytes)
      recv_frame() -> bytes
    """
    def __init__(self, buf_r: io.BufferedReader, buf_w: io.BufferedWriter):
        self._r = buf_r
        self._w = buf_w

    def send_frame(self, data: bytes):
        length = len(data).to_bytes(4, "big")
        self._w.write(length)
        self._w.write(data)
        self._w.flush()

    def recv_frame(self) -> bytes:
        hdr = self._r.read(4)
        if not hdr:
            return b""
        if len(hdr) < 4:
            raise EOFError("incomplete frame header")
        length = int.from_bytes(hdr, "big")
        payload = self._r.read(length)
        if len(payload) < length:
            raise EOFError("incomplete frame body")
        return payload

Создание:

raw = SocketRaw(sock)
buf_r = io.BufferedReader(raw)
buf_w = io.BufferedWriter(raw)

framed = FramedSocket(buf_r, buf_w)

framed.send_frame(b"hello")
resp = framed.recv_frame()

Паттерн 2: протокол как отдельный класс, использующий BufferedIOBase-слой, а не сам буферизующий сокет.

Логирующий буфер над сокетом (декоратор на BufferedIOBase)

Иногда хочется посмотреть, что реально летит по сети.Паттерн: свой класс-наследник BufferedIOBase, который оборачивает существующий BufferedReader/Writer и логирует read/write.

class LoggingBuffered(io.BufferedIOBase):
    def __init__(self, base: io.BufferedIOBase, name: str = ""):
        self._base = base
        self._name = name or repr(base)

    def readable(self):
        return self._base.readable()

    def writable(self):
        return self._base.writable()

    def read(self, size=-1):
        data = self._base.read(size)
        if data:
            print(f"[{self._name} READ {len(data)}]: {data!r}")
        return data

    def read1(self, size=-1):
        if hasattr(self._base, "read1"):
            data = self._base.read1(size)
        else:
            data = self._base.read(size)
        if data:
            print(f"[{self._name} READ1 {len(data)}]: {data!r}")
        return data

    def write(self, b):
        print(f"[{self._name} WRITE {len(b)}]: {b!r}")
        return self._base.write(b)

    def flush(self):
        self._base.flush()

    def close(self):
        self._base.close()
        super().close()

Использование:

raw = SocketRaw(sock)
buf_r = io.BufferedReader(raw)
buf_w = io.BufferedWriter(raw)

log_r = LoggingBuffered(buf_r, "sock-r")
log_w = LoggingBuffered(buf_w, "sock-w")

## дальше используем log_r/log_w в протоколе
framed = FramedSocket(log_r, log_w)

Важные нюансы с сокетами + буферы

Кратко, но важное:

  1. Не мешать буферы и sock.recv/sock.send: если обернули сокет в BufferedReader/Writer / TextIOWrapper, дальше все операции делайте только через этот стек, иначе desync.

  2. Таймаут ставится на сам сокет:

    sock.settimeout(5.0)
    

    BufferedReader.read() при этом будет падать по socket.timeout.

  3. Полудуплекс / half-close

    • закрыть только запись: sock.shutdown(socket.SHUT_WR) (или закрыть входной BufferedWriter);
    • чтение при этом можно оставить пока открытым.
  4. SSL: для ssl.SSLSocket паттерн тот же: оборачиваешь его в SocketRaw (или используешь SSLSocket.makefile() - там под капотом тоже I/O-стек).