io.RawIOBase - это низкоуровневый бинарный поток. Обычно вы не работаете с ним напрямую (а через BufferedReader/Writer и TextIOWrapper), но он очень полезен, когда нужно:
Ниже, основные паттерны использования io.RawIOBase() в Python..
RawIOBase и реализуем минимальный наборОбычно достаточно реализовать:
readinto(self, b: bytearray | memoryview) -> int | Nonewrite(self, b: bytes | bytearray | memoryview) -> intreadable() / writable() / seekable(), close() и tell()/seek() при необходимостиRawIOBase сам реализует часть методов (read, readall, readline) через readinto.
import io class BufferReader(io.RawIOBase): def __init__(self, data: bytes): self._data = data self._pos = 0 self._closed = False def readable(self): return True def readinto(self, b): if self._closed: return 0 if self._pos >= len(self._data): return 0 # EOF # b - это буфер (bytearray/memoryview), в который нужно записать данные max_len = len(b) n = min(max_len, len(self._data) - self._pos) b[:n] = self._data[self._pos:self._pos + n] self._pos += n return n def close(self): self._closed = True super().close()
Использование напрямую:
raw = BufferReader(b"hello world") buf = bytearray(4) print(raw.readinto(buf)) # 4 print(buf) # b'hell'
Часто вы не хотите сами реализовывать буферизацию, readline(), большие read(), и т.п.Правильный паттерн:
RawIOBase, реализуете только "сырые" операции;io.BufferedReader/BufferedWriter/BufferedRWPair;io.TextIOWrapper для строк.raw = BufferReader(b"hello\nworld\n") buf = io.BufferedReader(raw) # буферизованный бинарный text = io.TextIOWrapper(buf, encoding="utf-8") print(text.readline()) # "hello\n" print(text.readline()) # "world\n"
Паттерн: свой RawIOBase => BufferedXxx => TextIOWrapper.
IOBaseНапример, у вас есть объект, который умеет recv/send (сокет, библиотечный объект), а вы хотите подключить его к инфраструктуре Python I/O.
recv/sendclass SocketRaw(io.RawIOBase): def __init__(self, sock): self.sock = sock self._closed = False def readable(self): return True def writable(self): return True def readinto(self, b): if self._closed: return 0 # читаем не больше длины буфера n = self.sock.recv_into(b) return n # 0 - EOF def write(self, b): if self._closed: raise ValueError("I/O operation on closed stream") return self.sock.send(b) def close(self): if not self._closed: self.sock.close() self._closed = True super().close()
Теперь можно делать так:
raw = SocketRaw(sock) buf = io.BufferedRWPair(raw, raw) text = io.TextIOWrapper(buf, encoding="utf-8") text.write("GET / HTTP/1.0\r\n\r\n") text.flush() print(text.read())
То есть вы подключили "левый" сокет к стандартной файловой экосистеме.
Классика: нужно прочитать не больше N байт, даже если базовый поток отдаст больше (например, тело HTTP-запроса с фиксированным Content-Length).
class LimitedRawReader(io.RawIOBase): def __init__(self, base: io.RawIOBase, limit: int): self._base = base self._remaining = limit def readable(self): return True def readinto(self, b): if self._remaining <= 0: return 0 # EOF # ограничиваем размер чтения max_len = min(len(b), self._remaining) if max_len <= 0: return 0 # делаем временный slice, чтобы base не записал больше mv = memoryview(b)[:max_len] n = self._base.readinto(mv) if n is None: return None self._remaining -= n return n
Использование:
raw = BufferReader(b"1234567890") limited = LimitedRawReader(raw, 4) buf = io.BufferedReader(limited) print(buf.read()) # b'1234'
Ещё один популярный паттерн - "side effect": при чтении/записи обновлять хэш, счётчик, лог и т.п.
import hashlib class HashingRawReader(io.RawIOBase): def __init__(self, base: io.RawIOBase, algo="sha256"): self._base = base self._hash = hashlib.new(algo) def readable(self): return True def readinto(self, b): n = self._base.readinto(b) if n and n > 0: self._hash.update(b[:n]) return n @property def hexdigest(self): return self._hash.hexdigest()
Использование:
raw = BufferReader(b"abcdef") hraw = HashingRawReader(raw) buf = io.BufferedReader(hraw) data = buf.read() print(data) # b'abcdef' print(hraw.hexdigest)
readinto и memoryviewОсновная фишка RawIOBase - работа через readinto, которая позволяет не выделять новые объекты bytes, а заполнять уже существующие буферы (bytearray, memoryview), что важно для производительности.
Паттерн:
buf = bytearray(8192) while True: n = raw.readinto(buf) if not n: break chunk = memoryview(buf)[:n] # обрабатываем chunk без копирования
Если вы пишете свой RawIOBase, старайтесь максимально использовать readinto внутри, а не создавать новые bytes лишний раз.
Если источник поддерживает произвольный доступ (файл на диске, mmap, свой массив), можно реализовать seek/tell и seekable().
class ArrayRaw(io.RawIOBase): def __init__(self, data: bytearray): self._data = data self._pos = 0 def readable(self): return True def writable(self): return True def seekable(self): return True def readinto(self, b): if self._pos >= len(self._data): return 0 n = min(len(b), len(self._data) - self._pos) b[:n] = self._data[self._pos:self._pos + n] self._pos += n return n def write(self, b): end = self._pos + len(b) if end > len(self._data): self._data.extend(b[len(self._data) - self._pos:]) self._data[self._pos:end] = b self._pos = end return len(b) def seek(self, offset, whence=0): if whence == 0: self._pos = offset elif whence == 1: self._pos += offset elif whence == 2: self._pos = len(self._data) + offset return self._pos def tell(self): return self._pos
Когда у вас есть библиотека, которая ожидает RawIOBase, вы можете делать фейки:
readinto после write => ошибка)Это расширение того же паттерна, что и с IOBase, только на уровне байтов.
RawIOBase, а когда нетИспользуйте RawIOBase, если:
readinto/zero-copy и контроль над буферамиBufferedReader/Writer/TextIOWrapper поверхНе используйте RawIOBase, если:
TextIOBaseopen() с нужными параметрамиBufferedIOBaseio.RawIOBase - это низкоуровневый бинарный поток. Обычно вы не работаете с ним напрямую (а через BufferedReader/Writer и TextIOWrapper), но он очень полезен, когда нужно:
Ниже - основные паттерны.
RawIOBase и реализуем минимальный наборОбычно достаточно реализовать:
readinto(self, b: bytearray | memoryview) -> int | Nonewrite(self, b: bytes | bytearray | memoryview) -> intreadable() / writable() / seekable(), close() и tell()/seek() при необходимостиRawIOBase сам реализует часть методов (read, readall, readline) через readinto.
import io class BufferReader(io.RawIOBase): def __init__(self, data: bytes): self._data = data self._pos = 0 self._closed = False def readable(self): return True def readinto(self, b): if self._closed: return 0 if self._pos >= len(self._data): return 0 # EOF # b - это буфер (bytearray/memoryview), в который нужно записать данные max_len = len(b) n = min(max_len, len(self._data) - self._pos) b[:n] = self._data[self._pos:self._pos + n] self._pos += n return n def close(self): self._closed = True super().close()
Использование напрямую:
raw = BufferReader(b"hello world") buf = bytearray(4) print(raw.readinto(buf)) # 4 print(buf) # b'hell'
Часто вы не хотите сами реализовывать буферизацию, readline(), большие read(), и т.п.Правильный паттерн:
RawIOBase, реализуете только "сырые" операции;io.BufferedReader/BufferedWriter/BufferedRWPair;io.TextIOWrapper для строк.raw = BufferReader(b"hello\nworld\n") buf = io.BufferedReader(raw) # буферизованный бинарный text = io.TextIOWrapper(buf, encoding="utf-8") print(text.readline()) # "hello\n" print(text.readline()) # "world\n"
Паттерн: свой RawIOBase => BufferedXxx => TextIOWrapper.
IOBaseНапример, у вас есть объект, который умеет recv/send (сокет, библиотечный объект), а вы хотите подключить его к инфраструктуре Python I/O.
recv/sendclass SocketRaw(io.RawIOBase): def __init__(self, sock): self.sock = sock self._closed = False def readable(self): return True def writable(self): return True def readinto(self, b): if self._closed: return 0 # читаем не больше длины буфера n = self.sock.recv_into(b) return n # 0 - EOF def write(self, b): if self._closed: raise ValueError("I/O operation on closed stream") return self.sock.send(b) def close(self): if not self._closed: self.sock.close() self._closed = True super().close()
Теперь можно делать так:
raw = SocketRaw(sock) buf = io.BufferedRWPair(raw, raw) text = io.TextIOWrapper(buf, encoding="utf-8") text.write("GET / HTTP/1.0\r\n\r\n") text.flush() print(text.read())
То есть вы подключили "левый" сокет к стандартной файловой экосистеме.
Классика: нужно прочитать не больше N байт, даже если базовый поток отдаст больше (например, тело HTTP-запроса с фиксированным Content-Length).
class LimitedRawReader(io.RawIOBase): def __init__(self, base: io.RawIOBase, limit: int): self._base = base self._remaining = limit def readable(self): return True def readinto(self, b): if self._remaining <= 0: return 0 # EOF # ограничиваем размер чтения max_len = min(len(b), self._remaining) if max_len <= 0: return 0 # делаем временный slice, чтобы base не записал больше mv = memoryview(b)[:max_len] n = self._base.readinto(mv) if n is None: return None self._remaining -= n return n
Использование:
raw = BufferReader(b"1234567890") limited = LimitedRawReader(raw, 4) buf = io.BufferedReader(limited) print(buf.read()) # b'1234'
Ещё один популярный паттерн - "side effect": при чтении/записи обновлять хэш, счётчик, лог и т.п.
import hashlib class HashingRawReader(io.RawIOBase): def __init__(self, base: io.RawIOBase, algo="sha256"): self._base = base self._hash = hashlib.new(algo) def readable(self): return True def readinto(self, b): n = self._base.readinto(b) if n and n > 0: self._hash.update(b[:n]) return n @property def hexdigest(self): return self._hash.hexdigest()
Использование:
raw = BufferReader(b"abcdef") hraw = HashingRawReader(raw) buf = io.BufferedReader(hraw) data = buf.read() print(data) # b'abcdef' print(hraw.hexdigest)
readinto и memoryviewОсновная фишка RawIOBase - работа через readinto, которая позволяет не выделять новые объекты bytes, а заполнять уже существующие буферы (bytearray, memoryview), что важно для производительности.
Паттерн:
buf = bytearray(8192) while True: n = raw.readinto(buf) if not n: break chunk = memoryview(buf)[:n] # обрабатываем chunk без копирования
Если вы пишете свой RawIOBase, старайтесь максимально использовать readinto внутри, а не создавать новые bytes лишний раз.
Если источник поддерживает произвольный доступ (файл на диске, mmap, свой массив), можно реализовать seek/tell и seekable().
class ArrayRaw(io.RawIOBase): def __init__(self, data: bytearray): self._data = data self._pos = 0 def readable(self): return True def writable(self): return True def seekable(self): return True def readinto(self, b): if self._pos >= len(self._data): return 0 n = min(len(b), len(self._data) - self._pos) b[:n] = self._data[self._pos:self._pos + n] self._pos += n return n def write(self, b): end = self._pos + len(b) if end > len(self._data): self._data.extend(b[len(self._data) - self._pos:]) self._data[self._pos:end] = b self._pos = end return len(b) def seek(self, offset, whence=0): if whence == 0: self._pos = offset elif whence == 1: self._pos += offset elif whence == 2: self._pos = len(self._data) + offset return self._pos def tell(self): return self._pos
Когда у вас есть библиотека, которая ожидает RawIOBase, вы можете делать фейки:
readinto после write => ошибка)Это расширение того же паттерна, что и с IOBase, только на уровне байтов.
RawIOBase, а когда нетИспользуйте RawIOBase, если:
readinto/zero-copy и контроль над буферамиBufferedReader/Writer/TextIOWrapper поверхНе используйте RawIOBase, если:
TextIOBaseopen() с нужными параметрамиBufferedIOBaseRawIOBaseИдея:
key: bytes.key по кругу).cipher = plain ^ keystream.plain = cipher ^ keystream (тот же поток ключа).self._pos, чтобы поток ключа совпадал чтение/запись.import io class EncryptingRawStream(io.RawIOBase): """ Сырой поток, который: - при write() шифрует данные XOR'ом и пишет в базовый поток; - при readinto() читает из базового и дешифрует. ВНИМАНИЕ: это НЕ безопасная криптография, только пример паттерна. """ def __init__(self, base: io.RawIOBase, key: bytes): if not (base.readable() or base.writable()): raise ValueError("base must be readable or writable") if not key: raise ValueError("key must be non-empty") self._base = base self._key = key self._pos = 0 # общий offset для потока ключа self._closed = False # --- служебные методы --- def _xor_with_keystream(self, data: bytes) -> bytes: """XOR data с циклическим ключом начиная с self._pos.""" key = self._key klen = len(key) res = bytearray(len(data)) pos = self._pos for i, b in enumerate(data): res[i] = b ^ key[pos % klen] pos += 1 self._pos = pos return bytes(res) # --- RawIOBase API --- def readable(self): return self._base.readable() def writable(self): return self._base.writable() def readinto(self, b): """ Читаем зашифрованные байты из base.readinto(b), дешифруем их на месте и возвращаем количество байт. """ if not self.readable(): return 0 if self._closed: return 0 # читаем в буфер b "как есть" n = self._base.readinto(b) if not n: return n mv = memoryview(b)[:n] # дешифруем данные "в лоб": XOR поверх текущего содержимого decrypted = self._xor_with_keystream(bytes(mv)) mv[:] = decrypted return n def write(self, b): """ Шифруем b и пишем в базовый поток. """ if not self.writable(): raise io.UnsupportedOperation("not writable") if self._closed: raise ValueError("I/O operation on closed stream") if not isinstance(b, (bytes, bytearray, memoryview)): raise TypeError("bytes-like object required, not %r" % type(b)) enc = self._xor_with_keystream(bytes(b)) return self._base.write(enc) def flush(self): if hasattr(self._base, "flush"): self._base.flush() def close(self): if not self._closed: self._closed = True try: self._base.close() finally: super().close() @property def closed(self): return self._closed
Запись зашифрованных данных:
import io key = b"supersecret" ## Открываем файл как сырое устройство with open("secret.bin", "wb", buffering=0) as f_raw: # f_raw.raw – это RawIOBase под капотом (FileIO), но обычно # open(..., buffering=0) уже даёт "сырое" поведение enc_stream = EncryptingRawStream(f_raw, key) enc_stream.write(b"hello world") enc_stream.flush()
Чтение и расшифровка:
with open("secret.bin", "rb", buffering=0) as f_raw: dec_stream = EncryptingRawStream(f_raw, key) buf = io.BufferedReader(dec_stream) # для удобства data = buf.read() print(data) # b'hello world'
В реальной жизни вместо XOR нужно использовать нормальную библиотеку (например,
cryptography), аEncryptingRawStreamиспользовать как "драйвер", который внутрь себя прячет шифратор.
subprocess (stdin / stdout)Типичный паттерн:
Создаём subprocess.Popen(..., stdin=PIPE, stdout=PIPE).
Получаем бинарные потоки proc.stdin и proc.stdout.
Делаем над ними тонкие обёртки RawIOBase, чтобы:
BufferedReader, TextIOWrapper, шифрование и т.д.import io import subprocess class ProcessStdoutRaw(io.RawIOBase): def __init__(self, proc: subprocess.Popen): if proc.stdout is None: raise ValueError("process has no stdout") self._proc = proc self._stdout = proc.stdout self._closed = False def readable(self): return True def writable(self): return False def readinto(self, b): if self._closed: return 0 # stdout у Popen обычно уже буферизованный поток, у него есть readinto n = self._stdout.readinto(b) if n is None: # некоторые реализации могут вернуть None => трактуем как 0 return 0 return n def close(self): if not self._closed: self._closed = True try: if self._stdout: self._stdout.close() finally: super().close() @property def closed(self): return self._closed
class ProcessStdinRaw(io.RawIOBase): def __init__(self, proc: subprocess.Popen): if proc.stdin is None: raise ValueError("process has no stdin") self._proc = proc self._stdin = proc.stdin self._closed = False def readable(self): return False def writable(self): return True def write(self, b): if self._closed: raise ValueError("I/O operation on closed stream") if not isinstance(b, (bytes, bytearray, memoryview)): raise TypeError("bytes-like object required") return self._stdin.write(b) def flush(self): if hasattr(self._stdin, "flush"): self._stdin.flush() def close(self): if not self._closed: self._closed = True try: if self._stdin: self._stdin.close() finally: super().close() @property def closed(self): return self._closed
cat через RawIOBaseimport io import subprocess ## Запускаем простой эхо-процесс (cat на *nix; на Windows можно использовать 'python -u -c "import sys; sys.stdout.write(sys.stdin.read())"') proc = subprocess.Popen( ["cat"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) raw_in = ProcessStdinRaw(proc) # raw-поток на stdin процесса raw_out = ProcessStdoutRaw(proc) # raw-поток на stdout процесса ## Оборачиваем в буферы для удобства buf_in = io.BufferedWriter(raw_in) buf_out = io.BufferedReader(raw_out) ## Пишем buf_in.write(b"hello subprocess\n") buf_in.flush() ## Важно: не закрывать stdin, если процесс должен ещё работать ## Читаем ответ line = buf_out.readline() print(line) # b'hello subprocess\n' ## Закрытие raw_in.close() raw_out.close() proc.terminate()
Можно всё это ещё и скрестить:
ProcessStdinRaw/ProcessStdoutRaw;EncryptingRawStream;BufferedReader/Writer или TextIOWrapper.Типовая цепочка, например, для шифруемой записи в stdin процесса:
## stdin процесса -> EncryptingRawStream -> BufferedWriter proc = subprocess.Popen([...], stdin=subprocess.PIPE) raw_in = ProcessStdinRaw(proc) enc_in = EncryptingRawStream(raw_in, key=b"secret") buf_in = io.BufferedWriter(enc_in) buf_in.write(b"hello\n") # отправится в процесс уже зашифрованным buf_in.flush()