Сообщить об ошибке.

Анализ логов Nginx модулем pandas в Python

Материал представляет собой мысли о том как можно подойти к анализу логов Nginx при помощи библиотеки pandas.

Настройка формата журнала логов Nginx

Для анализа логов Nginx с использованием Pandas в Python можно создать мощный инструмент для мониторинга посещаемости и нагрузки сайта. По умолчанию, формат строки логов имеет вид:

http {
...
    # формат логирования в Nginx по умолчанию (combined)
    # log_format combined '$remote_addr - $remote_user [$time_local] '
    #                     '"$request" $status $body_bytes_sent '
    #                     '"$http_referer" "$http_user_agent"';
}

Пример формата логов Nginx (combined)

192.168.1.1 - - [01/Jan/2023:12:00:00 +0000] "GET /page.html HTTP/1.1" 200 1234 
"https://example.com/referer" "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"

Список переменных Nginx, которые дополнительно можно использовать при настройке формата лога:

  • $args - аргументы в строке запроса
  • $binary_remote_addr - адрес клиента в бинарном виде, длина значения всегда 4 байта для IPv4-адресов или 16 байт для IPv6-адресов
  • $body_bytes_sent - число байт, переданное клиенту, без учёта заголовка ответа; переменная совместима с параметром “%B” модуля Apache
  • $bytes_sent - число байт, переданных клиенту (1.3.8, 1.2.5)
  • $connection - порядковый номер соединения (1.3.8, 1.2.5)
  • $connection_requests - текущее число запросов в соединении (1.3.8, 1.2.5)
  • $connection_time - время соединения в секундах с точностью до миллисекунд (1.19.10)
  • $content_length - поле “Content-Length” заголовка запроса
  • $content_type - поле “Content-Type” заголовка запроса
  • $document_uri - то же, что и $uri
  • $host - в порядке приоритета: имя хоста из строки запроса, или имя хоста из поля “Host” заголовка запроса, или имя сервера, соответствующего запросу
  • $hostname - имя хоста
  • $query_string - то же, что и $args
  • $realpath_root - абсолютный путь, соответствующий значению директивы root или alias для текущего запроса, в котором все символические ссылки преобразованы в реальные пути
  • $remote_addr - адрес клиента
  • $remote_port - порт клиента
  • $remote_user - имя пользователя, использованное в Basic аутентификации
  • $request - первоначальная строка запроса целиком
  • $request_completion - “OK” если запрос завершился, либо пустая строка
  • $request_filename - путь к файлу для текущего запроса, формируемый из директив root или alias и URI запроса
  • $request_id - уникальный идентификатор запроса, сформированный из 16 случайных байт, в шестнадцатеричном виде (1.11.0)
  • $request_length - длина запроса (включая строку запроса, заголовок и тело запроса) (1.3.12, 1.2.7)
  • $request_method - метод запроса, обычно “GET” или “POST”
  • $request_time - время обработки запроса в секундах с точностью до миллисекунд; время, прошедшее с момента чтения первых байт от клиента до момента записи в лог после отправки последних байт клиенту
  • $request_uri - первоначальный URI запроса целиком (с аргументами)
  • $scheme - схема запроса, “http” или “https”
  • $server_addr - адрес сервера, принявшего запрос. Получение значения этой переменной обычно требует одного системного вызова. Чтобы избежать системного вызова, в директивах listen следует указывать адреса и использовать параметр bind.
  • $server_name - имя сервера, принявшего запрос
  • $server_port - порт сервера, принявшего запрос
  • $server_protocol - протокол запроса, обычно “HTTP/1.0”, “HTTP/1.1”, “HTTP/2.0” или “HTTP/3.0”
  • $status - статус ответа (1.3.2, 1.2.2)
  • $time_iso8601 - локальное время в формате по стандарту ISO 8601 (1.3.12, 1.2.7)
  • $time_local - локальное время в Common Log Format (1.3.12, 1.2.7)
  • $tcpinfo_rtt, $tcpinfo_rttvar, $tcpinfo_snd_cwnd, $tcpinfo_rcv_space - информация о клиентском TCP-соединении; доступна на системах, поддерживающих параметр сокета TCP_INFO
  • $uri - текущий URI запроса в нормализованном виде
  • $msec - время в секундах с точностью до миллисекунд на момент записи в лог
  • $pipe - “p” если запрос был pipelined, иначе “.”

Загрузка лог-файла в DataFrame.

Согласно НОВОМУ формату журнала Nginx - первый столбец представляет собой дату и время запроса, следовательно делаем из нее индексные метки строк.

import pandas as pd
import re

def parse_nginx_logs(log_file):
    pattern = r'(?P<ip>\S+) - - \[(?P<date>.+?)\] "(?P<method>\S+) (?P<url>\S+) (?P<protocol>\S+)" (?P<status>\d+) (?P<size>\d+) "(?P<referrer>.+?)" "(?P<user_agent>.+?)"'
    
    logs = []
    with open(log_file) as f:
        for line in f:
            match = re.match(pattern, line)
            if match:
                logs.append(match.groupdict())
    
    df = pd.DataFrame(logs)
    
    # Преобразование типов данных
    df['date'] = pd.to_datetime(df['date'], format='%d/%b/%Y:%H:%M:%S %z')
    df['status'] = pd.to_numeric(df['status'])
    df['size'] = pd.to_numeric(df['size'])
    
    return df

# Загрузка логов
df = parse_nginx_logs('/var/log/nginx/access.log')

Базовый анализ данных

Общая статистика

print(f"Всего записей: {len(df)}")
print(f"Период данных: от {df['date'].min()} до {df['date'].max()}")
print("\nСтатистика по размеру ответов:")
print(df['size'].describe())

Топ страниц по посещаемости

top_pages = df['url'].value_counts().head(10)
print("Топ 10 страниц по посещаемости:")
print(top_pages)

Анализ посещаемости по времени

По часам

df['hour'] = df['date'].dt.hour
hourly_visits = df.groupby('hour').size()
hourly_visits.plot(kind='bar', title='Посещаемость по часам')

По дням, в случае если DF создается из нескольких дневных логов

df['day'] = df['date'].dt.date
daily_visits = df.groupby('day').size()
daily_visits.plot(title='Посещаемость по дням', figsize=(12, 6))

Анализ нагрузки

Объем переданных данных

daily_traffic = df.groupby('day')['size'].sum() / (1024 * 1024)  # в MB
daily_traffic.plot(title='Объем переданных данных (MB) по дням', figsize=(12, 6))

Средний размер ответа

avg_response_size = df.groupby('url')['size'].mean().sort_values(ascending=False).head(10)
print("Топ 10 страниц по среднему размеру ответа:")
print(avg_response_size)

Анализ производительности

Время обработки запросов (если включить в логи параметр nginx $request_time):

df['request_time'] = df['request_time'].astype(float)
slow_requests = df[df['request_time'] > 2].sort_values('request_time', ascending=False)

Самые медленные страницы:

df.groupby('url')['request_time'].mean().sort_values(ascending=False).head(10)

Анализ трафика

Топ источников трафика (рефереры):

df[df['referrer'] != '-']['referrer'].value_counts().head(20)

Топ поисковых систем:

search_engines = df['referrer'].str.extract(r'(google|yandex|bing|yahoo)')[0].dropna()
search_engines.value_counts()

Анализ безопасности

Подозрительные запросы:

suspicious = df[df['url'].str.contains(r'(wp-admin|phpmyadmin|\.env|config\.php)', case=False)]

Сканирование уязвимостей:

scan_attempts = df[df['user_agent'].str.contains(r'(nikto|sqlmap|nmap|wpscan)', case=False)]

Анализ пользовательских устройств

Распределение по типам устройств:

df['device'] = 'Other'
df.loc[df['user_agent'].str.contains('Mobile|Android|iPhone', case=False), 'device'] = 'Mobile'
df.loc[df['user_agent'].str.contains('Windows|Linux|Mac', case=False), 'device'] = 'Desktop'
df['device'].value_counts()

Браузерная статистика:

df['browser'] = 'Other'
df.loc[df['user_agent'].str.contains('Chrome', case=False), 'browser'] = 'Chrome'
df.loc[df['user_agent'].str.contains('Firefox', case=False), 'browser'] = 'Firefox'
df.loc[df['user_agent'].str.contains('Safari', case=False) & ~df['user_agent'].str.contains('Chrome', case=False), 'browser'] = 'Safari'
df['browser'].value_counts()

Анализ API-запросов

Популярные API-эндпоинты:

api_calls = df[df['url'].str.startswith('/api/')]
api_calls['endpoint'] = api_calls['url'].str.extract(r'(/api/v\d+/\w+)')[0]
api_calls['endpoint'].value_counts().head(10)

Пики нагрузки

Часы наибольшей нагрузки:

df['hour'] = df['date'].dt.hour
df.groupby('hour').size().plot(kind='bar')

Дни недели с максимальным трафиком:

df['weekday'] = df['date'].dt.day_name()
df.groupby('weekday').size().reindex(['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']).plot(kind='bar')

Что еще можно извлечь из логов?

Эффективность кеширования

cached = df[df['status'] == 304]
print(f"Процент закешированных запросов: {len(cached)/len(df)*100:.2f}%")

Распределение по методам запросов:

df['method'].value_counts()

Популярные файлы для скачивания:

downloads = df[df['url'].str.contains(r'\.(zip|pdf|docx|xlsx|pptx|jpg|png)$', case=False)]
downloads['url'].value_counts().head(10)

Выявление горячих ссылок (прямые ссылки на медиа-файлы):

hotlinks = df[df['referrer'].str.contains('^https?://(?!yourdomain.com)', na=False, regex=True)]
hotlinks = hotlinks[hotlinks['url'].str.contains(r'\.(jpg|png|gif|mp4|pdf)$', case=False)]
hotlinks.groupby(['referrer', 'url']).size().sort_values(ascending=False).head(10)

Сравнение мобильного и десктопного трафика:

df['is_mobile'] = df['user_agent'].str.contains('Mobile|Android|iPhone', case=False)
mobile_vs_desktop = df.groupby('is_mobile').size()
mobile_vs_desktop.plot(kind='pie', labels=['Desktop', 'Mobile'], autopct='%1.1f%%')

Статистика по HTTP статус-кодам в логах Nginx

Этот анализ поможет выявить:

  • Проблемные страницы сайта
  • Попытки доступа к несуществующим ресурсам
  • Серверные проблемы (5xx ошибки)
  • Подозрительную активность (много ошибок с определенных IP)
  • Общее качество обслуживания запросов

Базовая статистика по статус-кодам

# Общее распределение статус-кодов
status_stats = df['status'].value_counts().sort_index()
print("Распределение HTTP статус-кодов:")
print(status_stats)

Визуализация распределения

import matplotlib.pyplot as plt

plt.figure(figsize=(10, 6))
status_stats.plot(kind='bar', color='skyblue')
plt.title('Распределение HTTP статус-кодов')
plt.xlabel('Код статуса')
plt.ylabel('Количество запросов')
plt.grid(axis='y', linestyle='--', alpha=0.7)
plt.show()

Группировка по классам статус-кодов

def classify_status(code):
    if 100 <= code < 200:
        return '1xx Informational'
    elif 200 <= code < 300:
        return '2xx Success'
    elif 300 <= code < 400:
        return '3xx Redirection'
    elif 400 <= code < 500:
        return '4xx Client Error'
    else:
        return '5xx Server Error'

df['status_class'] = df['status'].apply(classify_status)
class_stats = df['status_class'].value_counts()
print("\nРаспределение по классам статус-кодов:")
print(class_stats)

Анализ ошибок (4xx и 5xx)

# Фильтрация ошибок
errors = df[df['status'] >= 400]

if not errors.empty:
    # Топ страниц с ошибками
    error_pages = errors['url'].value_counts().head(10)
    print("\nТоп страниц с ошибками:")
    print(error_pages)
    
    # Распределение ошибок по IP
    error_ips = errors['ip'].value_counts().head(10)
    print("\nТоп IP с ошибками:")
    print(error_ips)
    
    # Распределение ошибок во времени
    errors['hour'] = errors['date'].dt.hour
    error_hourly = errors.groupby('hour').size()
    error_hourly.plot(title='Ошибки по часам', kind='bar')
else:
    print("\nОшибок не обнаружено")

Детальный анализ 404 ошибок

not_found = df[df['status'] == 404]

if not not_found.empty:
    print(f"\nВсего 404 ошибок: {len(not_found)}")
    
    # Топ несуществующих страниц
    top_404_urls = not_found['url'].value_counts().head(10)
    print("\nТоп несуществующих страниц:")
    print(top_404_urls)
    
    # Источники 404 ошибок
    top_404_refs = not_found['referrer'].value_counts().head(10)
    print("\nТоп источников 404 ошибок:")
    print(top_404_refs[top_404_refs.index != '-'])
    
    # Временное распределение 404 ошибок
    not_found['day'] = not_found['date'].dt.date
    daily_404 = not_found.groupby('day').size()
    daily_404.plot(title='Динамика 404 ошибок по дням')
else:
    print("\n404 ошибок не обнаружено")

Анализ 5xx ошибок сервера

server_errors = df[df['status'] >= 500]

if not server_errors.empty:
    print(f"\nВсего 5xx ошибок: {len(server_errors)}")
    
    # Распределение по типам 5xx ошибок
    server_error_types = server_errors['status'].value_counts()
    print("\nТипы серверных ошибок:")
    print(server_error_types)
    
    # Время возникновения ошибок
    server_errors['hour'] = server_errors['date'].dt.hour
    error_hours = server_errors.groupby('hour').size()
    error_hours.plot(kind='bar', title='5xx ошибки по часам')
    
    # URL с наибольшим количеством серверных ошибок
    error_urls = server_errors['url'].value_counts().head(10)
    print("\nТоп URL с серверными ошибками:")
    print(error_urls)
else:
    print("\n5xx ошибок не обнаружено")

Соотношение успешных и ошибочных запросов

success_rate = len(df[df['status'] < 400]) / len(df) * 100
error_rate = 100 - success_rate

print(f"\nУспешных запросов: {success_rate:.2f}%")
print(f"Ошибочных запросов: {error_rate:.2f}%")

# Визуализация соотношения
plt.figure(figsize=(6, 6))
plt.pie([success_rate, error_rate], 
        labels=['Успешные', 'Ошибки'], 
        autopct='%1.1f%%',
        colors=['lightgreen', 'lightcoral'])
plt.title('Соотношение успешных и ошибочных запросов')
plt.show()

Экспорт статистики

# Сохранение полной статистики в Excel
with pd.ExcelWriter('status_stats.xlsx') as writer:
    status_stats.to_excel(writer, sheet_name='All Status Codes')
    class_stats.to_excel(writer, sheet_name='Status Classes')
    
    if not errors.empty:
        error_pages.to_excel(writer, sheet_name='Top Error Pages')
        error_ips.to_excel(writer, sheet_name='Top Error IPs')
    
    if not not_found.empty:
        top_404_urls.to_excel(writer, sheet_name='Top 404 URLs')
    
    if not server_errors.empty:
        server_error_types.to_excel(writer, sheet_name='Server Error Types')

Фильтрация ботов из результатов анализа логов Nginx

Для очистки результатов от ботов (поисковых роботов, сканеров и автоматизированных систем) можно использовать несколько подходов:

Фильтрация по известным User-Agent ботов

def filter_out_bots(df):
    """Фильтрует логи известных ботов по User-Agent"""
    bot_patterns = [
        r'bot', r'crawl', r'spider', r'scanner', r'slurp', 
        r'search', r'yahoo', r'google', r'bing', r'yandex',
        r'facebook', r'twitter', r'linkedin', r'headless',
        r'python', r'curl', r'wget', r'httpclient', r'java',
        r'php', r'ruby', r'perl', r'nikto', r'sqlmap', r'nmap'
    ]
    
    pattern = '|'.join(bot_patterns)
    is_bot = df['user_agent'].str.contains(pattern, case=False, na=False)
    
    return df[~is_bot]

# Применение фильтра
human_df = filter_out_bots(df)
print(f"Осталось записей после фильтрации ботов: {len(human_df)}")

Использование списка известных ботов

Более точный способ - использовать официальный список User-Agent'ов ботов:

import requests

def get_bot_user_agents():
    """Получает актуальный список User-Agent'ов ботов"""
    try:
        url = "https://raw.githubusercontent.com/monperrus/crawler-user-agents/master/crawler-user-agents.json"
        response = requests.get(url)
        bot_agents = [agent['pattern'] for agent in response.json()]
        return bot_agents
    except:
        # Локальный fallback список, если не удалось получить онлайн
        return [
            'Googlebot', 'Bingbot', 'Slurp', 'DuckDuckBot', 
            'Baiduspider', 'YandexBot', 'Sogou', 'Exabot', 
            'facebot', 'ia_archiver', 'AhrefsBot'
        ]

def filter_bots_advanced(df):
    """Фильтрация с использованием актуального списка ботов"""
    bot_agents = get_bot_user_agents()
    pattern = '|'.join(bot_agents)
    is_bot = df['user_agent'].str.contains(pattern, case=False, na=False)
    return df[~is_bot]

# Применение расширенного фильтра
human_df_advanced = filter_bots_advanced(df)

Комбинированный подход (User-Agent + поведенческий анализ)

def combined_bot_filter(df):
    """Комбинированная фильтрация ботов"""
    # 1. Фильтр по User-Agent
    df = filter_out_bots(df)
    
    # 2. Поведенческие характеристики ботов
    # - Очень высокая частота запросов
    ip_request_counts = df['ip'].value_counts()
    suspicious_ips = ip_request_counts[ip_request_counts > 1000].index
    df = df[~df['ip'].isin(suspicious_ips)]
    
    # - Запросы роботов.txt и других служебных файлов
    bot_paths = ['/robots.txt', '/sitemap.xml', '/wp-admin', '/xmlrpc.php']
    pattern = '|'.join(bot_paths)
    is_bot_path = df['url'].str.contains(pattern, case=False, na=False)
    df = df[~is_bot_path]
    
    return df

# Применение комбинированного фильтра
clean_df = combined_bot_filter(df)

Дополнительные методы обнаружения ботов

def detect_other_bots(df):
    """Дополнительные методы обнаружения ботов"""
    # 1. Отсутствие Referrer + специфичные User-Agent
    no_referrer = df['referrer'].isin(['-', ''])
    simple_ua = df['user_agent'].str.len() < 20
    bots = df[no_referrer & simple_ua]
    
    # 2. Запросы с нестандартными методами (не GET/POST)
    non_standard_methods = ~df['method'].isin(['GET', 'POST', 'HEAD'])
    bots = pd.concat([bots, df[non_standard_methods]])
    
    # 3. Запросы к несуществующим ресурсам (404)
    not_found = df['status'] == 404
    frequent_404 = df[not_found]['ip'].value_counts().head(20).index
    bots = pd.concat([bots, df[df['ip'].isin(frequent_404)]])
    
    return bots.drop_duplicates()

# Выявление дополнительных ботов
other_bots = detect_other_bots(df)
print(f"Найдено дополнительных ботов: {len(other_bots)}")

Важные замечания

  • Для точной фильтрации можно использовать специализированные решения:
    • https://github.com/ua-parser/uap-python
    • https://github.com/opawg/user-agents-v2
  • После очистки данных от ботов получим более релевантные результаты анализа поведения реальных пользователей на сайте.

Геоанализ (требуется дополнительная библиотека)

try:
    from geolite2 import geolite2
    reader = geolite2.reader()
    
    def get_country(ip):
        try:
            match = reader.get(ip)
            if match and 'country' in match:
                return match['country']['names']['en']
        except:
            pass
        return None
    
    df['country'] = df['ip'].apply(get_country)
    country_stats = df['country'].value_counts().head(10)
    print("\nТоп стран по посещаемости:")
    print(country_stats)
    
    geolite2.close()
except ImportError:
    print("\nДля геоанализа установите библиотеку maxminddb-geolite2")

Определение смены User-Agent для IP-адресов в логах Nginx

Чтобы выявить случаи, когда один IP-адрес использует разные User-Agent'ы, можно использовать следующий подход с Pandas:

import pandas as pd

def detect_user_agent_changes(df):
    """
    Определяет IP-адреса, которые использовали разные User-Agent'ы
    Возвращает DataFrame с результатами
    """
    # Группируем по IP и собираем уникальные User-Agent'ы
    ip_ua = df.groupby('ip')['user_agent'].agg(['nunique', 'unique'])
    
    # Фильтруем IP с более чем одним User-Agent'ом
    multi_ua_ips = ip_ua[ip_ua['nunique'] > 1]
    
    # Сортируем по количеству уникальных User-Agent'ов
    multi_ua_ips = multi_ua_ips.sort_values('nunique', ascending=False)
    
    return multi_ua_ips

# Пример использования
ua_changes = detect_user_agent_changes(df)
print(f"Найдено {len(ua_changes)} IP-адресов с разными User-Agent'ами")
print(ua_changes.head(20))

Дополнительный анализ с временными метками

Чтобы увидеть, когда именно происходила смена User-Agent:

def get_ua_change_timeline(df, ip_address):
    """
    Возвращает временную шкалу смены User-Agent для конкретного IP
    """
    ip_logs = df[df['ip'] == ip_address].sort_values('date')
    
    # Определяем моменты смены User-Agent
    ip_logs['ua_changed'] = ip_logs['user_agent'] != ip_logs['user_agent'].shift(1)
    
    # Фильтруем только моменты смены
    changes = ip_logs[ip_logs['ua_changed']][['date', 'user_agent']]
    
    return changes

# Пример использования для топ-5 IP с наибольшим количеством User-Agent'ов
top_ips = ua_changes.head().index
for ip in top_ips:
    print(f"\nИстория смен User-Agent для IP {ip}:")
    print(get_ua_change_timeline(df, ip))

Визуализация результатов

import matplotlib.pyplot as plt

# Топ-20 IP по количеству User-Agent'ов
top_20 = ua_changes.head(20)

plt.figure(figsize=(12, 8))
top_20['nunique'].plot(kind='bar')
plt.title('Топ IP-адресов по количеству используемых User-Agent\'ов')
plt.xlabel('IP-адрес')
plt.ylabel('Количество уникальных User-Agent\'ов')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

Интерпретация результатов

  • Естественные случаи:
    • Пользователь может заходить с разных устройств (телефон + компьютер)
    • Обновление браузера может изменить строку User-Agent
  • Подозрительные случаи:
    • Очень большое количество разных User-Agent'ов
    • Частая смена User-Agent в короткий промежуток времени
    • Использование поддельных или устаревших User-Agent'ов

Рекомендации:

  • Для подозрительных IP можно реализовать автоматическое блокирование
  • Установить лимит на количество разных User-Agent'ов с одного IP
  • Логировать такие случаи для дальнейшего анализа

Дополнительные фильтры

# Фильтр для очень активных IP (например, более 5 разных User-Agent'ов)
suspicious_ips = ua_changes[ua_changes['nunique'] > 5]
print("\nПодозрительные IP с более чем 5 разными User-Agent'ами:")
print(suspicious_ips)

# Фильтр по определенным подозрительным User-Agent'ам
suspicious_ua = ['python-requests', 'curl', 'wget', 'nikto', 'sqlmap']
suspicious = df[df['user_agent'].str.contains('|'.join(suspicious_ua), case=False)]
print("\nЗапросы с подозрительными User-Agent'ами:")
print(suspicious[['ip', 'date', 'user_agent']].sort_values('ip'))