diff --git a/hc_spider/core.py b/hc_spider/core.py new file mode 100644 index 0000000..5a7c994 --- /dev/null +++ b/hc_spider/core.py @@ -0,0 +1,98 @@ +import multiprocessing +import os +import threading +import time + +from hc_spider import common +from hc_spider.controller import Controller +from hc_spider.json_logger import JSONLogger +from hc_spider.timer import Timer +from hc_spider.watchdog import Watchdog +from hc_spider.worker import Worker + + +class HCSpider(threading.Thread): + _config: dict + _manager: multiprocessing.Manager + _workers: list[multiprocessing.Process] + _worker_count: int + _controller: threading.Thread + _logger: threading.Thread + _timer: threading.Thread + _watchdog: threading.Thread + _shared_objects: dict + _components: list[threading.Thread | list[threading.Thread]] + + def __init__(self) -> None: + super().__init__() + self.daemon = True + self.name = "Core" + self._config = common.load_config() + self._worker_count = self._config.get("workers") + self._worker_count = self._worker_count if self._worker_count > 0 else multiprocessing.cpu_count() - 1 + self._manager = multiprocessing.Manager() + self._shared_objects = { + "job_queue": multiprocessing.Queue(-1), + "log_queue": multiprocessing.Queue(-1), + "shutdown_event": multiprocessing.Event(), + "watchdog_event": multiprocessing.Event(), + "worker_finished": multiprocessing.Barrier(self._worker_count + 1), # +1 for the logger + "visited_nodes": self._manager.dict(), + "not_visited_nodes": self._manager.dict(), + "not_valid_urls": self._manager.list(), + "urls_with_error": self._manager.list(), + "config": common.load_config(), + "lock": multiprocessing.Lock() + } + self._load_components() + + def _load_components(self) -> None: + self._components = [ + [Worker(**self._shared_objects) for _ in range(self._worker_count)], + Controller(**self._shared_objects), + JSONLogger(**self._shared_objects), + Timer(**self._shared_objects), + Watchdog(**self._shared_objects) + ] + + def _component_handler(self, components: list[threading.Thread | list[threading.Thread]], action: str) -> None: + for component in components: + if isinstance(component, list) is True: + match action: + case "start": + self._component_handler(components=component, action="start") + case "join": + self._component_handler(components=component, action="join") + else: + match action: + case "start": + component.start() + case "join": + component.join() + + def _shutdown_handler(self) -> None: + if self._shared_objects.get("watchdog_event").is_set() is False: + self._shutdown_watchdog() + + self._component_handler(components=self._components, action="join") + + def _shutdown_watchdog(self) -> None: + with open(self._shared_objects.get("config").get("watchdog_file"), "w") as f: + f.write("exit") + + def run(self) -> None: + print(f"{self.name} started with pid [{os.getpid()}]") + self._component_handler(components=self._components, action="start") + + while self._shared_objects.get("shutdown_event").is_set() is False: + time.sleep(1) + + print(f"[{self.name}] is shutting down", flush=True) + self._shutdown_handler() + print(f"[{self.name}] Workers visited {len(self._shared_objects.get('visited_nodes'))} urls", flush=True) + + def stop(self) -> None: + self._shared_objects.get("shutdown_event").set() + + def __del__(self) -> None: + print(f"[{self.name}] exited", flush=True)