# Source file: hc_spider/json_logger.py
# Recovered from a whitespace-collapsed git patch:
#   Commit:  73f7593d07d4c107204374b827684b22ce5bb599
#   Author:  Tamas Szirtesi
#   Date:    Mon, 9 Oct 2023 14:23:45 +0200
#   Subject: [PATCH] Added json logger module
"""Background thread that periodically dumps crawler state to JSON files."""

import json
import os
import threading
import time
import warnings
from typing import Callable, Union

from hc_spider.model import SharedObjects


class JSONLogger(threading.Thread):
    """Daemon thread that writes the shared URL collections to JSON log files.

    Every ``logging_interval`` seconds the thread rewrites one JSON file per
    collection inside the configured ``log_dir``. Once the shared
    ``shutdown_event`` is set it waits (up to 10 seconds) on the
    ``worker_finished`` barrier and then writes a final, sorted dump that also
    includes the visited URLs.
    """

    # Seconds between two periodic dumps; read from config["logging_interval"].
    logging_interval: int
    # Container for the shared config, events and URL collections.
    _shared_objects: SharedObjects

    def __init__(self, **kwargs) -> None:
        """Create the logger thread.

        Args:
            **kwargs: Forwarded unchanged to ``SharedObjects``.
        """
        # The base constructor must run before anything else touches the
        # thread object; name and daemon flag are passed to it directly.
        super().__init__(name="JSONLogger", daemon=True)
        self._shared_objects = SharedObjects(**kwargs)
        self.logging_interval = self._shared_objects.config.get("logging_interval")

    def start(self) -> None:
        """Ensure the log directory exists, then start the thread."""
        # exist_ok avoids the check-then-create race and also creates any
        # missing parent directories.
        os.makedirs(self._shared_objects.config.get("log_dir"), exist_ok=True)
        super().start()

    def run(self) -> None:
        """Dump the logs once per interval until shutdown is requested."""
        print(f"[{self.name}] is starting")

        shutdown_event = self._shared_objects.shutdown_event
        # Waiting on the event instead of sleeping lets a shutdown request
        # interrupt the pause immediately.
        # NOTE(review): assumes shutdown_event is a threading/multiprocessing
        # Event (it already provides is_set()) - confirm against SharedObjects.
        while not shutdown_event.wait(timeout=self.logging_interval):
            try:
                self._dump_logs()

            except (RuntimeError, OSError) as e:
                # A failed periodic dump must not kill the logger; the next
                # cycle (or the final dump) rewrites the files anyway.
                print(f"[{self.name}] {type(e).__name__}: {e}, continuing execution")

        print(f"[{self.name}] is shutting down", flush=True)
        self._shutdown()

    def _shutdown(self) -> None:
        """Wait for the workers to finish, then write the final dump."""
        try:
            self._shared_objects.worker_finished.wait(timeout=10)

        except threading.BrokenBarrierError:
            warnings.warn(f"{self.name}: Workers are still running, log files can be corrupted")

        self._dump_logs()

    def _dump_logs(self) -> None:
        """Write every log file; visited URLs are written only at shutdown."""
        self._dump_not_valid_urls()
        self._dump_otc_related_urls()
        self._dump_urls_with_error()
        if self._shared_objects.shutdown_event.is_set():
            self._dump_visited()

    def _dump_data_to_file(self, data: list | dict, filename: str, sort_key: Union[Callable, None]) -> None:
        """Serialise ``data`` to ``<log_dir>/<filename>.json``.

        Args:
            data: JSON-serialisable list or dict to write.
            filename: Target file name without the ``.json`` extension.
            sort_key: Key function used to sort list data at shutdown;
                ``None`` sorts by natural order.
        """
        # Sorting is done only for the final dump, and only for lists (a dict
        # has no sort()). A sorted copy is used so the argument is untouched.
        if isinstance(data, list) and self._shared_objects.shutdown_event.is_set():
            data = sorted(data, key=sort_key)

        # Serialise before touching the disk so a serialisation error cannot
        # leave an empty or truncated file behind.
        payload = json.dumps(data, indent=4)

        target = os.path.join(self._shared_objects.config.get("log_dir"), f"{filename}.json")
        temporary = f"{target}.tmp"
        with open(temporary, "w", encoding="utf-8") as f:
            f.write(payload)
        # Swap the finished file into place in one step: this is a daemon
        # thread, so it can be stopped in the middle of a write.
        os.replace(temporary, target)

    @staticmethod
    def _url_sort_key(entry: dict) -> str:
        """Return the entry's URL, or an empty string when it is missing."""
        return entry.get("url") or ""

    @staticmethod
    def _error_sort_key(entry: dict) -> str:
        """Return the first word of the entry's error text, or ``""``."""
        words = str(entry.get("error") or "").split()
        return words[0] if words else ""

    def _dump_not_valid_urls(self) -> None:
        """Write all not-valid URL entries to ``not_valid_urls.json``."""
        data = list(self._shared_objects.not_valid_urls)
        self._dump_data_to_file(data=data, filename="not_valid_urls", sort_key=self._url_sort_key)

    def _dump_visited(self) -> None:
        """Write the visited nodes to ``visited_urls.json``."""
        data = list(self._shared_objects.visited_nodes)
        self._dump_data_to_file(data=data, filename="visited_urls", sort_key=None)

    def _dump_otc_related_urls(self) -> None:
        """Write the not-valid entries whose URL contains ``otc``."""
        data = [i for i in self._shared_objects.not_valid_urls if "otc" in (i.get("url") or "")]
        self._dump_data_to_file(data=data, filename="otc_not_valid_urls", sort_key=self._url_sort_key)

    def _dump_urls_with_error(self) -> None:
        """Write the URL entries that produced an error to ``urls_with_error.json``."""
        data = list(self._shared_objects.urls_with_error)
        self._dump_data_to_file(data=data, filename="urls_with_error", sort_key=self._error_sort_key)

    def __del__(self) -> None:
        print(f"[{self.name}] exited", flush=True)