import re
import urllib.parse

import requests
from bs4 import BeautifulSoup

from hc_spider.abstract import ISpider
from hc_spider.model import Response, ResponseError


class ScrapySpider(ISpider):
    # Placeholder backend; link extraction is not implemented yet.
    def get_links(self, url: str) -> Response | ResponseError:
        pass


class BeautifulSoupSpiderBase(ISpider):
    _response: requests.Response
    _raw_urls: set[str]
    _scraper_url: str

    def _get_raw_page(self) -> requests.Response | ResponseError:
        # Fetch the page behind self._scraper_url; any request or HTTP error
        # is wrapped in a ResponseError instead of being raised.
        try:
            resp = requests.get(url=self._scraper_url, timeout=2)
            resp.raise_for_status()

        except Exception as e:
            return ResponseError(exc=e)

        return resp

    def _load_raw_urls(self) -> None:
        # Collect every href found in the document into a set of raw URLs.
        soup = BeautifulSoup(self._response.content, "lxml")
        self._raw_urls = {i.get("href") for i in soup.find_all("a", href=True)}

    def get_links(self, url: str) -> Response | ResponseError:
        self._scraper_url = url
        resp = self._get_raw_page()

        if isinstance(resp, ResponseError):
            return resp

        self._response = resp
        self._load_raw_urls()

        return Response(links=self._raw_urls)


class BeautifulSoupScraper(BeautifulSoupSpiderBase):
    _scraper_url: str
    _absolute_urls: set
    _relative_urls: set

    def get_links(self, url: str) -> Response | ResponseError:
        self._scraper_url = url

        if isinstance((resp := super().get_links(url=url)), ResponseError):
            return resp

        if not resp.links:
            return resp

        self._process_response()

        return Response(links=set.union(self._absolute_urls, self._relative_urls))

    def _clean_url_list(self) -> None:
        # Drop links we never want to follow (in-page anchors, mailto links, ...).
        result = [url for url in self._raw_urls if
                  # any(re.findall(r'api-ref|operation_guide|umn|#|mailto', url, re.IGNORECASE))]
                  any(re.findall(r'#|mailto|API%20Documents', url, re.IGNORECASE))]

        self._raw_urls = self._raw_urls - set(result)

    def _process_absolute_urls(self) -> None:
        self._absolute_urls = {i for i in self._raw_urls if i.startswith(("https://", "http://"))}

    def _process_relative_urls(self) -> None:
        base_url: str
        final_url: str
        results = set()

        if not self._raw_urls:
            self._relative_urls = set()
            return

        # If the original url ends with .html, e.g. https://docs.otc.t-systems.com/developer/api.html,
        # remove the trailing /xy.html part.
        if self._scraper_url.endswith(".html"):
            base_url = self._scraper_url.rsplit("/", 1)[0]

        elif self._scraper_url.endswith("/"):
            base_url = self._scraper_url.rstrip("/")

        else:
            base_url = self._scraper_url

        # Parse the base url so its scheme can be reused below.
        p = urllib.parse.urlparse(base_url)

        for url in self._raw_urls:
            # If the href contains a relative parent path, e.g. href="../services.html",
            # resolve it to an absolute path.
            if url.startswith(".."):
                res = list()
                complete_url = f"{base_url}/{url}"
                # e.g. https://example.com/1/2/../../index.html
                for i in complete_url.split("/"):
                    # ['https:', '', 'example.com', '1', '2', '..', '..', 'index.html']
                    if i and i != "..":
                        res.append(i)
                    else:
                        # Empty segments and ".." both drop the previous segment;
                        # the empty segment after "https:" removes the scheme,
                        # which is added back from p.scheme below.
                        res.pop()
                # res = ['example.com', 'index.html']
                final_url = f"{p.scheme}://{'/'.join(res)}"

            elif url.startswith("/"):
                final_url = f"{self._shared_objects.config.get('otc_base_url')}{url}"

            else:
                final_url = f"{base_url}/{url}"

            results.add(final_url)

        self._relative_urls = results

    def _process_response(self) -> None:
        self._clean_url_list()
        self._process_absolute_urls()
        self._raw_urls = self._raw_urls - self._absolute_urls
        self._process_relative_urls()


class RequestsSpider(ISpider):
    # Placeholder backend; link extraction is not implemented yet.
    def get_links(self, url: str) -> Response | ResponseError:
        pass
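

# Minimal usage sketch (illustrative only). How an ISpider instance is constructed
# is defined elsewhere in hc_spider, so the construction below is an assumption;
# only get_links(), Response.links and ResponseError.exc are taken from this module.
#
#   spider = BeautifulSoupScraper(...)  # hypothetical construction
#   result = spider.get_links(url="https://docs.otc.t-systems.com/developer/api.html")
#   if isinstance(result, ResponseError):
#       print(result.exc)
#   else:
#       for link in sorted(result.links):
#           print(link)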