Created
April 24, 2022 08:22
-
-
Save strizhechenko/0fd3c0a4c6b1cb0cf9fc8b49bd007f35 to your computer and use it in GitHub Desktop.
Скрипт сбора текстовой информации из брифингов МО РФ в унифицированный CSV
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" Скрипт сбора текстовой информации из брифингов МО РФ в унифицированный CSV """ | |
# coding: utf-8 | |
import os, re, csv, logging | |
from pathlib import Path | |
from urllib.parse import urlparse | |
import html2text, requests | |
from bs4 import BeautifulSoup | |
logging.basicConfig(format=u'[%(asctime)s] %(filename)s #%(levelname)-8s %(message)s', level=logging.INFO) | |
html = html2text.HTML2Text() | |
html.body_width, html.ignore_links, html.ignore_images = 20000, True, True | |
parse_tree = { | |
'total': [ | |
r'[0-9]+ военны. объект', | |
r'[0-9]+ объект.?.? военной' | |
], | |
'jets': [ | |
r'[0-9]+ боев.. самолет', | |
r'[0-9]+ самолет', | |
], | |
'helicopters': r'[0-9]+ вертолет', | |
'bpla': r'[0-9]+ беспилотны. летательны. аппарат', | |
'tanks': r'[0-9]+ танк', | |
'aerodrom': r'[0-9]+ военны. аэродром', | |
'command': r'[0-9]+ пункт.?.? управления', | |
'zrk': r'[0-9]+ зенитных ракетных комплексов', | |
'radio': r'[0-9]+ радиолокационных станций', | |
'auto': r'[0-9]+ единиц.? специальной военной автомобильной техники', | |
'boats': r'[0-9]+ боевых катеров', | |
'artillery': r'[0-9]+ оруди. полевой артиллерии', | |
'rszo': [ | |
r'[0-9]+ реактивн.. систем.? залпового огня', | |
r'[0-9]+ установ.. реактивных систем залпового огня' | |
], | |
} | |
def find_date(payload): | |
""" Находим дату брифинга 15.03.2022 (10:00)""" | |
d, t = re.findall(r'\d{2}.\d{2}.\d{4} \(\d{2}:\d{2}\)', payload)[0].split() | |
d = ".".join(reversed(d.split('.'))) | |
t = re.sub(r'[():]', '', t) | |
return d + "_" + t | |
def find_results(payload): | |
content = "\n".join(payload.split('\n')[50:]) | |
content = content.replace(',\n\n', ',') | |
patterns = [ | |
r'Российскими вооруженными силами с начала', | |
r'Также с начала', | |
r'Всего с начала', | |
r'Уничтожен[оы]', | |
r'Среди них: ', | |
r'Из них: ', | |
r'Сбито: ', | |
r'С начала.*операции ', | |
r'Всего за время', | |
r'К настоящему времени Российскими Вооруженными Силами', | |
r'Всего Вооружёнными Силами Российской Федерации', | |
r'Всего российскими Вооруженными Силами', | |
r'потери составляют:', | |
r'В ходе.*операции.*[.]' | |
] | |
bad_patterns = [ | |
r'глубин', | |
r'километр', | |
r'биологической', | |
r'^[^0-9]$' | |
] | |
pattern = r'(' + r".*[.]|".join(patterns) + r')' | |
bad_pattern = r'(' + r".*[.]|".join(bad_patterns) + r')' | |
res = re.findall(pattern, content, re.IGNORECASE) | |
res = [item for item in res if not re.findall(bad_pattern, item)] | |
if res: | |
return sorted(res) | |
def process_results(results: list, prev_day: dict): | |
output = dict(prev_day) | |
if not results: | |
return prev_day | |
for result in results: | |
for key, patterns in parse_tree.items(): | |
if key == 'jets': | |
if re.findall(r'самолет.?.? и вертолет', result): | |
continue | |
for pattern in [patterns] if isinstance(patterns, str) else patterns: | |
for res in re.findall(pattern, result, re.IGNORECASE): | |
for number in map(int, re.findall(r'[0-9]+', res)): | |
output[key] = max(number, output.get(key, 0)) | |
total = sum(v for k, v in output.items() if k != 'total') | |
output['total'] = max(total, output.get('total', 0)) | |
return output | |
def process_file(filename, prev_day): | |
with open(filename) as fd: | |
payload = fd.read() | |
payload = html.handle(payload) | |
d = find_date(payload) | |
results = find_results(payload) | |
if results: | |
output = process_results(results, prev_day) | |
return d, output | |
def process_directory(): | |
data_dir, count = Path('./data'), 0 | |
# Инициализация для заполнения пустых на первых днях значений нулями и нормальной записи CSV | |
prev_day = {key: 0 for key in parse_tree.keys()} | |
global_output = dict() | |
for filename in sorted(data_dir.iterdir()): | |
res = process_file(filename, prev_day) | |
if res: | |
date, _prev_day = res | |
if _prev_day: | |
prev_day = _prev_day | |
day = date.split('_')[0] | |
global_output[day] = dict(prev_day) | |
count += 1 | |
return global_output | |
def download_data(): | |
host = 'https://z.mil.ru/' | |
resp = requests.get(f'{host}/spec_mil_oper/brief/briefings.htm?objInBlock=200') | |
soup = BeautifulSoup(resp.text) | |
urls = [f'{host}{a.attrs["href"]}' for a in soup.find_all('a') if 'Брифинг' in a.text and 'Минобороны' in a.text] | |
logging.info("Страница открыта") | |
for n, url in enumerate(urls): | |
logging.info("Обрабатываем URL %s (%d/%d)", url, n, len(urls)) | |
q = urlparse(url).query | |
page_id = q.split('=')[1].split('@')[0] | |
filename = f'data/{page_id}.html' | |
if os.path.exists(filename): | |
continue | |
r = requests.get(url) | |
with open(filename, 'wb') as fd: | |
fd.write(r.content) | |
def write_results(results): | |
with open('output.csv', 'w') as fd: | |
fieldnames = ['date', *parse_tree.keys()] | |
writer = csv.DictWriter(fd, fieldnames=fieldnames) | |
writer.writeheader() | |
for date, result in results.items(): | |
row = dict(result) | |
row['date'] = date | |
writer.writerow(row) | |
if __name__ == '__main__': | |
download_data() | |
write_results(process_directory()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Не знаю зачем кому-то может информация на выходе понадобится, точной и достоверной она тоже не является, алгоритм на регулярках - неидеальный, сделано из праздного любопытства, сохранено как пример разбора нестрогого формата отчётов на человеческом языке с меняющимися формулировками.
Вообще хотелось сделать это максимально компактным. По хорошему группировать бы схожие регулярки, но почему-то формат
r"^(A|B)common_part"
в питоне работает неожиданным для меня образом, отличающимся от egrep.