jitenbot/scraper.py

104 lines
3.4 KiB
Python
Raw Normal View History

import time
import requests
import re
import os
import json
import hashlib
from pathlib import Path
from urllib.parse import urlparse
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from datetime import datetime
class Scraper():
def __init__(self):
self.netloc_re = \
re.compile(r"^(?:([A-Za-z0-9.\-]+)\.)?" + self.domain + r"$")
self.__set_session()
def scrape(self, urlstring):
url = urlparse(urlstring, scheme='https://', allow_fragments=True)
self.__validate_url(url)
cache_path = self.__cache_path(url)
cache_contents = self.__read_cache(cache_path)
if cache_contents is not None:
return cache_contents
html = self.__get(urlstring)
with open(cache_path, "w") as f:
f.write(html)
return html
def __set_session(self):
retry_strategy = Retry(
total=3,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["HEAD", "GET", "OPTIONS"]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
with open("config.json", "r") as f:
config = json.load(f)
headers = config["http-request-headers"]
self.session = requests.Session()
self.session.mount("https://", adapter)
self.session.headers.update(headers)
def __validate_url(self, url):
valid = False
if self.netloc_re.match(url.netloc):
valid = True
# may add more validators later
if not valid:
raise Exception(f"Invalid URL: {url.geturl()}")
def __cache_path(self, url):
cache_dir = os.path.join("webcache", self.__class__.__name__.lower())
netloc_match = self.netloc_re.match(url.netloc)
if netloc_match.group(1) is not None:
subdomain = netloc_match.group(1)
cache_dir = os.path.join(cache_dir, subdomain)
paths = re.findall(r"/([^/]+)", url.path)
if len(paths) < 1:
raise Exception(f"Invalid path in URL: {url.geturl()}")
for x in paths[:len(paths)-1]:
cache_dir = os.path.join(cache_dir, x)
if not Path(cache_dir).is_dir():
os.makedirs(cache_dir)
basename = paths[-1].replace(".", "_")
urlstring_hash = hashlib.md5(url.geturl().encode()).hexdigest()
filename = f"{basename}-{urlstring_hash}.html"
cache_path = os.path.join(cache_dir, filename)
return cache_path
def __read_cache(self, cache_path):
if Path(cache_path).is_file():
with open(cache_path, "r") as f:
file_contents = f.read()
else:
file_contents = None
return file_contents
def __get(self, url):
delay = 10
time.sleep(delay)
now = datetime.now().strftime("%H:%M:%S")
print(f"{now} scraping {url.geturl()} ...", end='')
try:
response = self.session.get(url, timeout=10)
print("OK")
return response.text
except Exception:
print("failed")
print("resetting session and trying again")
self.__set_session()
response = self.session.get(url, timeout=10)
return response.text
class Jitenon(Scraper):
def __init__(self):
self.domain = r"jitenon\.jp"
Scraper.__init__(self)