Spaces:
Sleeping
Sleeping
| from fastapi import HTTPException | |
| import requests | |
| import re | |
| from bs4 import BeautifulSoup | |
| import os | |
| import json | |
| from urllib.parse import urljoin | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
def _get_proxies() -> dict:
    """Build the proxy mapping handed to ``requests``.

    The proxy URL is looked up in ``$http_proxy`` first and ``$HTTP_PROXY``
    second; an unset or empty variable is skipped. The same URL is used for
    both the ``http`` and the ``https`` scheme.

    Returns:
        ``{"http": url, "https": url}`` when a proxy URL is found, otherwise
        an empty dict.
    """
    for variable in ("http_proxy", "HTTP_PROXY"):
        proxy_url = os.environ.get(variable)
        if proxy_url:
            return {"http": proxy_url, "https": proxy_url}
    return {}
class ETSIDocFinder:
    """Find and download ETSI SET contribution documents on docbox.etsi.org.

    Creating an instance immediately logs in to the ETSI portal using the
    credentials found in ``$EOL_USER`` / ``$EOL_PASSWORD`` and stores the
    resulting ``requests.Session`` in ``self.session``.

    NOTE(review): every request is sent with ``verify=False`` (TLS
    certificate verification disabled), including the POST that carries the
    credentials. This is kept as-is to preserve behaviour; confirm that it
    is intentional.
    """

    HEADERS = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36"}

    def __init__(self):
        self.main_ftp_url = "https://docbox.etsi.org/SET"
        req_data = self.connect()
        print(req_data['message'])
        # Keep the session even when the login failed, so later calls can
        # still issue requests (and re-authenticate).
        self.session = req_data['session']

    def connect(self):
        """Open a fresh session and log in to the ETSI portal.

        Returns:
            A dict with the keys ``error`` (bool), ``session`` (the new
            ``requests.Session``) and ``message`` (human-readable status).
            ``self.session`` is replaced only when the login did not fail.
        """
        session = requests.Session()
        session.headers.update(self.HEADERS)
        session.proxies.update(_get_proxies())

        # Seed DNN session cookies — docbox requires the portal session to be
        # initialised with domain=docbox.etsi.org so the .DOTNETNUKE cookie
        # is scoped to .etsi.org and accepted by docbox.etsi.org as well.
        login_redir_url = (
            "https://portal.etsi.org/LoginRedirection.aspx"
            "?domain=docbox.etsi.org&ReturnUrl=/"
        )
        session.get(login_redir_url, verify=False, timeout=15)

        req = session.post(
            "https://portal.etsi.org/ETSIPages/LoginEOL.ashx",
            data=json.dumps({"username": os.environ.get("EOL_USER"),
                             "password": os.environ.get("EOL_PASSWORD")}),
            headers={"Content-Type": "application/json; charset=UTF-8",
                     "Referer": login_redir_url},
            verify=False,
            allow_redirects=False,
            timeout=15,
        )
        # The login endpoint answers with the literal body "Failed" on bad
        # credentials.
        if req.text == "Failed":
            return {"error": True, "session": session, "message": "Login failed ! Check your credentials"}

        # Always update self.session so reconnect and reauth actually take effect
        self.session = session
        return {"error": False, "session": session, "message": "Login successful"}

    def _get(self, url: str, timeout: int):
        """GET *url* with the current session, re-authenticating once if needed.

        When the final response URL contains ``LoginRedirection`` the session
        is treated as expired: ``connect()`` is called and the request is
        repeated a single time.
        """
        resp = self.session.get(url, verify=False, timeout=timeout, allow_redirects=True)
        if resp.url and "LoginRedirection" in resp.url:
            self.connect()  # replaces self.session on success
            resp = self.session.get(url, verify=False, timeout=timeout, allow_redirects=True)
        return resp

    def download_document(self, url: str) -> bytes:
        """Download a docbox file using the authenticated session.

        If the session has expired the portal redirects to LoginRedirection —
        this is detected and the request is retried after re-authenticating.

        Returns:
            The raw response body.
        """
        return self._get(url, timeout=30).content

    def get_workgroup(self, doc: str):
        """Split a document id into its docbox folder coordinates.

        Args:
            doc: Document id such as ``SETREQ(25)000012``.

        Returns:
            ``(main_tsg, workgroup, doc)`` where ``main_tsg`` is chosen from
            the id prefix (``SET-WG-R``, ``SET-WG-T`` or ``SET``) and
            ``workgroup`` is ``"20"`` followed by the text found between the
            first pair of parentheses. ``(None, None, None)`` is returned
            when the prefix is unknown or the id has no parenthesised part.
        """
        # Longer prefixes must be tested before the bare "SET"/"SCP" ones.
        if doc.startswith(("SETREQ", "SCPREQ")):
            main_tsg = "SET-WG-R"
        elif doc.startswith(("SETTEC", "SCPTEC")):
            main_tsg = "SET-WG-T"
        elif doc.startswith(("SET", "SCP")):
            main_tsg = "SET"
        else:
            return None, None, None

        year_match = re.search(r'\(([^)]+)\)', doc)
        if year_match is None:
            # No "(yy)" group: the folder cannot be derived from the id.
            return None, None, None
        return main_tsg, "20" + year_match.group(1), doc

    def find_workgroup_url(self, main_tsg, workgroup):
        """Return the URL of the contributions folder for *workgroup*.

        The ``05-CONTRIBUTIONS`` listing of *main_tsg* is scanned for the
        first table row whose link text contains *workgroup*; if none is
        found the URL is built from *workgroup* itself.
        """
        contributions_url = f"{self.main_ftp_url}/{main_tsg}/05-CONTRIBUTIONS"
        response = self._get(contributions_url, timeout=15)
        soup = BeautifulSoup(response.text, 'html.parser')
        for row in soup.find_all("tr"):
            link = row.find("a")
            if link and workgroup in link.get_text():
                return f"{contributions_url}/{link.get_text()}"
        return f"{contributions_url}/{workgroup}"

    def get_docs_from_url(self, url):
        """Return the link texts found in the table cells of the page at *url*.

        Any error is printed and turned into an empty list (best effort).
        """
        try:
            response = self._get(url, timeout=15)
            soup = BeautifulSoup(response.text, "html.parser")
            return [item.get_text() for item in soup.select("tr td a")]
        except Exception as e:
            print(f"Error accessing {url}: {e}")
            return []

    def search_document(self, doc_id: str):
        """Look up the docbox URL of a contribution document.

        The workgroup folder is listed, and every entry that looks like a
        subdirectory (no ``.`` in its name) is listed one level deeper.

        Returns:
            The URL of the last matching entry, or the string
            ``"Document <doc_id> not found"``.
        """
        not_found = f"Document {doc_id} not found"
        main_tsg, workgroup, doc = self.get_workgroup(doc_id)
        if not main_tsg:
            return not_found

        needle = doc.lower()

        def matches(name: str) -> bool:
            # Case-insensitive on both sides; the exact-case test is kept so
            # anything that matched before still matches.
            return needle in name.lower() or doc in name

        urls = []
        wg_url = self.find_workgroup_url(main_tsg, workgroup)
        print(wg_url)
        if wg_url:
            entries = self.get_docs_from_url(wg_url)
            print(entries)
            for entry in entries:
                if matches(entry):
                    urls.append(f"{wg_url}/{entry}")
                elif "." not in entry.rstrip("/"):
                    # looks like a subdirectory — go one level deeper
                    sub_url = f"{wg_url}/{entry}"
                    for f in self.get_docs_from_url(sub_url):
                        if matches(f):
                            print(f)
                            urls.append(f"{sub_url}/{f}")
        return urls[-1] if urls else not_found
class ETSISpecFinder:
    """Locate published ETSI TS/TR specifications (PDF and DOCX).

    PDFs are looked up in the ``www.etsi.org/deliver`` directory listings;
    DOCX files are fetched through the ETSI portal using the credentials in
    ``$EOL_USER`` / ``$EOL_PASSWORD``.

    NOTE(review): every request is sent with ``verify=False`` (TLS
    certificate verification disabled), including the POST that carries the
    credentials. This is kept as-is to preserve behaviour; confirm that it
    is intentional.
    """

    def __init__(self):
        self.main_url = "https://www.etsi.org/deliver/etsi_ts"
        self.second_url = "https://www.etsi.org/deliver/etsi_tr"
        self.headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36"}

    def get_spec_path(self, doc_id: str):
        """Translate a spec number into its path below the deliver root.

        Args:
            doc_id: Spec number, optionally followed by ``-part`` (and further
                ``-subpart`` segments), e.g. ``"103 666-2"``.

        Returns:
            ``"<range start>_<range end>/<folder>"`` where the range is the
            block of 100 containing the number and the folder is the number
            followed by every part zero-padded to two digits, e.g.
            ``"103600_103699/10366602"``.

        Raises:
            ValueError: if the number before the first ``-`` is not an integer.
        """
        number, *parts = doc_id.split("-")
        number = number.replace(" ", "")
        # Each part / sub-part takes two digits in the folder name; empty
        # segments (e.g. a trailing "-") are ignored.
        suffix = "".join(p.strip().zfill(2) for p in parts if p.strip())
        range_start = int(number) - int(number) % 100
        return f"{range_start}_{range_start + 99}/{number}{suffix}"

    def get_docs_from_url(self, url):
        """Return the text of every link on the page at *url*, minus the first.

        Any error is printed and turned into an empty list (best effort).
        """
        try:
            response = requests.get(url, verify=False, timeout=15, proxies=_get_proxies())
            soup = BeautifulSoup(response.text, "html.parser")
            # The first link of the listing is dropped.
            # presumably the parent-directory entry — verify against the server output
            return [item.get_text() for item in soup.find_all("a")][1:]
        except Exception as e:
            print(f"Error accessing {url}: {e}")
            return []

    def _normalise_version(self, version: str) -> str:
        """Normalise a user-supplied version string to ETSI zero-padded format.

        '17.6.0' -> '17.06.00' (the '_60' release suffix is ignored during matching)
        Already-normalised strings like '17.06.00' are returned unchanged.
        Anything that is not three dot-separated integers is returned with
        only surrounding '/' removed.
        """
        stripped = version.strip("/")
        fields = stripped.split(".")
        if len(fields) == 3:
            try:
                return ".".join(f"{int(field):02d}" for field in fields)
            except ValueError:
                pass
        return stripped

    def _unpad_version(self, version: str):
        """Return *version* without zero padding, or None if it is not M.m.p.

        '18.04.00', '18.04.00_60' and '18.4.0' all give '18.4.0'.
        """
        fields = version.strip("/").split("_")[0].split(".")
        if len(fields) != 3:
            return None
        try:
            return ".".join(str(int(field)) for field in fields)
        except ValueError:
            return None

    def _pick_release(self, releases: list, version: str = None) -> str:
        """Return the release folder matching version, or the latest if not found/specified.

        "Latest" is the last element of *releases*.

        Raises:
            IndexError: if *releases* is empty and no entry matched.
        """
        if version:
            target = self._normalise_version(version)
            for release in releases:
                # folder names are like '17.06.00_60'; match on the part before '_'
                if release.strip("/").split("_")[0] == target:
                    return release
        return releases[-1]

    def search_document(self, doc_id: str, version: str = None):
        """Return the URL of the spec's PDF, trying the TS tree then the TR tree.

        Args:
            doc_id: Spec number, e.g. ``"103 666"`` or ``"103 666-2"``.
            version: Optional version; when it matches no release folder the
                last listed release is used instead.

        Returns:
            The PDF URL, or ``"Specification <doc_id> not found"``.
        """
        spec_path = self.get_spec_path(doc_id)
        listing_urls = [f"{base}/{spec_path}/" for base in (self.main_url, self.second_url)]
        for listing_url in listing_urls:
            print(listing_url)

        for listing_url in listing_urls:
            releases = self.get_docs_from_url(listing_url)
            if not releases:
                continue
            # Link texts may carry a trailing '/'; strip it so the result
            # never contains '//' (which would break splitting on '/').
            release = self._pick_release(releases, version).strip("/")
            for name in self.get_docs_from_url(listing_url + release):
                if name.endswith(".pdf"):
                    return f"{listing_url}{release}/{name}"
        return f"Specification {doc_id} not found"

    def _get_wki_id_candidates(self, doc_id: str, version: str = None) -> list:
        """Return a list of candidate wki_ids for a spec version (best match first).

        The version searched for is *version* when given, otherwise the one
        derived from the PDF URL returned by ``search_document``. In both
        cases it is sent without zero padding (``18.4.0``). The TS search
        results come before the TR ones and duplicates are dropped.
        """
        if version:
            # Fall back to the raw string when it is not a plain M.m.p version.
            version_str = self._unpad_version(version) or version
        else:
            # Derive version from the FTP PDF URL
            pdf_url = self.search_document(doc_id)
            if "not found" in pdf_url.lower():
                return []
            version_folder = pdf_url.rstrip("/").split("/")[-2]  # e.g. "18.04.00_60"
            version_str = self._unpad_version(version_folder)
            if version_str is None:
                return []

        def fetch_for_type(spec_type):
            params = {
                "option": "com_standardssearch",
                "view": "data",
                "format": "json",
                "page": "1",
                "search": f"ETSI {spec_type} {doc_id} v{version_str}",
                "etsiNumber": "1",
                "published": "1",
            }
            try:
                resp = requests.get("https://www.etsi.org/", params=params,
                                    headers=self.headers, verify=False, timeout=15,
                                    proxies=_get_proxies())
                data = resp.json()
                if data and isinstance(data, list):
                    return [str(item["wki_id"]) for item in data if "wki_id" in item]
            except Exception as e:
                print(f"Error getting wki_id for {doc_id}: {e}")
            return []

        candidates = []
        with ThreadPoolExecutor(max_workers=2) as executor:
            for result in executor.map(fetch_for_type, ["TS", "TR"]):
                candidates.extend(result)
        # Both searches can return the same id; keep the first occurrence only.
        return list(dict.fromkeys(candidates))

    def _authenticate_eol(self, wki_id: str) -> "requests.Session":
        """Create a requests.Session authenticated to the ETSI EOL portal.

        The result of the login POST is not inspected; a failed login only
        shows up later when the portal rejects the download.
        """
        session = requests.Session()
        session.headers.update({"User-Agent": self.headers["User-Agent"]})
        session.proxies.update(_get_proxies())

        login_redir_url = (
            f"https://portal.etsi.org/LoginRedirection.aspx"
            f"?ReturnUrl=%2fwebapp%2fprotect%2fNTaccount.asp%3fWki_Id%3d{wki_id}"
            f"&Wki_Id={wki_id}"
        )
        # Seed DNN session cookies
        session.get(login_redir_url, verify=False, timeout=15)

        # Authenticate via EOL JSON login
        session.post(
            "https://portal.etsi.org/ETSIPages/LoginEOL.ashx",
            data=json.dumps({"username": os.environ.get("EOL_USER"),
                             "password": os.environ.get("EOL_PASSWORD")}),
            headers={"Content-Type": "application/json; charset=UTF-8",
                     "Referer": login_redir_url},
            verify=False,
            allow_redirects=False,
            timeout=15,
        )
        return session

    def _download_docx_for_wki(self, doc_id: str, wki_id: str, auth_cookies):
        """Try to download the DOCX behind one wki_id.

        Returns the local file path on success, or None when the portal
        rejects the request or no matching DOCX link is found.
        """
        print(f"Trying wki_id={wki_id} for {doc_id}")
        # Each call gets its own session pre-loaded with the shared auth cookies
        with requests.Session() as session:
            session.headers.update({"User-Agent": self.headers["User-Agent"]})
            session.proxies.update(_get_proxies())
            session.cookies.update(auth_cookies)

            r = session.get(
                f"https://portal.etsi.org/webapp/protect/NTaccount.asp?Wki_Id={wki_id}",
                verify=False, timeout=15,
            )
            # The page is expected to carry a "URL=..." redirect target.
            meta_match = re.search(r'URL=([^"\'\s>]+)', r.text)
            if not meta_match:
                print(f" wki_id={wki_id}: authentication failed, trying next")
                return None
            meta_url = meta_match.group(1)
            if not meta_url.startswith("http"):
                meta_url = f"https://portal.etsi.org/webapp/protect/{meta_url}"

            r2 = session.get(meta_url, allow_redirects=False, verify=False, timeout=15)
            if r2.status_code != 302:
                print(f" wki_id={wki_id}: unexpected status {r2.status_code}, trying next")
                return None
            location2 = r2.headers.get("Location", "")
            # "processError" also covers "processErrors".
            if "processError" in location2:
                print(f" wki_id={wki_id}: portal rejected ({location2}), trying next")
                return None

            copy_url = urljoin("https://portal.etsi.org/", location2)
            r3 = session.get(copy_url, allow_redirects=False, verify=False, timeout=15)
            if r3.status_code == 302:
                location3 = r3.headers.get("Location", "")
                final_url = urljoin("https://portal.etsi.org/webapp/ewp/", location3)
                r4 = session.get(final_url, verify=False, timeout=15)
            else:
                r4 = r3

            docx_urls = re.findall(r'href=["\']([^"\']*\.docx)["\']', r4.text, re.IGNORECASE)
            if not docx_urls:
                print(f" wki_id={wki_id}: DOCX not found in page, trying next")
                return None

            # Guard against a wki_id that belongs to a different spec.
            spec_num = doc_id.split("-")[0].replace(" ", "")
            matching_urls = [u for u in docx_urls if spec_num in u.split("/")[-1]]
            if not matching_urls:
                print(f" wki_id={wki_id}: DOCX spec mismatch (expected {spec_num}), trying next")
                return None

            # Resolve relative hrefs against the page they were found on;
            # absolute URLs pass through urljoin unchanged.
            docx_url = urljoin(r4.url, matching_urls[0])
            dl = session.get(docx_url, headers={"Referer": r4.url}, verify=False, timeout=60)
            if dl.status_code != 200:
                # Do not save an error page under a .docx name.
                print(f" wki_id={wki_id}: download failed with status {dl.status_code}, trying next")
                return None

            filename = docx_url.split("/")[-1]
            tmp_path = f"/tmp/{filename}"
            with open(tmp_path, "wb") as f:
                f.write(dl.content)
            print(f" wki_id={wki_id}: success")
            return tmp_path

    def search_document_docx(self, doc_id: str, version: str = None) -> str:
        """Download an ETSI spec as DOCX and return the local file path.

        All wki_id candidates are tried concurrently (at most four at a
        time); the first one that yields a file wins. A candidate that fails
        with a network or file error is skipped instead of aborting the
        whole search.

        Returns:
            The path of the saved file, or a message string when no
            candidate exists or every candidate was rejected.
        """
        candidates = self._get_wki_id_candidates(doc_id, version)
        if not candidates:
            return f"Specification {doc_id} not found"

        # Authenticate once — cookies are auth tokens, not wki_id-specific
        auth_session = self._authenticate_eol(candidates[0])
        try:
            with ThreadPoolExecutor(max_workers=min(len(candidates), 4)) as executor:
                future_to_wki = {
                    executor.submit(self._download_docx_for_wki, doc_id, wki_id,
                                    auth_session.cookies): wki_id
                    for wki_id in candidates
                }
                for future in as_completed(future_to_wki):
                    wki_id = future_to_wki[future]
                    try:
                        result = future.result()
                    except (requests.RequestException, OSError) as e:
                        print(f" wki_id={wki_id}: request failed ({e}), trying next")
                        continue
                    if result is not None:
                        # Drop candidates that have not started yet.
                        for pending in future_to_wki:
                            pending.cancel()
                        return result
        finally:
            auth_session.close()

        return f"Specification {doc_id}: all {len(candidates)} wki_id candidate(s) rejected by ETSI portal"