Added core module
This commit is contained in:
parent
ba976408e0
commit
9745a56b44
98
hc_spider/core.py
Normal file
98
hc_spider/core.py
Normal file
@ -0,0 +1,98 @@
|
||||
import multiprocessing
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
|
||||
from hc_spider import common
|
||||
from hc_spider.controller import Controller
|
||||
from hc_spider.json_logger import JSONLogger
|
||||
from hc_spider.timer import Timer
|
||||
from hc_spider.watchdog import Watchdog
|
||||
from hc_spider.worker import Worker
|
||||
|
||||
|
||||
class HCSpider(threading.Thread):
|
||||
_config: dict
|
||||
_manager: multiprocessing.Manager
|
||||
_workers: list[multiprocessing.Process]
|
||||
_worker_count: int
|
||||
_controller: threading.Thread
|
||||
_logger: threading.Thread
|
||||
_timer: threading.Thread
|
||||
_watchdog: threading.Thread
|
||||
_shared_objects: dict
|
||||
_components: list[threading.Thread | list[threading.Thread]]
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self.daemon = True
|
||||
self.name = "Core"
|
||||
self._config = common.load_config()
|
||||
self._worker_count = self._config.get("workers")
|
||||
self._worker_count = self._worker_count if self._worker_count > 0 else multiprocessing.cpu_count() - 1
|
||||
self._manager = multiprocessing.Manager()
|
||||
self._shared_objects = {
|
||||
"job_queue": multiprocessing.Queue(-1),
|
||||
"log_queue": multiprocessing.Queue(-1),
|
||||
"shutdown_event": multiprocessing.Event(),
|
||||
"watchdog_event": multiprocessing.Event(),
|
||||
"worker_finished": multiprocessing.Barrier(self._worker_count + 1), # +1 for the logger
|
||||
"visited_nodes": self._manager.dict(),
|
||||
"not_visited_nodes": self._manager.dict(),
|
||||
"not_valid_urls": self._manager.list(),
|
||||
"urls_with_error": self._manager.list(),
|
||||
"config": common.load_config(),
|
||||
"lock": multiprocessing.Lock()
|
||||
}
|
||||
self._load_components()
|
||||
|
||||
def _load_components(self) -> None:
|
||||
self._components = [
|
||||
[Worker(**self._shared_objects) for _ in range(self._worker_count)],
|
||||
Controller(**self._shared_objects),
|
||||
JSONLogger(**self._shared_objects),
|
||||
Timer(**self._shared_objects),
|
||||
Watchdog(**self._shared_objects)
|
||||
]
|
||||
|
||||
def _component_handler(self, components: list[threading.Thread | list[threading.Thread]], action: str) -> None:
|
||||
for component in components:
|
||||
if isinstance(component, list) is True:
|
||||
match action:
|
||||
case "start":
|
||||
self._component_handler(components=component, action="start")
|
||||
case "join":
|
||||
self._component_handler(components=component, action="join")
|
||||
else:
|
||||
match action:
|
||||
case "start":
|
||||
component.start()
|
||||
case "join":
|
||||
component.join()
|
||||
|
||||
def _shutdown_handler(self) -> None:
|
||||
if self._shared_objects.get("watchdog_event").is_set() is False:
|
||||
self._shutdown_watchdog()
|
||||
|
||||
self._component_handler(components=self._components, action="join")
|
||||
|
||||
def _shutdown_watchdog(self) -> None:
|
||||
with open(self._shared_objects.get("config").get("watchdog_file"), "w") as f:
|
||||
f.write("exit")
|
||||
|
||||
def run(self) -> None:
|
||||
print(f"{self.name} started with pid [{os.getpid()}]")
|
||||
self._component_handler(components=self._components, action="start")
|
||||
|
||||
while self._shared_objects.get("shutdown_event").is_set() is False:
|
||||
time.sleep(1)
|
||||
|
||||
print(f"[{self.name}] is shutting down", flush=True)
|
||||
self._shutdown_handler()
|
||||
print(f"[{self.name}] Workers visited {len(self._shared_objects.get('visited_nodes'))} urls", flush=True)
|
||||
|
||||
def stop(self) -> None:
|
||||
self._shared_objects.get("shutdown_event").set()
|
||||
|
||||
def __del__(self) -> None:
|
||||
print(f"[{self.name}] exited", flush=True)
|
Loading…
x
Reference in New Issue
Block a user