io.BufferedIOBase - это базовый класс для буферизованных бинарных потоков. Обычно вы используете его наследников (BufferedReader, BufferedWriter, BufferedRandom), но можно и наследоваться от него, если хотите:
RawIOBase;Ниже, основные паттерны использования io.BufferedIOBase() в Python.
BufferedIOBase стоит в иерархииТиповая цепочка:
RawIOBase (низкоуровневый байтовый драйвер)
↓
BufferedIOBase (буферизация, read(), readline(), write())
↓
TextIOBase (TextIOWrapper - str-слой)
Т.е. ваш RawIOBase => ваш или стандартный BufferedIOBase => TextIOWrapper (если нужен текст).
Паттерн:
raw: RawIOBase.bytearray()).read, read1, readinto, write, flush, readable, writable.Минималистичный только читающий буфер:
import io class SimpleBufferedReader(io.BufferedIOBase): def __init__(self, raw: io.RawIOBase, buffer_size: int = 8192): if not raw.readable(): raise ValueError("raw stream must be readable") self._raw = raw self._buffer = bytearray() self._buffer_size = buffer_size self._closed = False # --- служебный метод --- def _fill_buffer(self, min_bytes=1): """Дочитываем в буфер из raw, пока там не будет хотя бы min_bytes (или пока не достигнем EOF).""" while len(self._buffer) < min_bytes: chunk = self._raw.read(self._buffer_size) if not chunk: break self._buffer.extend(chunk) # --- интерфейс BufferedIOBase --- def readable(self): return True def read(self, size=-1): if self._closed: raise ValueError("I/O operation on closed file") if size is None or size < 0: # читаем всё до EOF chunks = [bytes(self._buffer)] self._buffer.clear() while True: chunk = self._raw.read(self._buffer_size) if not chunk: break chunks.append(chunk) return b"".join(chunks) # читаем size байт self._fill_buffer(size) data = self._buffer[:size] del self._buffer[:size] return bytes(data) def read1(self, size=-1): """read1 обычно читает максимум один "пакет" из нижележащего raw.""" if self._closed: raise ValueError("closed") if size is None or size < 0: size = self._buffer_size if not self._buffer: chunk = self._raw.read(size) if not chunk: return b"" return chunk else: data = self._buffer[:size] del self._buffer[:size] return bytes(data) def close(self): if not self._closed: self._closed = True try: self._raw.close() finally: super().close() @property def closed(self): return self._closed
Использование:
raw = io.FileIO("some.bin", "rb") # RawIOBase buf = SimpleBufferedReader(raw) data = buf.read(100) # читаем через наш буфер
Частый паттерн: декоратор на BufferedIOBase, который:
CountingBuffered - считает байты на чтении/записиclass CountingBuffered(io.BufferedIOBase): def __init__(self, base: io.BufferedIOBase): if not (base.readable() or base.writable()): raise ValueError("base must be readable or writable") self._base = base self.bytes_read = 0 self.bytes_written = 0 def readable(self): return self._base.readable() def writable(self): return self._base.writable() # --- чтение --- def read(self, size=-1): data = self._base.read(size) if data is not None: self.bytes_read += len(data) return data def read1(self, size=-1): if hasattr(self._base, "read1"): data = self._base.read1(size) else: data = self._base.read(size) if data is not None: self.bytes_read += len(data) return data # --- запись --- def write(self, b): n = self._base.write(b) if n: self.bytes_written += n return n def flush(self): self._base.flush() def close(self): self._base.close() super().close()
Использование:
raw = io.FileIO("log.bin", "wb") buf = io.BufferedWriter(raw) cnt = CountingBuffered(buf) cnt.write(b"hello") cnt.write(b"world") cnt.flush() print(cnt.bytes_written) # 10
Похожий на пример с RawIOBase, но уже на уровне буферизованного API (read, write).
class LimitedBufferedReader(io.BufferedIOBase): def __init__(self, base: io.BufferedIOBase, limit: int): if not base.readable(): raise ValueError("base must be readable") self._base = base self._remaining = limit def readable(self): return True def read(self, size=-1): if self._remaining <= 0: return b"" if size is None or size < 0 or size > self._remaining: size = self._remaining data = self._base.read(size) if not data: return data self._remaining -= len(data) return data def read1(self, size=-1): # просто делегируем на read() с тем же ограничением return self.read(size)
Использование:
f = open("bigfile.bin", "rb") buf = io.BufferedReader(f) limited = LimitedBufferedReader(buf, limit=1024) chunk = limited.read() # максимум 1024 байта
readline)Буферизованные потоки часто используются для построчного чтения (протоколы, логи).
Можно сделать фильтр строк, например, который:
class TruncatingLineReader(io.BufferedIOBase): def __init__(self, base: io.BufferedIOBase, max_line_len: int): if not base.readable(): raise ValueError("base must be readable") self._base = base self._max_line_len = max_line_len def readable(self): return True def readline(self, size=-1): line = self._base.readline(size) if len(line) > self._max_line_len: return line[:self._max_line_len] + b"...\n" return line def read(self, size=-1): # Можно просто делегировать, если фильтрация только в readline return self._base.read(size)
Использование:
f = open("log.txt", "rb") buf = io.BufferedReader(f) trunc = TruncatingLineReader(buf, max_line_len=80) for _ in range(5): print(trunc.readline())
peek / read1 для протоколовBufferedReader (и его наследники) умеют peek(size) и read1(size):
peek - можно заглянуть в данные, не "вынимая" их из буфера;read1 - прочитать один "кусок" (обычно одно чтение из нижележащего потока).Паттерн: протокол с заголовком фиксированного размера.
class FramedReader(io.BufferedIOBase): """ Читает сообщения вида: [4 байта длины big-endian][payload ...] поверх буферизованного потока. """ def __init__(self, base: io.BufferedReader): if not base.readable(): raise ValueError("base must be readable") self._base = base def readable(self): return True def read_frame(self) -> bytes: # читаем заголовок из 4 байт hdr = self._base.read(4) if not hdr: return b"" if len(hdr) < 4: raise EOFError("incomplete frame header") length = int.from_bytes(hdr, "big") # теперь читаем payload нужной длины data = self._base.read(length) if len(data) < length: raise EOFError("incomplete frame body") return data # остальные методы можно не реализовывать, если используем только read_frame()
TextIOWrapper (типовой конвейер)Типовой паттерн использования (даже без наследования):
import io raw = io.FileIO("file.txt", "rb") # RawIOBase buf = io.BufferedReader(raw) # BufferedIOBase text = io.TextIOWrapper(buf, encoding="utf-8") # TextIOBase for line in text: ...
Если вы написали свой BufferedIOBase (фильтр, логгер, лимитер и т.п.), дальше он спокойно используется как нижний слой для TextIOWrapper, главное:
readable() или writable();read/write/readinto/flush.raw = io.FileIO("file.txt", "rb") buf = io.BufferedReader(raw) cnt = CountingBuffered(buf) text = io.TextIOWrapper(cnt, encoding="utf-8") text.read() print(cnt.bytes_read)
BufferedIOBaseИмеет смысл, если:
у вас уже есть RawIOBase, но вы хотите:
read1, peek, line buffering);хотите свой "умный" буферизованный слой между RawIOBase и TextIOWrapper.
Чаще НЕ нужно наследоваться, а достаточно:
BufferedReader/BufferedWriter/BufferedRandom;read, write, readline, flush).socket => RawIOBase => BufferedReader/WriterГлавная идея: сокет => сырой поток (
RawIOBase) => буферизованный (BufferedIOBase) => при желании текст (TextIOWrapper).
Сначала делаем простой RawIOBase над сокетом (чтобы с ним могли работать все I/O-классы):
import io import socket class SocketRaw(io.RawIOBase): def __init__(self, sock: socket.socket): self.sock = sock self._closed = False def readable(self): return True def writable(self): return True def readinto(self, b): if self._closed: return 0 # recv_into читает прямо в буфер b, zero-copy n = self.sock.recv_into(b) return n # 0 = EOF def write(self, b): if self._closed: raise ValueError("I/O operation on closed socket") # send может отправить не всё, поэтому цикл mv = memoryview(b) total = 0 while total < len(mv): sent = self.sock.send(mv[total:]) if sent == 0: raise RuntimeError("socket connection broken") total += sent return total def close(self): if not self._closed: self._closed = True try: self.sock.close() finally: super().close() @property def closed(self): return self._closed
Теперь - буферизованный уровень (BufferedIOBase-наследники из stdlib):
raw = SocketRaw(sock) ## отдельные буферы на чтение и запись buf_r = io.BufferedReader(raw) buf_w = io.BufferedWriter(raw) ## или двунаправленный буфер buf = io.BufferedRWPair(buf_r, buf_w)
BufferedReader/Writer, а не напрямую через send/recv.Частый случай: поверх TCP-стрима идёт текст построчно.Паттерн: SocketRaw => BufferedRWPair => TextIOWrapper.
raw = SocketRaw(sock) buf_r = io.BufferedReader(raw) buf_w = io.BufferedWriter(raw) buf_rw = io.BufferedRWPair(buf_r, buf_w) text = io.TextIOWrapper( buf_rw, encoding="utf-8", newline="\n", # как хотим нормализовать line_buffering=True, # flush по '\n' ) ## запись text.write("HELLO server\n") text.flush() # на всякий случай ## чтение line = text.readline() print("ответ:", line)
Здесь BufferedRWPair - как раз наследник BufferedIOBase, а TextIOWrapper сверху даёт удобно работать со строками.
Типичный binary-протокол: [4 байта длины][payload].Здесь удобно писать свой класс, который композицией использует BufferedReader/Writer (они уже BufferedIOBase).
class FramedSocket: """ Поверх буферизованного бинарного стрима: send_frame(data: bytes) recv_frame() -> bytes """ def __init__(self, buf_r: io.BufferedReader, buf_w: io.BufferedWriter): self._r = buf_r self._w = buf_w def send_frame(self, data: bytes): length = len(data).to_bytes(4, "big") self._w.write(length) self._w.write(data) self._w.flush() def recv_frame(self) -> bytes: hdr = self._r.read(4) if not hdr: return b"" if len(hdr) < 4: raise EOFError("incomplete frame header") length = int.from_bytes(hdr, "big") payload = self._r.read(length) if len(payload) < length: raise EOFError("incomplete frame body") return payload
Создание:
raw = SocketRaw(sock) buf_r = io.BufferedReader(raw) buf_w = io.BufferedWriter(raw) framed = FramedSocket(buf_r, buf_w) framed.send_frame(b"hello") resp = framed.recv_frame()
BufferedIOBase-слой, а не сам буферизующий сокет.Иногда хочется посмотреть, что реально летит по сети.Паттерн: свой класс-наследник BufferedIOBase, который оборачивает существующий BufferedReader/Writer и логирует read/write.
class LoggingBuffered(io.BufferedIOBase): def __init__(self, base: io.BufferedIOBase, name: str = ""): self._base = base self._name = name or repr(base) def readable(self): return self._base.readable() def writable(self): return self._base.writable() def read(self, size=-1): data = self._base.read(size) if data: print(f"[{self._name} READ {len(data)}]: {data!r}") return data def read1(self, size=-1): if hasattr(self._base, "read1"): data = self._base.read1(size) else: data = self._base.read(size) if data: print(f"[{self._name} READ1 {len(data)}]: {data!r}") return data def write(self, b): print(f"[{self._name} WRITE {len(b)}]: {b!r}") return self._base.write(b) def flush(self): self._base.flush() def close(self): self._base.close() super().close()
Использование:
raw = SocketRaw(sock) buf_r = io.BufferedReader(raw) buf_w = io.BufferedWriter(raw) log_r = LoggingBuffered(buf_r, "sock-r") log_w = LoggingBuffered(buf_w, "sock-w") ## дальше используем log_r/log_w в протоколе framed = FramedSocket(log_r, log_w)
Кратко, но важное:
Не мешать буферы и sock.recv/sock.send: если обернули сокет в BufferedReader/Writer / TextIOWrapper, дальше все операции делайте только через этот стек, иначе desync.
Таймаут ставится на сам сокет:
sock.settimeout(5.0)
BufferedReader.read() при этом будет падать по socket.timeout.
Полудуплекс / half-close
sock.shutdown(socket.SHUT_WR) (или закрыть входной BufferedWriter);SSL: для ssl.SSLSocket паттерн тот же: оборачиваешь его в SocketRaw (или используешь SSLSocket.makefile() - там под капотом тоже I/O-стек).