Сообщить об ошибке.

Создание полезного менеджера контекста в Python

Что такое контекстный менеджер?

Менеджер контекста - это объект, который можно использовать в блоке with для размещения некоторого кода между действием входа и действием выхода. Файловые объекты можно использовать в качестве менеджеров контекста для автоматического закрытия файла:

# абстрактный код
with open("example.txt", "w") as file:
    file.write("Hello, world!")
# 13
>>> file.closed
# True

Менеджерам контекста необходимы методы __enter__ и __exit__, а метод __exit__ должен принимать три позиционных аргумента:

# файл test.py
class Example:
    def __enter__(self):
        print("enter")

    def __exit__(self, exc_type, exc_val, exc_tb):
        print("exit")

Код выше печатает "enter" при входе в блок with и "exit" при выходе из блока with:

# запускаем: python3 -i test.py
>>> with Example():
...     print("Python!")

# enter
# Python!
# exit

Из протокола менеджера контекста можно сделать вывод, что для его работы необходимы два "магических метода":

  • __enter__() - вызывается при вызове оператора with.
  • __exit__() вызывается при выходе из блока оператора with.

Возвращаемое значение из __enter__()

Посмотрим на менеджер контекста, который возвращает что-то из метода __enter__(), например, который засекает время выполнения кода внутри блока with:

# файл test1.py
import time

class Timer:
    def __enter__(self):
        self.start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.stop = time.perf_counter()
        self.elapsed = self.stop - self.start

Этот код будет отслеживать, сколько времени потребовалось для запуска определенного блока кода with. Запустим его создав объект Timer(), используем with для запуска блока кода, а затем проверим атрибут Timer.elapsed в объекте Timer:

# запускаем: python3 -i test1.py
>>> t = Timer()
>>> with t:
...   result = list(range(1000000))

>>> t.elapsed
0.023407533999943553

Есть еще более короткий способ использования контекстного менеджера. Можно создать объект Timer() и присвоить его переменной, и все это в одной строке кода, используя блок with и ключевое слово as:

>>> with Timer() as t:
...     result = list(range(1000000))

>>> t.elapsed
0.029026785000041855

Это работает, так как метод __enter__ написанного менеджера контекста возвращает self:

def __enter__(self):
        self.start = time.perf_counter()
        return self

Таким образом, Timer возвращает фактический объект контекстного менеджера self, и именно он присваивается переменной t в блоке with.

Обычно, менеджеры контекста отслеживают какое-то полезное состояние своего собственного объекта, по этому метод __enter__ возвращает собственный объект self.

Аргументы, передаваемые в __exit__()

Как насчет метода __exit__()? Какие три аргумента он принимает? И нужно ли возвращать значение?

Если внутри блока with возникает исключение, то эти три аргумента будут следующими:

Если исключений не возникает, то все эти три аргумента будут иметь значение None.

Пример контекстного менеджера, который использует все три аргумента:

# файл test2.py
import logging

class LogException:
    def __init__(self, logger, level=logging.ERROR, suppress=False):
        self.logger, self.level, self.suppress = logger, level, suppress

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            info = (exc_type, exc_val, exc_tb)
            self.logger.log(self.level, "Произошло исключение", exc_info=info)
            return self.suppress
        return False

Этот контекстный менеджер регистрирует исключения по мере их возникновения (используя модуль logging). Менеджер LogException можно использовать следующим образом:

# запускаем: python3 -i test2.py
>>> logging.basicConfig(level=logging.DEBUG)
>>> logger = logging.getLogger("Пример")
>>> with LogException(logger):
...   result = 1 / 0

# ERROR:Пример:Произошло исключение
# Traceback (most recent call last):
#   File "<stdin>", line 2, in <module>
# ZeroDivisionError: division by zero
# Traceback (most recent call last):
#   File "<stdin>", line 2, in <module>
# ZeroDivisionError: division by zero

Видим ошибку ERROR, имя логгера ("Пример"), сообщение "Произошло исключение", а затем трассировку. В примере также видим вторую обратную трассировку, которая была напечатана Python при сбое программы (она и прервет дальнейшее выполнение программы).

Возвращаемое значение из метода __exit__()

Если контекстному менеджеру LogException() (смотрите выше) передать аргумент suppress=True , то можно увидеть что-то другое:

# файл test_exit.py
import logging
from test import LogException

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("example")

with LogException(logger, suppress=True):
    result = 1 / 0  

print("Программа завершила работу")

Запустив программу традиционным методом python3 test_exit.py, исключение регистрируется, но после блока with программа будет продолжать работу:

$ python3 test_exit.py
# ERROR:example:Произошло исключение
# Traceback (most recent call last):
#   File "/home/test/test_exit.py", line 9, in <module>
#     result = 1 / 0
# ZeroDivisionError: division by zero
# Программа завершила работу

Видим, что здесь печатается "Программа завершила работу". Что происходит? Итак, аргумент suppress используется менеджером контекста (из примера выше) для подавления исключения:

import logging

class LogException:
    ...

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            info = (exc_type, exc_val, exc_tb)
            self.logger.log(self.level, "Произошло исключение", exc_info=info)
            return self.suppress
        return False

Внимание! Если метод __exit__() , возвращает что-то истинное или правдивое, то любое вызываемое исключение фактически будет подавлено.

По умолчанию метод __exit__() возвращает None, как и любая другая функция по умолчанию. Если вернуть None, что равноценно False, то __exit__() просто продолжит вызывать это исключение.

Когда со всеми нюансами работы разобрались можно посмотреть на менеджер контекста, который делает что-то полезное.

Полезный контекстный менеджер

Этот контекстный менеджер временно изменяет значение переменной среды environ:

# файл test3.py
import os

class SetEnv():
    def __init__(self, var_name, new_value):
        self.var_name = var_name
        self.new_value = new_value

    def __enter__(self):
        self.original_value = os.environ.get(self.var_name)
        os.environ[self.var_name] = self.new_value

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.original_value is None:
            del os.environ[self.var_name]
        else:
            os.environ[self.var_name] = self.original_value

Допустим, переменная пользовательского окружения на компьютере в настоящее время имеет значение test (у вас будет своё значение):

>>> import os
>>> os.environ["USER"]
# 'test'

Если использовать этот контекстный менеджер, то в его блоке with переменная среды USER будет иметь другое значение:

# запускаем: python3 -i test3.py
>>> with SetEnv("USER", "super"):
...     print("USER env from `with`:", os.environ["USER"])

# USER env from `with`: super

# смотрим `environ` после выхода из блока `with`
>>> os.environ["USER"]
# 'test'

После выхода из блока with значение переменной среды возвращается к исходному значению. Все это благодаря методу __enter__ контекстного менеджера и методу __exit__, которые запускаются при входе и выходе из блока with.

Декоратор @contextmanager.

Модуль contextlib Python включает декоратор, который позволяет создавать менеджеры контекста, используя синтаксис функции (вместо использования типичного синтаксиса класса):

from contextlib import contextmanager
import os

@contextmanager
def set_env(var_name, new_value):
    original_value = os.environ.get(var_name)
    os.environ[var_name] = new_value
    try:
        yield
    finally:
        if original_value is None:
            del os.environ[var_name]
        else:
            os.environ[var_name] = original_value

Интересно то, что декоратор @contextmanager по-прежнему включает в себя __enter__ и __exit__: это просто очень умный помощник для создания объекта, имеющего эти методы. Этот декоратор может быть очень удобен, хотя у него есть ограничения!

Тонкое управление файловым объектом

Наступит время, когда потребуется более тонкое управление файловым объектом, для этого нужно создать собственный контекстный менеджер.

Для чего это нужно? Допустим, может возникнут такая ситуация, когда часто нужно будет открывать файлы .png и анализировать их. И что-бы не разбирать заголовочный файл каждый раз нужно создать узконаправленный менеджер контекста. Ниже код примера того, как это сделать. Здесь также будем использовать пользовательские итераторы.

Использование менеджера контекста представленного ниже - аналогично встроенной функции open():

with pngReader('file.png') as fp:
    # Выполнение пользовательских операций
    pass

Вот код класса контекстного менеджера с комментариями, который читает любой PNG файл:

class pngReader():
    # Формат файла png содержит в заголовке определенный набор символов.  
    # Используем его для проверки, что файл действительно является PNG.
    _expected_magic = b'\x89PNG\r\n\x1a\n'

    def __init__(self, file_path):
        # Убедимся, что файл имеет нужное расширение
        if not file_path.endswith('.png'):
            raise NameError("Файл должен иметь расширение '.png'")
        self.__path = file_path
        self.__file_object = None

    def __enter__(self):
        self.__file_object = open(self.__path, 'rb')
        magic = self.__file_object.read(8)
        # Проверяем первые 8 байт заголовка 
        # на определенный набор символов
        if magic != self._expected_magic:
            raise TypeError("Это не '.png' файл!")
        return self

    def __exit__(self, type, val, tb):
        self.__file_object.close()

    def __iter__(self):
        # Этот метод и следующий __next__() используются
        # для чтения PNG файла кусками
        return self

    def __next__(self):
        # Читаем файл кусками
        initial_data = self.__file_object.read(4)
        # Файл не был открыт или достигнут конец файла.
        if self.__file_object is None or initial_data == b'':
            # Поднимаем исключение StopIteration.
            raise StopIteration
        else:
            # Каждый блок имеет длину, тип, данные (на основе len) и crc
            chunk_len = int.from_bytes(initial_data, byteorder='big')
            chunk_type = self.__file_object.read(4)
            chunk_data = self.__file_object.read(chunk_len)
            chunk_crc = self.__file_object.read(4)
            # вернем их в виде кортежа
            return chunk_len, chunk_type, chunk_data, chunk_crc

Сохраните код в файл pngreader.py и расположите его в корневой директории. Теперь можно открыть любой .png файл и правильно его анализировать:

>>> from pngreader import pngReader
>>> png_file = '/path/to/png/file.png'
>>> with pngReader(png_file) as fp:
...     for ln, tp, dt, crc in fp:
...         print(f"{ln:05}, {tp}, {crc}")
...     
# 00013, b'IHDR', b'\xc8\xa8\x0b\xc2'
# 00428, b'zTXt', b'\x17Oy\xea'
# 00004, b'sBIT', b'|\x08d\x88'
# 06586, b'IDAT', b'\xc0\x14\xc5R'
# 00000, b'IEND', b'\xaeB`\x82'