Материал представляет собой мысли о том как можно подойти к анализу логов Nginx при помощи библиотеки
pandas
.
Для анализа логов Nginx с использованием Pandas в Python можно создать мощный инструмент для мониторинга посещаемости и нагрузки сайта. По умолчанию, формат строки логов имеет вид:
http { ... # формат логирования в Nginx по умолчанию (combined) # log_format combined '$remote_addr - $remote_user [$time_local] ' # '"$request" $status $body_bytes_sent ' # '"$http_referer" "$http_user_agent"'; }
Пример формата логов Nginx (combined)
192.168.1.1 - - [01/Jan/2023:12:00:00 +0000] "GET /page.html HTTP/1.1" 200 1234 "https://example.com/referer" "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
Список переменных Nginx, которые дополнительно можно использовать при настройке формата лога:
$args
- аргументы в строке запроса$binary_remote_addr
- адрес клиента в бинарном виде, длина значения всегда 4 байта для IPv4-адресов или 16 байт для IPv6-адресов$body_bytes_sent
- число байт, переданное клиенту, без учёта заголовка ответа; переменная совместима с параметром “%B” модуля Apache $bytes_sent
- число байт, переданных клиенту (1.3.8, 1.2.5)$connection
- порядковый номер соединения (1.3.8, 1.2.5)$connection_requests
- текущее число запросов в соединении (1.3.8, 1.2.5)$connection_time
- время соединения в секундах с точностью до миллисекунд (1.19.10)$content_length
- поле “Content-Length” заголовка запроса$content_type
- поле “Content-Type” заголовка запроса$document_uri
- то же, что и $uri$host
- в порядке приоритета: имя хоста из строки запроса, или имя хоста из поля “Host” заголовка запроса, или имя сервера, соответствующего запросу$hostname
- имя хоста$query_string
- то же, что и $args$realpath_root
- абсолютный путь, соответствующий значению директивы root или alias для текущего запроса, в котором все символические ссылки преобразованы в реальные пути$remote_addr
- адрес клиента$remote_port
- порт клиента$remote_user
- имя пользователя, использованное в Basic аутентификации$request
- первоначальная строка запроса целиком$request_completion
- “OK” если запрос завершился, либо пустая строка$request_filename
- путь к файлу для текущего запроса, формируемый из директив root или alias и URI запроса$request_id
- уникальный идентификатор запроса, сформированный из 16 случайных байт, в шестнадцатеричном виде (1.11.0)$request_length
- длина запроса (включая строку запроса, заголовок и тело запроса) (1.3.12, 1.2.7)$request_method
- метод запроса, обычно “GET” или “POST”$request_time
- время обработки запроса в секундах с точностью до миллисекунд; время, прошедшее с момента чтения первых байт от клиента до момента записи в лог после отправки последних байт клиенту$request_uri
- первоначальный URI запроса целиком (с аргументами)$scheme
- схема запроса, “http” или “https”$server_addr
- адрес сервера, принявшего запрос. Получение значения этой переменной обычно требует одного системного вызова. Чтобы избежать системного вызова, в директивах listen следует указывать адреса и использовать параметр bind.$server_name
- имя сервера, принявшего запрос$server_port
- порт сервера, принявшего запрос$server_protocol
- протокол запроса, обычно “HTTP/1.0”, “HTTP/1.1”, “HTTP/2.0” или “HTTP/3.0”$status
- статус ответа (1.3.2, 1.2.2)$time_iso8601
- локальное время в формате по стандарту ISO 8601 (1.3.12, 1.2.7)$time_local
- локальное время в Common Log Format (1.3.12, 1.2.7)$tcpinfo_rtt
, $tcpinfo_rttvar
, $tcpinfo_snd_cwnd
, $tcpinfo_rcv_space
- информация о клиентском TCP-соединении; доступна на системах, поддерживающих параметр сокета TCP_INFO$uri
- текущий URI запроса в нормализованном виде$msec
- время в секундах с точностью до миллисекунд на момент записи в лог$pipe
- “p” если запрос был pipelined, иначе “.”DataFrame
.Согласно НОВОМУ формату журнала Nginx - первый столбец представляет собой дату и время запроса, следовательно делаем из нее индексные метки строк.
import pandas as pd import re def parse_nginx_logs(log_file): pattern = r'(?P<ip>\S+) - - \[(?P<date>.+?)\] "(?P<method>\S+) (?P<url>\S+) (?P<protocol>\S+)" (?P<status>\d+) (?P<size>\d+) "(?P<referrer>.+?)" "(?P<user_agent>.+?)"' logs = [] with open(log_file) as f: for line in f: match = re.match(pattern, line) if match: logs.append(match.groupdict()) df = pd.DataFrame(logs) # Преобразование типов данных df['date'] = pd.to_datetime(df['date'], format='%d/%b/%Y:%H:%M:%S %z') df['status'] = pd.to_numeric(df['status']) df['size'] = pd.to_numeric(df['size']) return df # Загрузка логов df = parse_nginx_logs('/var/log/nginx/access.log')
print(f"Всего записей: {len(df)}") print(f"Период данных: от {df['date'].min()} до {df['date'].max()}") print("\nСтатистика по размеру ответов:") print(df['size'].describe())
top_pages = df['url'].value_counts().head(10) print("Топ 10 страниц по посещаемости:") print(top_pages)
По часам
df['hour'] = df['date'].dt.hour hourly_visits = df.groupby('hour').size() hourly_visits.plot(kind='bar', title='Посещаемость по часам')
По дням, в случае если DF создается из нескольких дневных логов
df['day'] = df['date'].dt.date daily_visits = df.groupby('day').size() daily_visits.plot(title='Посещаемость по дням', figsize=(12, 6))
Объем переданных данных
daily_traffic = df.groupby('day')['size'].sum() / (1024 * 1024) # в MB daily_traffic.plot(title='Объем переданных данных (MB) по дням', figsize=(12, 6))
Средний размер ответа
avg_response_size = df.groupby('url')['size'].mean().sort_values(ascending=False).head(10) print("Топ 10 страниц по среднему размеру ответа:") print(avg_response_size)
Время обработки запросов (если включить в логи параметр nginx $request_time
):
df['request_time'] = df['request_time'].astype(float) slow_requests = df[df['request_time'] > 2].sort_values('request_time', ascending=False)
Самые медленные страницы:
df.groupby('url')['request_time'].mean().sort_values(ascending=False).head(10)
Топ источников трафика (рефереры):
df[df['referrer'] != '-']['referrer'].value_counts().head(20)
Топ поисковых систем:
search_engines = df['referrer'].str.extract(r'(google|yandex|bing|yahoo)')[0].dropna() search_engines.value_counts()
Подозрительные запросы:
suspicious = df[df['url'].str.contains(r'(wp-admin|phpmyadmin|\.env|config\.php)', case=False)]
Сканирование уязвимостей:
scan_attempts = df[df['user_agent'].str.contains(r'(nikto|sqlmap|nmap|wpscan)', case=False)]
Распределение по типам устройств:
df['device'] = 'Other' df.loc[df['user_agent'].str.contains('Mobile|Android|iPhone', case=False), 'device'] = 'Mobile' df.loc[df['user_agent'].str.contains('Windows|Linux|Mac', case=False), 'device'] = 'Desktop' df['device'].value_counts()
Браузерная статистика:
df['browser'] = 'Other' df.loc[df['user_agent'].str.contains('Chrome', case=False), 'browser'] = 'Chrome' df.loc[df['user_agent'].str.contains('Firefox', case=False), 'browser'] = 'Firefox' df.loc[df['user_agent'].str.contains('Safari', case=False) & ~df['user_agent'].str.contains('Chrome', case=False), 'browser'] = 'Safari' df['browser'].value_counts()
Популярные API-эндпоинты:
api_calls = df[df['url'].str.startswith('/api/')] api_calls['endpoint'] = api_calls['url'].str.extract(r'(/api/v\d+/\w+)')[0] api_calls['endpoint'].value_counts().head(10)
Часы наибольшей нагрузки:
df['hour'] = df['date'].dt.hour df.groupby('hour').size().plot(kind='bar')
Дни недели с максимальным трафиком:
df['weekday'] = df['date'].dt.day_name() df.groupby('weekday').size().reindex(['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']).plot(kind='bar')
Эффективность кеширования
cached = df[df['status'] == 304] print(f"Процент закешированных запросов: {len(cached)/len(df)*100:.2f}%")
Распределение по методам запросов:
df['method'].value_counts()
Популярные файлы для скачивания:
downloads = df[df['url'].str.contains(r'\.(zip|pdf|docx|xlsx|pptx|jpg|png)$', case=False)] downloads['url'].value_counts().head(10)
Выявление горячих ссылок (прямые ссылки на медиа-файлы):
hotlinks = df[df['referrer'].str.contains('^https?://(?!yourdomain.com)', na=False, regex=True)] hotlinks = hotlinks[hotlinks['url'].str.contains(r'\.(jpg|png|gif|mp4|pdf)$', case=False)] hotlinks.groupby(['referrer', 'url']).size().sort_values(ascending=False).head(10)
Сравнение мобильного и десктопного трафика:
df['is_mobile'] = df['user_agent'].str.contains('Mobile|Android|iPhone', case=False) mobile_vs_desktop = df.groupby('is_mobile').size() mobile_vs_desktop.plot(kind='pie', labels=['Desktop', 'Mobile'], autopct='%1.1f%%')
Этот анализ поможет выявить:
# Общее распределение статус-кодов status_stats = df['status'].value_counts().sort_index() print("Распределение HTTP статус-кодов:") print(status_stats)
import matplotlib.pyplot as plt plt.figure(figsize=(10, 6)) status_stats.plot(kind='bar', color='skyblue') plt.title('Распределение HTTP статус-кодов') plt.xlabel('Код статуса') plt.ylabel('Количество запросов') plt.grid(axis='y', linestyle='--', alpha=0.7) plt.show()
def classify_status(code): if 100 <= code < 200: return '1xx Informational' elif 200 <= code < 300: return '2xx Success' elif 300 <= code < 400: return '3xx Redirection' elif 400 <= code < 500: return '4xx Client Error' else: return '5xx Server Error' df['status_class'] = df['status'].apply(classify_status) class_stats = df['status_class'].value_counts() print("\nРаспределение по классам статус-кодов:") print(class_stats)
# Фильтрация ошибок errors = df[df['status'] >= 400] if not errors.empty: # Топ страниц с ошибками error_pages = errors['url'].value_counts().head(10) print("\nТоп страниц с ошибками:") print(error_pages) # Распределение ошибок по IP error_ips = errors['ip'].value_counts().head(10) print("\nТоп IP с ошибками:") print(error_ips) # Распределение ошибок во времени errors['hour'] = errors['date'].dt.hour error_hourly = errors.groupby('hour').size() error_hourly.plot(title='Ошибки по часам', kind='bar') else: print("\nОшибок не обнаружено")
not_found = df[df['status'] == 404] if not not_found.empty: print(f"\nВсего 404 ошибок: {len(not_found)}") # Топ несуществующих страниц top_404_urls = not_found['url'].value_counts().head(10) print("\nТоп несуществующих страниц:") print(top_404_urls) # Источники 404 ошибок top_404_refs = not_found['referrer'].value_counts().head(10) print("\nТоп источников 404 ошибок:") print(top_404_refs[top_404_refs.index != '-']) # Временное распределение 404 ошибок not_found['day'] = not_found['date'].dt.date daily_404 = not_found.groupby('day').size() daily_404.plot(title='Динамика 404 ошибок по дням') else: print("\n404 ошибок не обнаружено")
server_errors = df[df['status'] >= 500] if not server_errors.empty: print(f"\nВсего 5xx ошибок: {len(server_errors)}") # Распределение по типам 5xx ошибок server_error_types = server_errors['status'].value_counts() print("\nТипы серверных ошибок:") print(server_error_types) # Время возникновения ошибок server_errors['hour'] = server_errors['date'].dt.hour error_hours = server_errors.groupby('hour').size() error_hours.plot(kind='bar', title='5xx ошибки по часам') # URL с наибольшим количеством серверных ошибок error_urls = server_errors['url'].value_counts().head(10) print("\nТоп URL с серверными ошибками:") print(error_urls) else: print("\n5xx ошибок не обнаружено")
success_rate = len(df[df['status'] < 400]) / len(df) * 100 error_rate = 100 - success_rate print(f"\nУспешных запросов: {success_rate:.2f}%") print(f"Ошибочных запросов: {error_rate:.2f}%") # Визуализация соотношения plt.figure(figsize=(6, 6)) plt.pie([success_rate, error_rate], labels=['Успешные', 'Ошибки'], autopct='%1.1f%%', colors=['lightgreen', 'lightcoral']) plt.title('Соотношение успешных и ошибочных запросов') plt.show()
# Сохранение полной статистики в Excel with pd.ExcelWriter('status_stats.xlsx') as writer: status_stats.to_excel(writer, sheet_name='All Status Codes') class_stats.to_excel(writer, sheet_name='Status Classes') if not errors.empty: error_pages.to_excel(writer, sheet_name='Top Error Pages') error_ips.to_excel(writer, sheet_name='Top Error IPs') if not not_found.empty: top_404_urls.to_excel(writer, sheet_name='Top 404 URLs') if not server_errors.empty: server_error_types.to_excel(writer, sheet_name='Server Error Types')
Для очистки результатов от ботов (поисковых роботов, сканеров и автоматизированных систем) можно использовать несколько подходов:
def filter_out_bots(df): """Фильтрует логи известных ботов по User-Agent""" bot_patterns = [ r'bot', r'crawl', r'spider', r'scanner', r'slurp', r'search', r'yahoo', r'google', r'bing', r'yandex', r'facebook', r'twitter', r'linkedin', r'headless', r'python', r'curl', r'wget', r'httpclient', r'java', r'php', r'ruby', r'perl', r'nikto', r'sqlmap', r'nmap' ] pattern = '|'.join(bot_patterns) is_bot = df['user_agent'].str.contains(pattern, case=False, na=False) return df[~is_bot] # Применение фильтра human_df = filter_out_bots(df) print(f"Осталось записей после фильтрации ботов: {len(human_df)}")
Более точный способ - использовать официальный список User-Agent'ов ботов:
import requests def get_bot_user_agents(): """Получает актуальный список User-Agent'ов ботов""" try: url = "https://raw.githubusercontent.com/monperrus/crawler-user-agents/master/crawler-user-agents.json" response = requests.get(url) bot_agents = [agent['pattern'] for agent in response.json()] return bot_agents except: # Локальный fallback список, если не удалось получить онлайн return [ 'Googlebot', 'Bingbot', 'Slurp', 'DuckDuckBot', 'Baiduspider', 'YandexBot', 'Sogou', 'Exabot', 'facebot', 'ia_archiver', 'AhrefsBot' ] def filter_bots_advanced(df): """Фильтрация с использованием актуального списка ботов""" bot_agents = get_bot_user_agents() pattern = '|'.join(bot_agents) is_bot = df['user_agent'].str.contains(pattern, case=False, na=False) return df[~is_bot] # Применение расширенного фильтра human_df_advanced = filter_bots_advanced(df)
def combined_bot_filter(df): """Комбинированная фильтрация ботов""" # 1. Фильтр по User-Agent df = filter_out_bots(df) # 2. Поведенческие характеристики ботов # - Очень высокая частота запросов ip_request_counts = df['ip'].value_counts() suspicious_ips = ip_request_counts[ip_request_counts > 1000].index df = df[~df['ip'].isin(suspicious_ips)] # - Запросы роботов.txt и других служебных файлов bot_paths = ['/robots.txt', '/sitemap.xml', '/wp-admin', '/xmlrpc.php'] pattern = '|'.join(bot_paths) is_bot_path = df['url'].str.contains(pattern, case=False, na=False) df = df[~is_bot_path] return df # Применение комбинированного фильтра clean_df = combined_bot_filter(df)
def detect_other_bots(df): """Дополнительные методы обнаружения ботов""" # 1. Отсутствие Referrer + специфичные User-Agent no_referrer = df['referrer'].isin(['-', '']) simple_ua = df['user_agent'].str.len() < 20 bots = df[no_referrer & simple_ua] # 2. Запросы с нестандартными методами (не GET/POST) non_standard_methods = ~df['method'].isin(['GET', 'POST', 'HEAD']) bots = pd.concat([bots, df[non_standard_methods]]) # 3. Запросы к несуществующим ресурсам (404) not_found = df['status'] == 404 frequent_404 = df[not_found]['ip'].value_counts().head(20).index bots = pd.concat([bots, df[df['ip'].isin(frequent_404)]]) return bots.drop_duplicates() # Выявление дополнительных ботов other_bots = detect_other_bots(df) print(f"Найдено дополнительных ботов: {len(other_bots)}")
Важные замечания
try: from geolite2 import geolite2 reader = geolite2.reader() def get_country(ip): try: match = reader.get(ip) if match and 'country' in match: return match['country']['names']['en'] except: pass return None df['country'] = df['ip'].apply(get_country) country_stats = df['country'].value_counts().head(10) print("\nТоп стран по посещаемости:") print(country_stats) geolite2.close() except ImportError: print("\nДля геоанализа установите библиотеку maxminddb-geolite2")
Чтобы выявить случаи, когда один IP-адрес использует разные User-Agent'ы, можно использовать следующий подход с Pandas:
import pandas as pd def detect_user_agent_changes(df): """ Определяет IP-адреса, которые использовали разные User-Agent'ы Возвращает DataFrame с результатами """ # Группируем по IP и собираем уникальные User-Agent'ы ip_ua = df.groupby('ip')['user_agent'].agg(['nunique', 'unique']) # Фильтруем IP с более чем одним User-Agent'ом multi_ua_ips = ip_ua[ip_ua['nunique'] > 1] # Сортируем по количеству уникальных User-Agent'ов multi_ua_ips = multi_ua_ips.sort_values('nunique', ascending=False) return multi_ua_ips # Пример использования ua_changes = detect_user_agent_changes(df) print(f"Найдено {len(ua_changes)} IP-адресов с разными User-Agent'ами") print(ua_changes.head(20))
Чтобы увидеть, когда именно происходила смена User-Agent:
def get_ua_change_timeline(df, ip_address): """ Возвращает временную шкалу смены User-Agent для конкретного IP """ ip_logs = df[df['ip'] == ip_address].sort_values('date') # Определяем моменты смены User-Agent ip_logs['ua_changed'] = ip_logs['user_agent'] != ip_logs['user_agent'].shift(1) # Фильтруем только моменты смены changes = ip_logs[ip_logs['ua_changed']][['date', 'user_agent']] return changes # Пример использования для топ-5 IP с наибольшим количеством User-Agent'ов top_ips = ua_changes.head().index for ip in top_ips: print(f"\nИстория смен User-Agent для IP {ip}:") print(get_ua_change_timeline(df, ip))
import matplotlib.pyplot as plt # Топ-20 IP по количеству User-Agent'ов top_20 = ua_changes.head(20) plt.figure(figsize=(12, 8)) top_20['nunique'].plot(kind='bar') plt.title('Топ IP-адресов по количеству используемых User-Agent\'ов') plt.xlabel('IP-адрес') plt.ylabel('Количество уникальных User-Agent\'ов') plt.xticks(rotation=45) plt.tight_layout() plt.show()
Интерпретация результатов
Рекомендации:
# Фильтр для очень активных IP (например, более 5 разных User-Agent'ов) suspicious_ips = ua_changes[ua_changes['nunique'] > 5] print("\nПодозрительные IP с более чем 5 разными User-Agent'ами:") print(suspicious_ips) # Фильтр по определенным подозрительным User-Agent'ам suspicious_ua = ['python-requests', 'curl', 'wget', 'nikto', 'sqlmap'] suspicious = df[df['user_agent'].str.contains('|'.join(suspicious_ua), case=False)] print("\nЗапросы с подозрительными User-Agent'ами:") print(suspicious[['ip', 'date', 'user_agent']].sort_values('ip'))