# DocFinder / classes.py
# Last commit 1cb47b6 (heymenn): add parallelization for downloads
from fastapi import HTTPException
import requests
import re
from bs4 import BeautifulSoup
import os
import json
from urllib.parse import urljoin
from concurrent.futures import ThreadPoolExecutor, as_completed
def _get_proxies() -> dict:
    """Build the proxy mapping ``requests`` expects from the environment.

    ``$http_proxy`` is consulted first and ``$HTTP_PROXY`` second; the first
    non-empty value found is used for both the ``http`` and ``https``
    schemes. An empty dict is returned when neither variable holds a value.
    """
    for var_name in ("http_proxy", "HTTP_PROXY"):
        proxy_url = os.environ.get(var_name)
        if proxy_url:
            return {"http": proxy_url, "https": proxy_url}
    return {}
class ETSIDocFinder:
    """Locate and download ETSI SET contribution documents on docbox.etsi.org.

    Constructing an instance immediately logs in to the ETSI portal with the
    credentials found in the ``EOL_USER`` / ``EOL_PASSWORD`` environment
    variables and keeps the resulting ``requests.Session`` in ``self.session``.

    NOTE(review): every request in this class is sent with ``verify=False``
    (TLS certificate checks disabled) — confirm this is intentional.
    """

    HEADERS = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36"}

    def __init__(self):
        self.main_ftp_url = "https://docbox.etsi.org/SET"
        req_data = self.connect()
        print(req_data['message'])
        # Keep the session even when login failed so later calls can retry.
        self.session = req_data['session']

    def connect(self):
        """Open a new portal session and log in.

        Returns:
            A dict with keys ``error`` (bool), ``session`` (the new
            ``requests.Session``) and ``message`` (human-readable status).
            On success ``self.session`` is replaced by the new session.
        """
        session = requests.Session()
        session.headers.update(self.HEADERS)
        session.proxies.update(_get_proxies())

        # Seed DNN session cookies — docbox requires the portal session to be
        # initialised with domain=docbox.etsi.org so the .DOTNETNUKE cookie
        # is scoped to .etsi.org and accepted by docbox.etsi.org as well.
        login_redir_url = (
            "https://portal.etsi.org/LoginRedirection.aspx"
            "?domain=docbox.etsi.org&ReturnUrl=/"
        )
        session.get(login_redir_url, verify=False, timeout=15)

        req = session.post(
            "https://portal.etsi.org/ETSIPages/LoginEOL.ashx",
            data=json.dumps({"username": os.environ.get("EOL_USER"),
                             "password": os.environ.get("EOL_PASSWORD")}),
            headers={"Content-Type": "application/json; charset=UTF-8",
                     "Referer": login_redir_url},
            verify=False,
            allow_redirects=False,
            timeout=15,
        )
        # The login endpoint answers with the literal body "Failed" on bad credentials.
        if req.text == "Failed":
            return {"error": True, "session": session, "message": "Login failed ! Check your credentials"}

        # Always update self.session so reconnect and reauth actually take effect
        self.session = session
        return {"error": False, "session": session, "message": "Login successful"}

    def download_document(self, url: str) -> bytes:
        """Download a docbox file using the authenticated session.

        If the session has expired the portal redirects to LoginRedirection —
        we detect this and re-authenticate once before retrying.

        Returns:
            The raw response body of the (possibly retried) request.
        """
        resp = self.session.get(url, verify=False, timeout=30, allow_redirects=True)
        # Detect auth redirect (portal login page returned instead of file)
        if resp.url and "LoginRedirection" in resp.url:
            self.connect()  # connect() updates self.session on success
            resp = self.session.get(url, verify=False, timeout=30, allow_redirects=True)
        return resp.content

    def get_workgroup(self, doc: str):
        """Derive the docbox folder names for a contribution ID.

        Args:
            doc: Contribution ID such as ``"SETREQ(24)000123"``.

        Returns:
            ``(main_tsg, workgroup, doc)`` where ``main_tsg`` is one of
            ``"SET-WG-R"``, ``"SET-WG-T"`` or ``"SET"`` and ``workgroup`` is
            ``"20"`` followed by the text found between the first pair of
            parentheses. ``(None, None, None)`` is returned when the prefix is
            not recognised or the ID carries no parenthesised part.
        """
        # Longer prefixes must be tested first: "SETREQ" also starts with "SET".
        if doc.startswith(("SETREQ", "SCPREQ")):
            main_tsg = "SET-WG-R"
        elif doc.startswith(("SETTEC", "SCPTEC")):
            main_tsg = "SET-WG-T"
        elif doc.startswith(("SET", "SCP")):
            main_tsg = "SET"
        else:
            return None, None, None

        year_match = re.search(r'\(([^)]+)\)', doc)
        if year_match is None:
            # No "(yy)" marker: treat as unknown instead of crashing on None.
            return None, None, None
        return main_tsg, "20" + year_match.group(1), doc

    def find_workgroup_url(self, main_tsg, workgroup):
        """Return the contributions folder URL whose link text contains ``workgroup``.

        Falls back to ``.../05-CONTRIBUTIONS/<workgroup>`` when no table row
        of the listing page matches.
        """
        url = f"{self.main_ftp_url}/{main_tsg}/05-CONTRIBUTIONS"
        response = self.session.get(url, verify=False, timeout=15)
        # If docbox redirected to the portal login page, reauth and retry once
        if "LoginRedirection" in response.url:
            self.connect()
            response = self.session.get(url, verify=False, timeout=15)

        soup = BeautifulSoup(response.text, 'html.parser')
        for item in soup.find_all("tr"):
            link = item.find("a")
            if link and workgroup in link.get_text():
                return f"{self.main_ftp_url}/{main_tsg}/05-CONTRIBUTIONS/{link.get_text()}"
        return f"{self.main_ftp_url}/{main_tsg}/05-CONTRIBUTIONS/{workgroup}"

    def get_docs_from_url(self, url):
        """Return the link texts found in table cells of the listing at ``url``.

        Any exception is printed and turned into an empty list (best effort).
        """
        try:
            response = self.session.get(url, verify=False, timeout=15)
            soup = BeautifulSoup(response.text, "html.parser")
            return [item.get_text() for item in soup.select("tr td a")]
        except Exception as e:
            print(f"Error accessing {url}: {e}")
            return []

    def search_document(self, doc_id: str):
        """Find the docbox URL of a contribution.

        The workgroup folder is listed; entries whose name contains
        ``doc_id`` (case-insensitively) match directly, and entries without a
        dot are treated as sub-folders and searched one level deeper.

        Returns:
            The last matching URL, or ``"Document <doc_id> not found"``.
        """
        main_tsg, workgroup, doc = self.get_workgroup(doc_id)
        if not main_tsg:
            return f"Document {doc_id} not found"

        # Compare lower-case against lower-case so the match is truly
        # case-insensitive (IDs are normally upper-case, file names vary).
        needle = doc.lower()
        urls = []

        wg_url = self.find_workgroup_url(main_tsg, workgroup)
        print(wg_url)
        entries = self.get_docs_from_url(wg_url) if wg_url else []
        print(entries)

        for entry in entries:
            if needle in entry.lower():
                urls.append(f"{wg_url}/{entry}")
            elif "." not in entry.rstrip("/"):
                # looks like a subdirectory — go one level deeper
                sub_url = f"{wg_url}/{entry}"
                for f in self.get_docs_from_url(sub_url):
                    if needle in f.lower():
                        print(f)
                        urls.append(f"{sub_url}/{f}")

        return urls[-1] if urls else f"Document {doc_id} not found"
class ETSISpecFinder:
    """Locate published ETSI TS/TR specifications (PDF URL or DOCX download).

    PDFs are looked up in the public ``etsi.org/deliver`` tree; DOCX files
    are fetched through the ETSI portal, which requires the ``EOL_USER`` /
    ``EOL_PASSWORD`` environment variables.

    NOTE(review): every request in this class is sent with ``verify=False``
    (TLS certificate checks disabled) — confirm this is intentional.
    """

    def __init__(self):
        self.main_url = "https://www.etsi.org/deliver/etsi_ts"
        self.second_url = "https://www.etsi.org/deliver/etsi_tr"
        self.headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36"}

    def get_spec_path(self, doc_id: str):
        """Translate a spec number into its ``<range>/<folder>`` deliver path.

        Args:
            doc_id: Spec number such as ``"103 666"`` or ``"103 666-2"``.
                Additional ``-`` separated sub-parts (``"101 952-1-5"``) are
                accepted as well.

        Returns:
            e.g. ``"103600_103699/10366602"``: the hundred-range folder
            followed by the spec number with each part zero-padded to two
            digits.

        Raises:
            ValueError: if the leading number is not an integer.
        """
        number, *sub_parts = doc_id.split("-")
        number = number.replace(" ", "")
        # Each non-empty part becomes two digits ("2" -> "02"); longer parts are kept as-is.
        suffix = "".join(p.strip().zfill(2) for p in sub_parts if p.strip())
        range_start = int(number) - int(number) % 100
        return f"{range_start}_{range_start + 99}/{number}{suffix}"

    def get_docs_from_url(self, url):
        """Return the link texts of the listing at ``url``, minus the first link.

        An HTTP error status or any exception is printed and turned into an
        empty list, so callers can simply test the result for emptiness.
        """
        try:
            response = requests.get(url, verify=False, timeout=15, proxies=_get_proxies())
            if not response.ok:
                # An error page also contains <a> tags; do not mistake them for entries.
                print(f"Error accessing {url}: HTTP {response.status_code}")
                return []
            soup = BeautifulSoup(response.text, "html.parser")
            # The first anchor is skipped (presumably the parent-directory link — TODO confirm).
            return [item.get_text() for item in soup.find_all("a")][1:]
        except Exception as e:
            print(f"Error accessing {url}: {e}")
            return []

    def _normalise_version(self, version: str) -> str:
        """Normalise a user-supplied version string to ETSI zero-padded format.

        '17.6.0' -> '17.06.00' (the '_60' release suffix is ignored during matching)
        Already-normalised strings like '17.06.00' are returned unchanged.
        Anything that is not three dot-separated integers is returned with
        surrounding slashes stripped.
        """
        cleaned = version.strip("/")
        parts = cleaned.split(".")
        if len(parts) == 3:
            try:
                return ".".join(f"{int(p):02d}" for p in parts)
            except ValueError:
                pass
        return cleaned

    def _pick_release(self, releases: list, version: str = None) -> str:
        """Return the release folder matching version, or the last one if not found/specified.

        ``releases`` must be non-empty; the last element is treated as the
        latest release.
        """
        if version:
            target = self._normalise_version(version)
            for r in releases:
                # folder names are like '17.06.00_60'; match on the part before '_'
                if r.strip("/").split("_")[0] == target:
                    return r
        return releases[-1]

    def search_document(self, doc_id: str, version: str = None):
        """Return the PDF URL of a spec, trying the TS tree first and then TR.

        Args:
            doc_id: Spec number, e.g. ``"103 666"`` or ``"103 666-2"``.
            version: Optional version such as ``"17.6.0"``; the last listed
                release is used when omitted or not found.

        Returns:
            The URL of the first ``.pdf`` in the chosen release folder, or
            ``"Specification <doc_id> not found"``.
        """
        spec_path = self.get_spec_path(doc_id)
        base_urls = [f"{root}/{spec_path}/" for root in (self.main_url, self.second_url)]
        for base_url in base_urls:
            print(base_url)

        for base_url in base_urls:
            releases = self.get_docs_from_url(base_url)
            if not releases:
                continue
            release = self._pick_release(releases, version)
            for f in self.get_docs_from_url(base_url + release):
                if f.endswith(".pdf"):
                    return base_url + release + "/" + f

        return f"Specification {doc_id} not found"

    def _get_wki_id_candidates(self, doc_id: str, version: str = None) -> list:
        """Return a list of candidate wki_ids for a spec version (best match first).

        When ``version`` is omitted it is derived from the release folder of
        the PDF URL found by ``search_document``. The ETSI standards search is
        then queried once for "TS" and once for "TR", in parallel, and the
        results are concatenated in that order.
        """
        if version:
            version_str = version
        else:
            # Derive version from the FTP PDF URL
            pdf_url = self.search_document(doc_id)
            if "not found" in pdf_url.lower():
                return []
            parts = pdf_url.rstrip("/").split("/")
            version_folder = parts[-2]  # e.g. "18.04.00_60"
            v_parts = version_folder.split("_")[0].split(".")  # ["18", "04", "00"]
            try:
                version_str = f"{int(v_parts[0])}.{int(v_parts[1])}.{int(v_parts[2])}"
            except (ValueError, IndexError):
                return []

        def fetch_for_type(spec_type):
            params = {
                "option": "com_standardssearch",
                "view": "data",
                "format": "json",
                "page": "1",
                "search": f"ETSI {spec_type} {doc_id} v{version_str}",
                "etsiNumber": "1",
                "published": "1",
            }
            try:
                resp = requests.get("https://www.etsi.org/", params=params,
                                    headers=self.headers, verify=False, timeout=15,
                                    proxies=_get_proxies())
                data = resp.json()
                if data and isinstance(data, list):
                    return [str(item["wki_id"]) for item in data if "wki_id" in item]
            except Exception as e:
                print(f"Error getting wki_id for {doc_id}: {e}")
            return []

        candidates = []
        with ThreadPoolExecutor(max_workers=2) as executor:
            # executor.map preserves input order: TS results come before TR results.
            for result in executor.map(fetch_for_type, ["TS", "TR"]):
                candidates.extend(result)
        return candidates

    def _authenticate_eol(self, wki_id: str) -> "requests.Session":
        """Create a requests.Session authenticated to the ETSI EOL portal.

        The login response is not inspected here; a failed login only shows
        up later when the protected pages are requested.
        """
        session = requests.Session()
        session.headers.update({"User-Agent": self.headers["User-Agent"]})
        session.proxies.update(_get_proxies())

        login_redir_url = (
            f"https://portal.etsi.org/LoginRedirection.aspx"
            f"?ReturnUrl=%2fwebapp%2fprotect%2fNTaccount.asp%3fWki_Id%3d{wki_id}"
            f"&Wki_Id={wki_id}"
        )
        # Seed DNN session cookies
        session.get(login_redir_url, verify=False, timeout=15)

        # Authenticate via EOL JSON login
        session.post(
            "https://portal.etsi.org/ETSIPages/LoginEOL.ashx",
            data=json.dumps({"username": os.environ.get("EOL_USER"),
                             "password": os.environ.get("EOL_PASSWORD")}),
            headers={"Content-Type": "application/json; charset=UTF-8",
                     "Referer": login_redir_url},
            verify=False,
            allow_redirects=False,
            timeout=15,
        )
        return session

    def search_document_docx(self, doc_id: str, version: str = None) -> str:
        """Download an ETSI spec as DOCX and return the local file path.

        All wki_id candidates are tried in parallel (at most four threads);
        the first one that yields a matching DOCX wins. The file is written
        to ``/tmp/<filename>``.

        Returns:
            The local path on success, otherwise a human-readable message
            starting with ``"Specification <doc_id>"``.
        """
        candidates = self._get_wki_id_candidates(doc_id, version)
        if not candidates:
            return f"Specification {doc_id} not found"

        # Authenticate once — cookies are auth tokens, not wki_id-specific
        auth_session = self._authenticate_eol(candidates[0])

        def try_wki(wki_id):
            print(f"Trying wki_id={wki_id} for {doc_id}")
            # Each thread gets its own session pre-loaded with the shared auth cookies
            session = requests.Session()
            session.headers.update({"User-Agent": self.headers["User-Agent"]})
            session.proxies.update(_get_proxies())
            session.cookies.update(auth_session.cookies)

            r = session.get(
                f"https://portal.etsi.org/webapp/protect/NTaccount.asp?Wki_Id={wki_id}",
                verify=False, timeout=15,
            )
            # The page is expected to carry a "URL=..." redirect target.
            meta_match = re.search(r'URL=([^"\'\s>]+)', r.text)
            if not meta_match:
                print(f"  wki_id={wki_id}: authentication failed, trying next")
                return None

            meta_url = meta_match.group(1)
            if not meta_url.startswith("http"):
                meta_url = f"https://portal.etsi.org/webapp/protect/{meta_url}"

            r2 = session.get(meta_url, allow_redirects=False, verify=False, timeout=15)
            if r2.status_code != 302:
                print(f"  wki_id={wki_id}: unexpected status {r2.status_code}, trying next")
                return None

            location2 = r2.headers.get("Location", "")
            # "processError" also covers the "processErrors" spelling.
            if "processError" in location2:
                print(f"  wki_id={wki_id}: portal rejected ({location2}), trying next")
                return None

            copy_url = urljoin("https://portal.etsi.org/", location2)
            r3 = session.get(copy_url, allow_redirects=False, verify=False, timeout=15)

            if r3.status_code == 302:
                location3 = r3.headers.get("Location", "")
                final_url = urljoin("https://portal.etsi.org/webapp/ewp/", location3)
                r4 = session.get(final_url, verify=False, timeout=15)
            else:
                r4 = r3

            docx_urls = re.findall(r'href=["\']([^"\']*\.docx)["\']', r4.text, re.IGNORECASE)
            if not docx_urls:
                print(f"  wki_id={wki_id}: DOCX not found in page, trying next")
                return None

            # Guard against a wki_id that points at a different specification.
            spec_num = doc_id.split("-")[0].replace(" ", "")
            matching_urls = [u for u in docx_urls if spec_num in u.split("/")[-1]]
            if not matching_urls:
                print(f"  wki_id={wki_id}: DOCX spec mismatch (expected {spec_num}), trying next")
                return None

            # Resolve relative hrefs against the page they were found on;
            # absolute URLs pass through urljoin unchanged.
            docx_url = urljoin(r4.url, matching_urls[0])
            dl = session.get(docx_url, headers={"Referer": r4.url}, verify=False, timeout=60)
            if not dl.ok:
                # Do not save an error page as if it were the document.
                print(f"  wki_id={wki_id}: download failed with status {dl.status_code}, trying next")
                return None

            filename = docx_url.split("/")[-1]
            tmp_path = f"/tmp/{filename}"
            with open(tmp_path, "wb") as f:
                f.write(dl.content)

            print(f"  wki_id={wki_id}: success")
            return tmp_path

        with ThreadPoolExecutor(max_workers=min(len(candidates), 4)) as executor:
            future_to_wki = {executor.submit(try_wki, wki_id): wki_id for wki_id in candidates}
            for future in as_completed(future_to_wki):
                try:
                    result = future.result()
                except Exception as e:
                    # One failing candidate (timeout, bad response, ...) must
                    # not abort the others.
                    print(f"  wki_id={future_to_wki[future]}: error ({e}), trying next")
                    continue
                if result is not None:
                    # Drop candidates that have not started yet; running ones
                    # are awaited when the executor context exits.
                    for pending in future_to_wki:
                        pending.cancel()
                    return result

        return f"Specification {doc_id}: all {len(candidates)} wki_id candidate(s) rejected by ETSI portal"