Сообщить об ошибке.

Паттерны использования io.RawIOBase() в Python

Содержание:

io.RawIOBase - это низкоуровневый бинарный поток. Обычно вы не работаете с ним напрямую (а через BufferedReader/Writer и TextIOWrapper), но он очень полезен, когда нужно:

  • обернуть свой источник/приёмник байт с минимальными накладными расходами
  • дать Python-овскому I/O-стеку "драйвер" к вашему устройству/протоколу
  • сделать что-то вроде драйвера: сокет, сериал, шифрование, лимитирование и т.д.

Ниже, основные паттерны использования io.RawIOBase() в Python..

Наследуемся от RawIOBase и реализуем минимальный набор

Обычно достаточно реализовать:

  • readinto(self, b: bytearray | memoryview) -> int | None
  • write(self, b: bytes | bytearray | memoryview) -> int
  • плюс readable() / writable() / seekable(), close() и tell()/seek() при необходимости

RawIOBase сам реализует часть методов (read, readall, readline) через readinto.

Пример: "сырой" источник, читающий из байтового буфера блоками

import io

class BufferReader(io.RawIOBase):
    def __init__(self, data: bytes):
        self._data = data
        self._pos = 0
        self._closed = False

    def readable(self):
        return True

    def readinto(self, b):
        if self._closed:
            return 0
        if self._pos >= len(self._data):
            return 0  # EOF

        # b - это буфер (bytearray/memoryview), в который нужно записать данные
        max_len = len(b)
        n = min(max_len, len(self._data) - self._pos)
        b[:n] = self._data[self._pos:self._pos + n]
        self._pos += n
        return n

    def close(self):
        self._closed = True
        super().close()

Использование напрямую:

raw = BufferReader(b"hello world")
buf = bytearray(4)
print(raw.readinto(buf))  # 4
print(buf)                # b'hell'

Паттерн "драйвер + буфер"

Часто вы не хотите сами реализовывать буферизацию, readline(), большие read(), и т.п.Правильный паттерн:

  1. наследуетесь от RawIOBase, реализуете только "сырые" операции;
  2. поверх него используете io.BufferedReader/BufferedWriter/BufferedRWPair;
  3. при необходимости поверх этого ещё и io.TextIOWrapper для строк.

Пример: "сырой" поток + буфер + текстовый

raw = BufferReader(b"hello\nworld\n")
buf = io.BufferedReader(raw)      # буферизованный бинарный
text = io.TextIOWrapper(buf, encoding="utf-8")

print(text.readline())  # "hello\n"
print(text.readline())  # "world\n"

Паттерн: свой RawIOBase => BufferedXxx => TextIOWrapper.

Обёртка-адаптер над чем-то, что не IOBase

Например, у вас есть объект, который умеет recv/send (сокет, библиотечный объект), а вы хотите подключить его к инфраструктуре Python I/O.

Пример: адаптер над объектом с методами recv/send

class SocketRaw(io.RawIOBase):
    def __init__(self, sock):
        self.sock = sock
        self._closed = False

    def readable(self):
        return True

    def writable(self):
        return True

    def readinto(self, b):
        if self._closed:
            return 0
        # читаем не больше длины буфера
        n = self.sock.recv_into(b)
        return n  # 0 - EOF

    def write(self, b):
        if self._closed:
            raise ValueError("I/O operation on closed stream")
        return self.sock.send(b)

    def close(self):
        if not self._closed:
            self.sock.close()
            self._closed = True
        super().close()

Теперь можно делать так:

raw = SocketRaw(sock)
buf = io.BufferedRWPair(raw, raw)
text = io.TextIOWrapper(buf, encoding="utf-8")
text.write("GET / HTTP/1.0\r\n\r\n")
text.flush()
print(text.read())

То есть вы подключили "левый" сокет к стандартной файловой экосистеме.

Поток-ограничитель (limit / cap)

Классика: нужно прочитать не больше N байт, даже если базовый поток отдаст больше (например, тело HTTP-запроса с фиксированным Content-Length).

class LimitedRawReader(io.RawIOBase):
    def __init__(self, base: io.RawIOBase, limit: int):
        self._base = base
        self._remaining = limit

    def readable(self):
        return True

    def readinto(self, b):
        if self._remaining <= 0:
            return 0  # EOF

        # ограничиваем размер чтения
        max_len = min(len(b), self._remaining)
        if max_len <= 0:
            return 0

        # делаем временный slice, чтобы base не записал больше
        mv = memoryview(b)[:max_len]
        n = self._base.readinto(mv)
        if n is None:
            return None
        self._remaining -= n
        return n

Использование:

raw = BufferReader(b"1234567890")
limited = LimitedRawReader(raw, 4)
buf = io.BufferedReader(limited)

print(buf.read())  # b'1234'

Поток, считающий хэш "на лету"

Ещё один популярный паттерн - "side effect": при чтении/записи обновлять хэш, счётчик, лог и т.п.

import hashlib

class HashingRawReader(io.RawIOBase):
    def __init__(self, base: io.RawIOBase, algo="sha256"):
        self._base = base
        self._hash = hashlib.new(algo)

    def readable(self):
        return True

    def readinto(self, b):
        n = self._base.readinto(b)
        if n and n > 0:
            self._hash.update(b[:n])
        return n

    @property
    def hexdigest(self):
        return self._hash.hexdigest()

Использование:

raw = BufferReader(b"abcdef")
hraw = HashingRawReader(raw)
buf = io.BufferedReader(hraw)
data = buf.read()
print(data)          # b'abcdef'
print(hraw.hexdigest)

Zero-copy / работа с readinto и memoryview

Основная фишка RawIOBase - работа через readinto, которая позволяет не выделять новые объекты bytes, а заполнять уже существующие буферы (bytearray, memoryview), что важно для производительности.

Паттерн:

buf = bytearray(8192)
while True:
    n = raw.readinto(buf)
    if not n:
        break
    chunk = memoryview(buf)[:n]
    # обрабатываем chunk без копирования

Если вы пишете свой RawIOBase, старайтесь максимально использовать readinto внутри, а не создавать новые bytes лишний раз.

Реализация seek/tell для потоков с произвольным доступом

Если источник поддерживает произвольный доступ (файл на диске, mmap, свой массив), можно реализовать seek/tell и seekable().

class ArrayRaw(io.RawIOBase):
    def __init__(self, data: bytearray):
        self._data = data
        self._pos = 0

    def readable(self): return True
    def writable(self): return True
    def seekable(self): return True

    def readinto(self, b):
        if self._pos >= len(self._data):
            return 0
        n = min(len(b), len(self._data) - self._pos)
        b[:n] = self._data[self._pos:self._pos + n]
        self._pos += n
        return n

    def write(self, b):
        end = self._pos + len(b)
        if end > len(self._data):
            self._data.extend(b[len(self._data) - self._pos:])
        self._data[self._pos:end] = b
        self._pos = end
        return len(b)

    def seek(self, offset, whence=0):
        if whence == 0:
            self._pos = offset
        elif whence == 1:
            self._pos += offset
        elif whence == 2:
            self._pos = len(self._data) + offset
        return self._pos

    def tell(self):
        return self._pos

Тестовые двойники (fake raw stream) для низкоуровневого кода

Когда у вас есть библиотека, которая ожидает RawIOBase, вы можете делать фейки:

  • "ломающийся" после N байт
  • с контролем порядка вызовов (readinto после write => ошибка)
  • с симуляцией задержек и т.п.

Это расширение того же паттерна, что и с IOBase, только на уровне байтов.

Когда выбирать RawIOBase, а когда нет

Используйте RawIOBase, если:

  • вы пишете драйвер или низкоуровневую обёртку над чем-то бинарным
  • вам важны readinto/zero-copy и контроль над буферами
  • вы хотите использовать BufferedReader/Writer/TextIOWrapper поверх

Не используйте RawIOBase, если:

  • вам нужен только текстовый поток => TextIOBase
  • вы работаете с обычным файлом => проще open() с нужными параметрами
  • вам хватает буферизованного API => наследуйтесь от BufferedIOBase

io.RawIOBase - это низкоуровневый бинарный поток. Обычно вы не работаете с ним напрямую (а через BufferedReader/Writer и TextIOWrapper), но он очень полезен, когда нужно:

  • обернуть свой источник/приёмник байт с минимальными накладными расходами
  • дать Python-овскому I/O-стеку "драйвер" к вашему устройству/протоколу
  • сделать что-то вроде драйвера: сокет, сериал, шифрование, лимитирование и т.д.

Ниже - основные паттерны.

Наследуемся от RawIOBase и реализуем минимальный набор

Обычно достаточно реализовать:

  • readinto(self, b: bytearray | memoryview) -> int | None
  • write(self, b: bytes | bytearray | memoryview) -> int
  • плюс readable() / writable() / seekable(), close() и tell()/seek() при необходимости

RawIOBase сам реализует часть методов (read, readall, readline) через readinto.

Пример: "сырой" источник, читающий из байтового буфера блоками

import io

class BufferReader(io.RawIOBase):
    def __init__(self, data: bytes):
        self._data = data
        self._pos = 0
        self._closed = False

    def readable(self):
        return True

    def readinto(self, b):
        if self._closed:
            return 0
        if self._pos >= len(self._data):
            return 0  # EOF

        # b - это буфер (bytearray/memoryview), в который нужно записать данные
        max_len = len(b)
        n = min(max_len, len(self._data) - self._pos)
        b[:n] = self._data[self._pos:self._pos + n]
        self._pos += n
        return n

    def close(self):
        self._closed = True
        super().close()

Использование напрямую:

raw = BufferReader(b"hello world")
buf = bytearray(4)
print(raw.readinto(buf))  # 4
print(buf)                # b'hell'

Паттерн "драйвер + буфер"

Часто вы не хотите сами реализовывать буферизацию, readline(), большие read(), и т.п.Правильный паттерн:

  1. наследуетесь от RawIOBase, реализуете только "сырые" операции;
  2. поверх него используете io.BufferedReader/BufferedWriter/BufferedRWPair;
  3. при необходимости поверх этого ещё и io.TextIOWrapper для строк.

Пример: "сырой" поток + буфер + текстовый

raw = BufferReader(b"hello\nworld\n")
buf = io.BufferedReader(raw)      # буферизованный бинарный
text = io.TextIOWrapper(buf, encoding="utf-8")

print(text.readline())  # "hello\n"
print(text.readline())  # "world\n"

Паттерн: свой RawIOBase => BufferedXxx => TextIOWrapper.

Обёртка-адаптер над чем-то, что не IOBase

Например, у вас есть объект, который умеет recv/send (сокет, библиотечный объект), а вы хотите подключить его к инфраструктуре Python I/O.

Пример: адаптер над объектом с методами recv/send

class SocketRaw(io.RawIOBase):
    def __init__(self, sock):
        self.sock = sock
        self._closed = False

    def readable(self):
        return True

    def writable(self):
        return True

    def readinto(self, b):
        if self._closed:
            return 0
        # читаем не больше длины буфера
        n = self.sock.recv_into(b)
        return n  # 0 - EOF

    def write(self, b):
        if self._closed:
            raise ValueError("I/O operation on closed stream")
        return self.sock.send(b)

    def close(self):
        if not self._closed:
            self.sock.close()
            self._closed = True
        super().close()

Теперь можно делать так:

raw = SocketRaw(sock)
buf = io.BufferedRWPair(raw, raw)
text = io.TextIOWrapper(buf, encoding="utf-8")
text.write("GET / HTTP/1.0\r\n\r\n")
text.flush()
print(text.read())

То есть вы подключили "левый" сокет к стандартной файловой экосистеме.

Поток-ограничитель (limit / cap)

Классика: нужно прочитать не больше N байт, даже если базовый поток отдаст больше (например, тело HTTP-запроса с фиксированным Content-Length).

class LimitedRawReader(io.RawIOBase):
    def __init__(self, base: io.RawIOBase, limit: int):
        self._base = base
        self._remaining = limit

    def readable(self):
        return True

    def readinto(self, b):
        if self._remaining <= 0:
            return 0  # EOF

        # ограничиваем размер чтения
        max_len = min(len(b), self._remaining)
        if max_len <= 0:
            return 0

        # делаем временный slice, чтобы base не записал больше
        mv = memoryview(b)[:max_len]
        n = self._base.readinto(mv)
        if n is None:
            return None
        self._remaining -= n
        return n

Использование:

raw = BufferReader(b"1234567890")
limited = LimitedRawReader(raw, 4)
buf = io.BufferedReader(limited)

print(buf.read())  # b'1234'

Поток, считающий хэш "на лету"

Ещё один популярный паттерн - "side effect": при чтении/записи обновлять хэш, счётчик, лог и т.п.

import hashlib

class HashingRawReader(io.RawIOBase):
    def __init__(self, base: io.RawIOBase, algo="sha256"):
        self._base = base
        self._hash = hashlib.new(algo)

    def readable(self):
        return True

    def readinto(self, b):
        n = self._base.readinto(b)
        if n and n > 0:
            self._hash.update(b[:n])
        return n

    @property
    def hexdigest(self):
        return self._hash.hexdigest()

Использование:

raw = BufferReader(b"abcdef")
hraw = HashingRawReader(raw)
buf = io.BufferedReader(hraw)
data = buf.read()
print(data)          # b'abcdef'
print(hraw.hexdigest)

Zero-copy / работа с readinto и memoryview

Основная фишка RawIOBase - работа через readinto, которая позволяет не выделять новые объекты bytes, а заполнять уже существующие буферы (bytearray, memoryview), что важно для производительности.

Паттерн:

buf = bytearray(8192)
while True:
    n = raw.readinto(buf)
    if not n:
        break
    chunk = memoryview(buf)[:n]
    # обрабатываем chunk без копирования

Если вы пишете свой RawIOBase, старайтесь максимально использовать readinto внутри, а не создавать новые bytes лишний раз.

Реализация seek/tell для потоков с произвольным доступом

Если источник поддерживает произвольный доступ (файл на диске, mmap, свой массив), можно реализовать seek/tell и seekable().

class ArrayRaw(io.RawIOBase):
    def __init__(self, data: bytearray):
        self._data = data
        self._pos = 0

    def readable(self): return True
    def writable(self): return True
    def seekable(self): return True

    def readinto(self, b):
        if self._pos >= len(self._data):
            return 0
        n = min(len(b), len(self._data) - self._pos)
        b[:n] = self._data[self._pos:self._pos + n]
        self._pos += n
        return n

    def write(self, b):
        end = self._pos + len(b)
        if end > len(self._data):
            self._data.extend(b[len(self._data) - self._pos:])
        self._data[self._pos:end] = b
        self._pos = end
        return len(b)

    def seek(self, offset, whence=0):
        if whence == 0:
            self._pos = offset
        elif whence == 1:
            self._pos += offset
        elif whence == 2:
            self._pos = len(self._data) + offset
        return self._pos

    def tell(self):
        return self._pos

Тестовые двойники (fake raw stream) для низкоуровневого кода

Когда у вас есть библиотека, которая ожидает RawIOBase, вы можете делать фейки:

  • "ломающийся" после N байт
  • с контролем порядка вызовов (readinto после write => ошибка)
  • с симуляцией задержек и т.п.

Это расширение того же паттерна, что и с IOBase, только на уровне байтов.

Когда выбирать RawIOBase, а когда нет

Используйте RawIOBase, если:

  • вы пишете драйвер или низкоуровневую обёртку над чем-то бинарным
  • вам важны readinto/zero-copy и контроль над буферами
  • вы хотите использовать BufferedReader/Writer/TextIOWrapper поверх

Не используйте RawIOBase, если:

  • вам нужен только текстовый поток => TextIOBase
  • вы работаете с обычным файлом => проще open() с нужными параметрами
  • вам хватает буферизованного API => наследуйтесь от BufferedIOBase

Пример EncryptingRawStream: шифрующий/дешифрующий RawIOBase

Идея:

  • Есть ключ key: bytes.
  • Для каждого байта генерируем псевдопоток ключа (повторяем key по кругу).
  • При записи: cipher = plain ^ keystream.
  • При чтении: plain = cipher ^ keystream (тот же поток ключа).
  • Держим позицию self._pos, чтобы поток ключа совпадал чтение/запись.
import io

class EncryptingRawStream(io.RawIOBase):
    """
    Сырой поток, который:
      - при write() шифрует данные XOR'ом и пишет в базовый поток;
      - при readinto() читает из базового и дешифрует.
    ВНИМАНИЕ: это НЕ безопасная криптография, только пример паттерна.
    """

    def __init__(self, base: io.RawIOBase, key: bytes):
        if not (base.readable() or base.writable()):
            raise ValueError("base must be readable or writable")
        if not key:
            raise ValueError("key must be non-empty")
        self._base = base
        self._key = key
        self._pos = 0         # общий offset для потока ключа
        self._closed = False

    # --- служебные методы ---

    def _xor_with_keystream(self, data: bytes) -> bytes:
        """XOR data с циклическим ключом начиная с self._pos."""
        key = self._key
        klen = len(key)
        res = bytearray(len(data))
        pos = self._pos
        for i, b in enumerate(data):
            res[i] = b ^ key[pos % klen]
            pos += 1
        self._pos = pos
        return bytes(res)

    # --- RawIOBase API ---

    def readable(self):
        return self._base.readable()

    def writable(self):
        return self._base.writable()

    def readinto(self, b):
        """
        Читаем зашифрованные байты из base.readinto(b),
        дешифруем их на месте и возвращаем количество байт.
        """
        if not self.readable():
            return 0
        if self._closed:
            return 0

        # читаем в буфер b "как есть"
        n = self._base.readinto(b)
        if not n:
            return n

        mv = memoryview(b)[:n]
        # дешифруем данные "в лоб": XOR поверх текущего содержимого
        decrypted = self._xor_with_keystream(bytes(mv))
        mv[:] = decrypted
        return n

    def write(self, b):
        """
        Шифруем b и пишем в базовый поток.
        """
        if not self.writable():
            raise io.UnsupportedOperation("not writable")
        if self._closed:
            raise ValueError("I/O operation on closed stream")

        if not isinstance(b, (bytes, bytearray, memoryview)):
            raise TypeError("bytes-like object required, not %r" % type(b))

        enc = self._xor_with_keystream(bytes(b))
        return self._base.write(enc)

    def flush(self):
        if hasattr(self._base, "flush"):
            self._base.flush()

    def close(self):
        if not self._closed:
            self._closed = True
            try:
                self._base.close()
            finally:
                super().close()

    @property
    def closed(self):
        return self._closed

Использование с файлом

Запись зашифрованных данных:

import io

key = b"supersecret"

## Открываем файл как сырое устройство
with open("secret.bin", "wb", buffering=0) as f_raw:
    # f_raw.raw – это RawIOBase под капотом (FileIO), но обычно
    # open(..., buffering=0) уже даёт "сырое" поведение
    enc_stream = EncryptingRawStream(f_raw, key)
    enc_stream.write(b"hello world")
    enc_stream.flush()

Чтение и расшифровка:

with open("secret.bin", "rb", buffering=0) as f_raw:
    dec_stream = EncryptingRawStream(f_raw, key)
    buf = io.BufferedReader(dec_stream)  # для удобства
    data = buf.read()
    print(data)  # b'hello world'

В реальной жизни вместо XOR нужно использовать нормальную библиотеку (например, cryptography), а EncryptingRawStream использовать как "драйвер", который внутрь себя прячет шифратор.

Пример Raw-обёртки над subprocess (stdin / stdout)

Типичный паттерн:

  1. Создаём subprocess.Popen(..., stdin=PIPE, stdout=PIPE).

  2. Получаем бинарные потоки proc.stdin и proc.stdout.

  3. Делаем над ними тонкие обёртки RawIOBase, чтобы:

    • иметь единый интерфейс для своего кода;
    • при желании дальше строить BufferedReader, TextIOWrapper, шифрование и т.д.

Обёртка для stdout (чтение)

import io
import subprocess

class ProcessStdoutRaw(io.RawIOBase):
    def __init__(self, proc: subprocess.Popen):
        if proc.stdout is None:
            raise ValueError("process has no stdout")
        self._proc = proc
        self._stdout = proc.stdout
        self._closed = False

    def readable(self):
        return True

    def writable(self):
        return False

    def readinto(self, b):
        if self._closed:
            return 0
        # stdout у Popen обычно уже буферизованный поток, у него есть readinto
        n = self._stdout.readinto(b)
        if n is None:
            # некоторые реализации могут вернуть None => трактуем как 0
            return 0
        return n

    def close(self):
        if not self._closed:
            self._closed = True
            try:
                if self._stdout:
                    self._stdout.close()
            finally:
                super().close()

    @property
    def closed(self):
        return self._closed

Обёртка для stdin (запись)

class ProcessStdinRaw(io.RawIOBase):
    def __init__(self, proc: subprocess.Popen):
        if proc.stdin is None:
            raise ValueError("process has no stdin")
        self._proc = proc
        self._stdin = proc.stdin
        self._closed = False

    def readable(self):
        return False

    def writable(self):
        return True

    def write(self, b):
        if self._closed:
            raise ValueError("I/O operation on closed stream")
        if not isinstance(b, (bytes, bytearray, memoryview)):
            raise TypeError("bytes-like object required")
        return self._stdin.write(b)

    def flush(self):
        if hasattr(self._stdin, "flush"):
            self._stdin.flush()

    def close(self):
        if not self._closed:
            self._closed = True
            try:
                if self._stdin:
                    self._stdin.close()
            finally:
                super().close()

    @property
    def closed(self):
        return self._closed

Пример: общение с процессом cat через RawIOBase

import io
import subprocess

## Запускаем простой эхо-процесс (cat на *nix; на Windows можно использовать 'python -u -c "import sys; sys.stdout.write(sys.stdin.read())"')
proc = subprocess.Popen(
    ["cat"],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)

raw_in = ProcessStdinRaw(proc)    # raw-поток на stdin процесса
raw_out = ProcessStdoutRaw(proc)  # raw-поток на stdout процесса

## Оборачиваем в буферы для удобства
buf_in = io.BufferedWriter(raw_in)
buf_out = io.BufferedReader(raw_out)

## Пишем
buf_in.write(b"hello subprocess\n")
buf_in.flush()
## Важно: не закрывать stdin, если процесс должен ещё работать

## Читаем ответ
line = buf_out.readline()
print(line)  # b'hello subprocess\n'

## Закрытие
raw_in.close()
raw_out.close()
proc.terminate()

Комбинирование: subprocess + шифрующий поток

Можно всё это ещё и скрестить:

  • есть процесс, у которого stdin/stdout обёрнуты в ProcessStdinRaw/ProcessStdoutRaw;
  • на них сверху - EncryptingRawStream;
  • а ещё выше - BufferedReader/Writer или TextIOWrapper.

Типовая цепочка, например, для шифруемой записи в stdin процесса:

## stdin процесса -> EncryptingRawStream -> BufferedWriter
proc = subprocess.Popen([...], stdin=subprocess.PIPE)
raw_in = ProcessStdinRaw(proc)
enc_in = EncryptingRawStream(raw_in, key=b"secret")
buf_in = io.BufferedWriter(enc_in)

buf_in.write(b"hello\n")  # отправится в процесс уже зашифрованным
buf_in.flush()