from fastapi import HTTPException
import requests
import re
from bs4 import BeautifulSoup
import os
import json
from urllib.parse import urljoin
from concurrent.futures import ThreadPoolExecutor, as_completed


def _get_proxies() -> dict:
    """Return a requests-compatible proxies dict from $http_proxy / $HTTP_PROXY."""
    proxy = os.environ.get("http_proxy") or os.environ.get("HTTP_PROXY") or ""
    if not proxy:
        return {}
    return {"http": proxy, "https": proxy}


class ETSIDocFinder:
    """Locate and download SET/SCP contribution documents on docbox.etsi.org.

    Authentication goes through the ETSI portal (EOL) using the
    EOL_USER / EOL_PASSWORD environment variables.
    """

    HEADERS = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36"}

    def __init__(self):
        self.main_ftp_url = "https://docbox.etsi.org/SET"
        req_data = self.connect()
        print(req_data['message'])
        self.session = req_data['session']

    def connect(self):
        """Authenticate to the ETSI portal.

        Returns a dict with keys ``error`` (bool), ``session``
        (requests.Session) and ``message`` (str). On success the new
        session is also stored on ``self.session``.
        """
        session = requests.Session()
        session.headers.update(self.HEADERS)
        session.proxies.update(_get_proxies())

        # Seed DNN session cookies — docbox requires the portal session to be
        # initialised with domain=docbox.etsi.org so the .DOTNETNUKE cookie
        # is scoped to .etsi.org and accepted by docbox.etsi.org as well.
        login_redir_url = (
            "https://portal.etsi.org/LoginRedirection.aspx"
            "?domain=docbox.etsi.org&ReturnUrl=/"
        )
        # NOTE(review): TLS verification is disabled on every request in this
        # module (verify=False) — presumably because of an intercepting proxy;
        # confirm this is intentional before shipping.
        session.get(login_redir_url, verify=False, timeout=15)

        req = session.post(
            "https://portal.etsi.org/ETSIPages/LoginEOL.ashx",
            data=json.dumps({"username": os.environ.get("EOL_USER"),
                             "password": os.environ.get("EOL_PASSWORD")}),
            headers={"Content-Type": "application/json; charset=UTF-8",
                     "Referer": login_redir_url},
            verify=False,
            allow_redirects=False,
            timeout=15,
        )
        if req.text == "Failed":
            return {"error": True, "session": session,
                    "message": "Login failed ! Check your credentials"}

        # Always update self.session so reconnect and reauth actually take effect
        self.session = session
        return {"error": False, "session": session, "message": "Login successful"}

    def download_document(self, url: str) -> bytes:
        """Download a docbox file using the authenticated session.

        If the session has expired the portal redirects to LoginRedirection —
        we detect this and re-authenticate before retrying.
        """
        resp = self.session.get(url, verify=False, timeout=30, allow_redirects=True)
        # Detect auth redirect (portal login page returned instead of file)
        if resp.url and "LoginRedirection" in resp.url:
            self.connect()  # connect() now updates self.session
            resp = self.session.get(url, verify=False, timeout=30, allow_redirects=True)
        return resp.content

    def get_workgroup(self, doc: str):
        """Map a contribution id to (main TSG folder, workgroup folder, doc id).

        Returns (None, None, None) when the id is not a SET/SCP document or
        has no "(YY)" meeting marker.
        """
        if doc.startswith(("SETREQ", "SCPREQ")):
            main_tsg = "SET-WG-R"
        elif doc.startswith(("SETTEC", "SCPTEC")):
            main_tsg = "SET-WG-T"
        elif doc.startswith(("SET", "SCP")):
            main_tsg = "SET"
        else:
            main_tsg = None
        if main_tsg is None:
            return None, None, None

        match = re.search(r'\(([^)]+)\)', doc)
        if match is None:
            # Malformed id without a "(YY)" marker: previously this crashed
            # with AttributeError on .group(); treat it as "not found" instead.
            return None, None, None
        workgroup = "20" + match.group(1)
        return main_tsg, workgroup, doc

    def find_workgroup_url(self, main_tsg, workgroup):
        """Resolve the 05-CONTRIBUTIONS folder URL for a workgroup/meeting."""
        url = f"{self.main_ftp_url}/{main_tsg}/05-CONTRIBUTIONS"
        response = self.session.get(url, verify=False, timeout=15)
        # If docbox redirected to the portal login page, reauth and retry once
        if "LoginRedirection" in response.url:
            self.connect()
            response = self.session.get(url, verify=False, timeout=15)

        soup = BeautifulSoup(response.text, 'html.parser')
        for item in soup.find_all("tr"):
            link = item.find("a")
            if link and workgroup in link.get_text():
                return f"{self.main_ftp_url}/{main_tsg}/05-CONTRIBUTIONS/{link.get_text()}"
        # Fall back to the guessed folder name even if not listed
        return f"{self.main_ftp_url}/{main_tsg}/05-CONTRIBUTIONS/{workgroup}"

    def get_docs_from_url(self, url):
        """Return the link texts of a docbox directory listing ([] on error)."""
        try:
            response = self.session.get(url, verify=False, timeout=15)
            soup = BeautifulSoup(response.text, "html.parser")
            return [item.get_text() for item in soup.select("tr td a")]
        except Exception as e:
            print(f"Error accessing {url}: {e}")
            return []

    def search_document(self, doc_id: str):
        """Find the docbox URL(s) for a contribution id.

        Returns a single URL when found (the last match if several), or a
        "Document ... not found" message string otherwise.
        """
        original = doc_id
        main_tsg, workgroup, doc = self.get_workgroup(doc_id)
        urls = []
        if main_tsg:
            wg_url = self.find_workgroup_url(main_tsg, workgroup)
            print(wg_url)
            if wg_url:
                entries = self.get_docs_from_url(wg_url)
                print(entries)
                for entry in entries:
                    # NOTE(review): `doc` is compared against entry.lower()
                    # without being lowercased itself — confirm whether this
                    # asymmetry is intentional.
                    if doc in entry.lower() or original in entry:
                        urls.append(f"{wg_url}/{entry}")
                    elif "." not in entry.rstrip("/"):
                        # looks like a subdirectory — go one level deeper
                        sub_url = f"{wg_url}/{entry}"
                        for f in self.get_docs_from_url(sub_url):
                            if doc in f.lower() or original in f:
                                print(f)
                                urls.append(f"{sub_url}/{f}")
        if len(urls) == 1:
            return urls[0]
        if urls:
            return urls[-1]
        return f"Document {doc_id} not found"


class ETSISpecFinder:
    """Locate published ETSI TS/TR specifications (PDF) and their DOCX sources."""

    def __init__(self):
        self.main_url = "https://www.etsi.org/deliver/etsi_ts"
        self.second_url = "https://www.etsi.org/deliver/etsi_tr"
        self.headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36"}

    def get_spec_path(self, doc_id: str):
        """Build the deliver-tree path for a spec id like '103 666' or '103 666-2'.

        e.g. '103 666-2' -> '103600_103699/10366602'.
        """
        if "-" in doc_id:
            position, part = doc_id.split("-")
        else:
            position, part = doc_id, None
        position = position.replace(" ", "")
        if part:
            if len(part) == 1:
                part = "0" + part  # zero-pad single-digit part numbers
        spec_folder = position + part if part is not None else position
        base = int(position) - (int(position) % 100)  # hundred-range bucket
        return f"{base}_{base + 99}/{spec_folder}"

    def get_docs_from_url(self, url):
        """Return the link texts of a deliver-tree listing, skipping the parent link."""
        try:
            response = requests.get(url, verify=False, timeout=15, proxies=_get_proxies())
            soup = BeautifulSoup(response.text, "html.parser")
            return [item.get_text() for item in soup.find_all("a")][1:]
        except Exception as e:
            print(f"Error accessing {url}: {e}")
            return []

    def _normalise_version(self, version: str) -> str:
        """Normalise a user-supplied version string to ETSI zero-padded format.

        '17.6.0' -> '17.06.00' (the '_60' release suffix is ignored during matching)
        Already-normalised strings like '17.06.00' are returned unchanged.
        """
        parts = version.strip("/").split(".")
        if len(parts) == 3:
            try:
                return f"{int(parts[0]):02d}.{int(parts[1]):02d}.{int(parts[2]):02d}"
            except ValueError:
                pass
        return version.strip("/")

    def _pick_release(self, releases: list, version: str = None) -> str:
        """Return the release folder matching version, or the latest if not found/specified."""
        if version:
            target = self._normalise_version(version)
            for r in releases:
                # folder names are like '17.06.00_60'; match on the part before '_'
                folder = r.strip("/").split("_")[0]
                if folder == target:
                    return r
        return releases[-1]

    def search_document(self, doc_id: str, version: str = None):
        """Return the PDF URL of a spec (TS first, then TR), or a not-found message.

        Example doc_id: '103 666' or '103 666-2'.
        """
        path = self.get_spec_path(doc_id)
        # Try the TS tree first, then the TR tree (same layout).
        for base_url in (self.main_url, self.second_url):
            url = f"{base_url}/{path}/"
            print(url)
            releases = self.get_docs_from_url(url)
            if not releases:
                continue
            release = self._pick_release(releases, version)
            for f in self.get_docs_from_url(url + release):
                if f.endswith(".pdf"):
                    return url + release + "/" + f
        return f"Specification {doc_id} not found"

    def _get_wki_id_candidates(self, doc_id: str, version: str = None) -> list:
        """Return a list of candidate wki_ids for a spec version (best match first)."""
        if version:
            version_str = version
        else:
            # Derive version from the FTP PDF URL
            pdf_url = self.search_document(doc_id)
            if "not found" in pdf_url.lower():
                return []
            parts = pdf_url.rstrip("/").split("/")
            version_folder = parts[-2]  # e.g. "18.04.00_60"
            v_parts = version_folder.split("_")[0].split(".")  # ["18", "04", "00"]
            try:
                version_str = f"{int(v_parts[0])}.{int(v_parts[1])}.{int(v_parts[2])}"
            except (ValueError, IndexError):
                return []

        def fetch_for_type(spec_type):
            # Query the ETSI standards search for one spec type (TS or TR).
            params = {
                "option": "com_standardssearch",
                "view": "data",
                "format": "json",
                "page": "1",
                "search": f"ETSI {spec_type} {doc_id} v{version_str}",
                "etsiNumber": "1",
                "published": "1",
            }
            try:
                resp = requests.get("https://www.etsi.org/", params=params,
                                    headers=self.headers, verify=False,
                                    timeout=15, proxies=_get_proxies())
                data = resp.json()
                if data and isinstance(data, list):
                    return [str(item["wki_id"]) for item in data if "wki_id" in item]
            except Exception as e:
                print(f"Error getting wki_id for {doc_id}: {e}")
            return []

        candidates = []
        with ThreadPoolExecutor(max_workers=2) as executor:
            for result in executor.map(fetch_for_type, ["TS", "TR"]):
                candidates.extend(result)
        return candidates

    def _authenticate_eol(self, wki_id: str) -> requests.Session:
        """Create a requests.Session authenticated to the ETSI EOL portal."""
        session = requests.Session()
        session.headers.update({"User-Agent": self.headers["User-Agent"]})
        session.proxies.update(_get_proxies())

        login_redir_url = (
            f"https://portal.etsi.org/LoginRedirection.aspx"
            f"?ReturnUrl=%2fwebapp%2fprotect%2fNTaccount.asp%3fWki_Id%3d{wki_id}"
            f"&Wki_Id={wki_id}"
        )
        # Seed DNN session cookies
        session.get(login_redir_url, verify=False, timeout=15)
        # Authenticate via EOL JSON login
        session.post(
            "https://portal.etsi.org/ETSIPages/LoginEOL.ashx",
            data=json.dumps({"username": os.environ.get("EOL_USER"),
                             "password": os.environ.get("EOL_PASSWORD")}),
            headers={"Content-Type": "application/json; charset=UTF-8",
                     "Referer": login_redir_url},
            verify=False,
            allow_redirects=False,
            timeout=15,
        )
        return session

    def search_document_docx(self, doc_id: str, version: str = None) -> str:
        """Download an ETSI spec as DOCX and return the local file path.

        Tries every wki_id candidate concurrently and returns the first
        successful download, or an error message string.
        """
        candidates = self._get_wki_id_candidates(doc_id, version)
        if not candidates:
            return f"Specification {doc_id} not found"

        # Authenticate once — cookies are auth tokens, not wki_id-specific
        auth_session = self._authenticate_eol(candidates[0])

        def try_wki(wki_id):
            print(f"Trying wki_id={wki_id} for {doc_id}")
            # Each thread gets its own session pre-loaded with the shared auth cookies
            session = requests.Session()
            session.headers.update({"User-Agent": self.headers["User-Agent"]})
            session.proxies.update(_get_proxies())
            session.cookies.update(auth_session.cookies)

            r = session.get(
                f"https://portal.etsi.org/webapp/protect/NTaccount.asp?Wki_Id={wki_id}",
                verify=False, timeout=15,
            )
            meta_match = re.search(r'URL=([^"\'\s>]+)', r.text)
            if not meta_match:
                print(f" wki_id={wki_id}: authentication failed, trying next")
                return None
            meta_url = meta_match.group(1)
            if not meta_url.startswith("http"):
                meta_url = f"https://portal.etsi.org/webapp/protect/{meta_url}"

            r2 = session.get(meta_url, allow_redirects=False, verify=False, timeout=15)
            if r2.status_code != 302:
                print(f" wki_id={wki_id}: unexpected status {r2.status_code}, trying next")
                return None
            location2 = r2.headers.get("Location", "")
            if "processError" in location2 or "processErrors" in location2:
                print(f" wki_id={wki_id}: portal rejected ({location2}), trying next")
                return None

            copy_url = urljoin("https://portal.etsi.org/", location2)
            r3 = session.get(copy_url, allow_redirects=False, verify=False, timeout=15)
            if r3.status_code == 302:
                location3 = r3.headers.get("Location", "")
                final_url = urljoin("https://portal.etsi.org/webapp/ewp/", location3)
                r4 = session.get(final_url, verify=False, timeout=15)
            else:
                r4 = r3

            docx_urls = re.findall(r'href=["\']([^"\']*\.docx)["\']', r4.text, re.IGNORECASE)
            if not docx_urls:
                print(f" wki_id={wki_id}: DOCX not found in page, trying next")
                return None

            spec_num = doc_id.split("-")[0].replace(" ", "")
            matching_urls = [u for u in docx_urls if spec_num in u.split("/")[-1]]
            if not matching_urls:
                print(f" wki_id={wki_id}: DOCX spec mismatch (expected {spec_num}), trying next")
                return None
            docx_url = matching_urls[0]

            dl = session.get(docx_url, headers={"Referer": r4.url}, verify=False, timeout=60)
            # Bug fix: the download was previously written to a hard-coded
            # junk path and the computed filename was never used.
            filename = docx_url.split("/")[-1]
            tmp_path = f"/tmp/{filename}"
            with open(tmp_path, "wb") as f:
                f.write(dl.content)
            print(f" wki_id={wki_id}: success")
            return tmp_path

        with ThreadPoolExecutor(max_workers=min(len(candidates), 4)) as executor:
            future_to_wki = {executor.submit(try_wki, wki_id): wki_id for wki_id in candidates}
            for future in as_completed(future_to_wki):
                result = future.result()
                if result is not None:
                    return result
        return f"Specification {doc_id}: all {len(candidates)} wki_id candidate(s) rejected by ETSI portal"