diff --git a/hc_spider/worker.py b/hc_spider/worker.py
new file mode 100644
index 0000000..1336df3
--- /dev/null
+++ b/hc_spider/worker.py
@@ -0,0 +1,123 @@
+import copy
+import multiprocessing
+import os
+import queue
+import threading
+import urllib.parse
+
+import requests
+
+from hc_spider.model import SharedObjects, ResponseError
+from hc_spider.spider import BeautifulSoupScraper
+
+
+class URLProcessor:
+    _job_queue: multiprocessing.Queue
+    _shared_objects: SharedObjects
+    _blacklisted_urls: list
+    _blacklisted_domains: list
+    _otc_base_url: str
+    _scraper: BeautifulSoupScraper
+    _current_url: str
+    _response: requests.Response | ResponseError
+
+    def __init__(self, **kwargs) -> None:
+        self._shared_objects = SharedObjects(**kwargs)
+        self._blacklisted_urls = copy.deepcopy(self._shared_objects.config.get("blacklisted_urls"))
+        self._blacklisted_domains = copy.deepcopy(self._shared_objects.config.get("blacklisted_domains"))
+        self._otc_base_url = copy.deepcopy(self._shared_objects.config.get("otc_base_url"))
+        self._scraper = BeautifulSoupScraper(shared_objects=self._shared_objects)
+
+    def _process_children(self) -> None:
+        for new_url in self._response.links:
+            if new_url in self._blacklisted_urls:
+                continue
+
+            p = urllib.parse.urlparse(new_url)
+            if p.hostname in self._blacklisted_domains:
+                continue
+
+            if new_url.startswith(self._otc_base_url) is False:
+                self._shared_objects.not_valid_urls.append({
+                    "url": new_url,
+                    "orig": self._current_url
+                })
+                continue
+
+            if new_url in self._shared_objects.visited_nodes:
+                continue
+
+            if new_url in self._shared_objects.not_visited_nodes:
+                continue
+
+            with self._shared_objects.lock:
+                if new_url in self._shared_objects.not_visited_nodes:
+                    continue
+                self._shared_objects.not_visited_nodes[new_url] = self._current_url
+                self._shared_objects.job_queue.put(new_url)
+
+    def process(self, url: str) -> None:
+        self._current_url = url
+        if self._current_url in self._shared_objects.visited_nodes:
+            return
+
+        self._response = self._scraper.get_links(url=self._current_url)
+        if isinstance(self._response, ResponseError):
+            self._shared_objects.urls_with_error.append({
+                "url": self._current_url,
+                "orig": self._shared_objects.not_visited_nodes.get(self._current_url),
+                "error": f"{self._response.exc}"
+            })
+
+            self._shared_objects.visited_nodes[self._current_url] = True
+            return
+
+        if not self._response.links:
+            return
+
+        self._process_children()
+
+
+class Worker(multiprocessing.Process):
+    _shutdown_event: threading.Event
+    _job_queue: multiprocessing.Queue
+    _shared_objects: SharedObjects
+    _blacklisted_urls: list
+    _blacklisted_domains: list
+    _otc_base_url: str
+    _current_url: str
+    _url_processor: URLProcessor
+
+    def __init__(self, **kwargs) -> None:
+        self._shared_objects = SharedObjects(**kwargs)
+        self._blacklisted_urls = self._shared_objects.config.get("blacklisted_urls", [])
+        self._blacklisted_domains = self._shared_objects.config.get("blacklisted_domains", [])
+        self._otc_base_url = self._shared_objects.config.get("otc_base_url", "")
+        self._url_processor = URLProcessor(**kwargs)
+        super().__init__()
+        self.daemon = False
+
+    def start(self) -> None:
+        print(f"[{self.name}] is starting")
+        super().start()
+
+    def run(self) -> None:
+        print(f"{self.name} started with pid [{os.getpid()}]")
+        while self._shared_objects.shutdown_event.is_set() is False:
+            try:
+                self._current_url = self._shared_objects.job_queue.get(block=False)
+
+            except queue.Empty:
+                continue
+
+            print(f"[{self.name}] pid [{os.getpid()}] processing {self._current_url}")
+            self._url_processor.process(url=self._current_url)
+            self._shared_objects.visited_nodes[self._current_url] = True
+            self._shared_objects.not_visited_nodes.pop(self._current_url, None)
+
+        print(f"[{self.name}] is shutting down", flush=True)
+        self._shared_objects.job_queue.cancel_join_thread()
+        self._shared_objects.worker_finished.wait(timeout=10)
+
+    def __del__(self) -> None:
+        print(f"[{self.name}] exited", flush=True)