| """
|
| inference.py
|
| ------------
|
| Official submission inference script for the Data Cleaning Pipeline environment.
|
|
|
| Environment variables:
|
| API_BASE_URL LLM endpoint. Default: HuggingFace free router.
|
| MODEL_NAME Model to use. Default: Qwen/Qwen2.5-72B-Instruct (free).
|
| HF_TOKEN Your HuggingFace token (hf_...).
|
| ENV_BASE_URL The running environment URL.
|
| Set this to your HuggingFace Space URL, e.g.:
|
| https://CodeKnightDebjit-data-cleaning-env.hf.space
|
|
|
| NOTE: Do NOT use LOCAL_IMAGE_NAME / from_docker_image() in submitted scripts.
|
| The evaluator machine does not have your local Docker image — it connects to
|
| your live HF Space via ENV_BASE_URL.
|
|
|
| STDOUT FORMAT (evaluator parses these exactly):
|
| [START] task=<n> env=<benchmark> model=<model>
|
| [STEP] step=<n> action=<str> reward=<0.00> done=<true|false> error=<msg|null>
|
| [END] success=<true|false> steps=<n> score=<0.00> rewards=<r1,r2,...>
|
| """
|
|
|
| import asyncio
|
| import json
|
| import os
|
| import re
|
| import sys
|
| from typing import Any, Dict, List, Optional
|
|
|
| from openai import OpenAI
|
|
|
| try:
|
| from client import DataCleaningEnv
|
| from models import CleanAction, MAX_STEPS, DONE_THRESHOLD
|
| except ImportError:
|
| sys.path.insert(0, os.path.dirname(__file__))
|
| from client import DataCleaningEnv
|
| from models import CleanAction, MAX_STEPS, DONE_THRESHOLD
|
|
|
|
|
|
|
|
|
|
|
|
|
# --- Configuration (all overridable via environment variables) --------------

# OpenAI-compatible chat endpoint; defaults to the HuggingFace free router.
API_BASE_URL = os.getenv("API_BASE_URL", "https://router.huggingface.co/v1")
# Model identifier passed to the chat-completions API.
MODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct")
# HuggingFace access token (hf_...); required — main() exits early without it.
HF_TOKEN = os.getenv("HF_TOKEN", "")
# URL of the live environment (HF Space) the client connects to.
ENV_BASE_URL = os.getenv("ENV_BASE_URL", "https://CodeKnightDebjit-data-cleaning-env.hf.space")

# Benchmark name reported in the [START] log line.
BENCHMARK = "data_cleaning_env"
# Tasks run sequentially by main(), in this order.
TASK_IDS = ["easy", "medium", "hard"]
# Per-task step budgets enforced locally by run_episode.
STEP_LIMITS = {"easy": 25, "medium": 50, "hard": 80}
|
|
|
|
|
|
|
|
|
# System prompt sent once per episode. Frames the strict JSON action contract
# the agent must follow. (Fixed: mojibake "β" characters restored to the
# intended "—" / "→" so the model sees clean punctuation.)
SYSTEM_PROMPT = """You are a deterministic data cleaning agent.
Your task is to clean a dataset step-by-step using valid actions.
You are operating inside an environment with strict rules.
--------------------------------------------------
## INPUT PROVIDED EACH STEP
You will receive:
1. Column schema (LIST OF VALID COLUMN NAMES — CASE SENSITIVE)
2. Column status:
   - missing values count
   - whether standardized (true/false)
3. Remaining issues (global state)
4. Previous actions taken
--------------------------------------------------
## OBJECTIVE
Fully clean the dataset with MINIMUM steps.
A dataset is CLEAN only if:
- No missing values remain
- All columns are standardized
- No invalid formats exist
--------------------------------------------------
## STRICT RULES (MUST FOLLOW)
### 1. NEVER TERMINATE EARLY
You MUST NOT output DONE unless:
- ALL columns have missing = 0
- ALL columns have standardized = true
- remaining_issues is empty
If ANY issue remains → DO NOT output DONE.
--------------------------------------------------
### 2. USE ONLY VALID COLUMNS
- You MUST use EXACT column names from schema
- Column names are CASE SENSITIVE
- NEVER invent new column names
--------------------------------------------------
### 3. PRIORITIZE COLUMN-LEVEL ACTIONS
Preferred actions:
- FILL_MISSING (fixes entire column)
- STANDARDIZE_COL (fixes formatting)
Avoid:
- SET_VALUE (only for single isolated errors)
NEVER fix a full column using repeated SET_VALUE.
--------------------------------------------------
### 4. DO NOT REPEAT ACTIONS
- Do NOT apply the same action repeatedly on the same column
- Do NOT standardize an already standardized column
- Do NOT fill missing if missing = 0
--------------------------------------------------
### 5. AVOID DESTRUCTIVE ACTIONS
- DROP_ROW should be used ONLY when absolutely necessary
--------------------------------------------------
## OUTPUT FORMAT (STRICT JSON ONLY)
Return ONLY one of these — no explanation, no markdown:
{"action": "FILL_MISSING", "column": "<col>", "strategy": "<mean|median|mode>"}
{"action": "STANDARDIZE_COL", "column": "<col>"}
{"action": "SET_VALUE", "column": "<col>", "row": <int>, "value": "<str>"}
{"action": "DROP_ROW", "row": <int>}
{"action": "DONE"}
--------------------------------------------------
## FAILURE CONDITIONS (AVOID THESE)
- DONE prematurely → penalty -1.0
- Invalid column names → action fails
- Repeated same action → wasted step
--------------------------------------------------
Every step must move the dataset closer to a fully clean state."""
|
|
|
|
|
|
|
|
|
def log_start(task: str, env: str, model: str) -> None:
    """Emit the [START] marker line the evaluator parses."""
    line = "[START] task={} env={} model={}".format(task, env, model)
    print(line, flush=True)
|
|
|
|
|
def log_step(step: int, action: str, reward: float, done: bool,
             error: Optional[str]) -> None:
    """Emit one [STEP] marker line; newlines in the action text are flattened."""
    shown_action = action[:80].replace("\n", " ")
    err = error or "null"
    line = (f"[STEP] step={step} action={shown_action} "
            f"reward={reward:.2f} done={str(done).lower()} error={err}")
    print(line, flush=True)
|
|
|
|
|
def log_end(success: bool, steps: int, score: float,
            rewards: List[float]) -> None:
    """Emit the final [END] marker line with the per-step reward trail."""
    trail = ",".join("{:.2f}".format(r) for r in rewards)
    line = "[END] success={} steps={} score={:.2f} rewards={}".format(
        str(success).lower(), steps, score, trail)
    print(line, flush=True)
|
|
|
|
|
|
|
|
|
| def _col_status_block(column_status: Dict[str, Any]) -> str:
|
| if not column_status:
|
| return " (not available)"
|
| lines = []
|
| for col, s in column_status.items():
|
| missing = s.get("missing", 0)
|
| standardized = s.get("standardized", True)
|
| issues = s.get("issues", [])
|
| flag = "OK" if (missing == 0 and standardized) else "NEEDS_FIX"
|
| issue_str = ", ".join(issues) if issues else ""
|
| lines.append(
|
| f" {col:<26} missing={missing:<3} standardized={str(standardized).lower():<5}"
|
| + (f" issues=[{issue_str}]" if issue_str else "")
|
| + f" β {flag}"
|
| )
|
| return "\n".join(lines)
|
|
|
|
|
def build_user_prompt(obs, history: List[str]) -> str:
    """Build the per-step user prompt from the latest observation.

    Includes the exact column schema, per-column status, global episode
    state, a CSV preview, and a short window of previous actions, plus an
    explicit hint on whether DONE is currently allowed. (Fixed: mojibake
    "β" restored to "→" in the DONE-hint strings.)

    Assumes `obs` exposes dirty_csv, task_id, step_number, max_steps,
    current_score, issues_remaining and schema_hint — provided by the
    environment client.
    """
    col_status: Dict[str, Any] = getattr(obs, "column_status", {})
    valid_columns = list(col_status.keys())
    # Columns still needing work: any missing values, or not yet standardized.
    broken = [c for c, s in col_status.items()
              if s.get("missing", 0) > 0 or not s.get("standardized", True)]

    # Header row plus the first 20 data rows of the dirty CSV.
    rows = obs.dirty_csv.strip().split("\n")
    preview = "\n".join(rows[:21])

    all_clean = len(broken) == 0
    done_hint = (
        "ALL columns clean → you MAY output DONE"
        if all_clean else
        f"{len(broken)} column(s) still broken → DO NOT output DONE"
    )

    # Only the most recent actions are shown to keep the prompt bounded.
    history_block = "\n".join(f"  {h}" for h in history[-6:]) if history else "  none"

    return f"""--------------------------------------------------
## COLUMN SCHEMA (EXACT CASE-SENSITIVE NAMES — USE THESE EXACTLY)
{chr(10).join(f'  - {c}' for c in valid_columns)}

--------------------------------------------------
## COLUMN STATUS
{_col_status_block(col_status)}

--------------------------------------------------
## GLOBAL STATE
Task: {obs.task_id}
Step: {obs.step_number} / {obs.max_steps}
Score: {obs.current_score:.4f} (need >= {DONE_THRESHOLD[obs.task_id]:.2f})
Remaining issues: {obs.issues_remaining}
Broken columns: {broken}
DONE status: {done_hint}

--------------------------------------------------
## SCHEMA HINT
{obs.schema_hint}

--------------------------------------------------
## CSV PREVIEW (first 20 rows)
{preview}

--------------------------------------------------
## PREVIOUS ACTIONS
{history_block}

--------------------------------------------------
Return ONLY valid JSON — no explanation, no markdown."""
|
|
|
|
|
|
|
|
|
# Map action names emitted by the LLM to the environment's canonical
# commands. Includes lenient aliases ("STANDARDIZE", "DROP") so slightly
# off-spec model output still parses.
COMMAND_MAP = {
    "FILL_MISSING": "FILL_MISSING",
    "STANDARDIZE_COL": "STANDARDIZE_COL",
    "STANDARDIZE": "STANDARDIZE_COL",
    "SET_VALUE": "SET_VALUE",
    "DROP_ROW": "DROP_ROW",
    "DROP": "DROP_ROW",
}

# Fill strategies accepted for FILL_MISSING; anything else is replaced with
# "median" by parse_action.
VALID_STRATEGIES = {"mean", "median", "mode", "drop"}
|
|
|
|
|
def parse_action(raw: str, valid_columns: List[str]) -> CleanAction:
    """Parse the LLM's raw reply into a CleanAction.

    Tolerates markdown fences and surrounding prose by extracting the first
    flat JSON object from the reply. Unparsable or unknown replies fall back
    to DONE. Column names are repaired case-insensitively against the
    schema; unrecognized fill strategies fall back to "median".
    """
    # Extract the first flat {...} object directly from the raw reply. This
    # subsumes the old explicit ``` fence-stripping, which produced an empty
    # string for single-line ```{...}``` replies and caused a spurious DONE.
    m = re.search(r"\{[^{}]*\}", raw, re.DOTALL)
    if not m:
        return CleanAction(command="DONE")

    try:
        data: Dict[str, Any] = json.loads(m.group())
    except json.JSONDecodeError:
        return CleanAction(command="DONE")

    # Normalize the verb: case-insensitive, spaces treated as underscores.
    action_raw = str(data.get("action", "DONE")).strip().upper().replace(" ", "_")
    if action_raw == "DONE":
        return CleanAction(command="DONE")

    command = COMMAND_MAP.get(action_raw)
    if command is None:
        # Unknown verb: treat as terminal rather than guessing.
        return CleanAction(command="DONE")

    # Repair case mismatches against the known schema; truly unknown columns
    # become None and are left for the environment to reject.
    column = data.get("column")
    if column is not None and valid_columns:
        if column not in valid_columns:
            col_lower = {c.lower(): c for c in valid_columns}
            column = col_lower.get(str(column).lower())

    # Accept either "strategy" or "fill_strategy"; invalid values fall back
    # to the safe "median" default.
    fill_strategy = data.get("strategy") or data.get("fill_strategy")
    if fill_strategy and str(fill_strategy).lower() not in VALID_STRATEGIES:
        fill_strategy = "median"

    # Accept either "row" or "row_index"; non-integer values are dropped.
    row_raw = data.get("row") if data.get("row") is not None else data.get("row_index")
    row_index = None
    if row_raw is not None:
        try:
            row_index = int(row_raw)
        except (TypeError, ValueError):
            pass

    value = data.get("value")

    try:
        return CleanAction(
            command=command,
            column=column,
            fill_strategy=fill_strategy,
            row_index=row_index,
            value=str(value) if value is not None else None,
        )
    except Exception:
        # Model validation failure must never crash the episode loop.
        return CleanAction(command="DONE")
|
|
|
|
|
def call_llm(client: OpenAI, messages: list) -> str:
    """Request one deterministic completion and return its stripped text."""
    completion = client.chat.completions.create(
        model=MODEL_NAME,
        messages=messages,
        max_tokens=100,
        temperature=0.0,
    )
    content = completion.choices[0].message.content or ""
    return content.strip()
|
|
|
|
|
|
|
|
|
async def run_episode(env, client: OpenAI, task_id: str) -> dict:
    """Run one full episode of `task_id` against the environment.

    Drives the observe → prompt → act loop until the environment reports
    done, the score threshold is met, or the step budget is exhausted.
    Emits the [START]/[STEP]/[END] lines the evaluator parses and returns a
    summary dict with keys: task_id, score, reward, steps, success.
    """
    max_steps = STEP_LIMITS[task_id]
    threshold = DONE_THRESHOLD[task_id]
    rewards: List[float] = []
    steps_taken = 0
    score = 0.0
    success = False
    # Compact per-step log lines fed back into subsequent prompts.
    history: List[str] = []

    log_start(task=task_id, env=BENCHMARK, model=MODEL_NAME)

    try:
        result = await env.reset(task_id=task_id)
        obs = result.observation

        # Schema snapshot used by parse_action to repair column names.
        valid_columns: List[str] = list(getattr(obs, "column_status", {}).keys())
        messages = [{"role": "system", "content": SYSTEM_PROMPT}]

        for step in range(1, max_steps + 1):
            if obs.done:
                break

            steps_taken = step
            messages.append({"role": "user", "content": build_user_prompt(obs, history)})

            try:
                raw = call_llm(client, messages)
                action = parse_action(raw, valid_columns)
                messages.append({"role": "assistant", "content": raw})
            except Exception as exc:
                # LLM call failed: log a terminal step and end the episode.
                log_step(step, "DONE", 0.00, True, str(exc)[:120])
                rewards.append(0.0)
                break

            # Bound the context window: system prompt + the last 20 turns.
            if len(messages) > 21:
                messages = [messages[0]] + messages[-20:]

            result = await env.step(action)
            obs = result.observation

            # Refresh the schema in case cleaning changed the column set.
            if getattr(obs, "column_status", {}):
                valid_columns = list(obs.column_status.keys())

            reward = result.reward or 0.0
            rewards.append(reward)
            score = obs.current_score

            log_step(
                step=step,
                action=action.command,
                reward=reward,
                done=obs.done,
                error=obs.last_action_error,
            )

            # Record a one-line summary of this step for the next prompt.
            parts = [f"step {step}: {action.command}"]
            if action.column: parts.append(f"col={action.column}")
            if action.fill_strategy: parts.append(f"strategy={action.fill_strategy}")
            parts.append(f"score={score:.4f}")
            if obs.last_action_error:
                parts.append(f"[BLOCKED: {obs.last_action_error[:60]}]")
            history.append(" ".join(parts))

            # Stop early once the environment is done or the target is hit.
            if obs.done or score >= threshold:
                break

        success = score >= threshold

    except Exception as episode_err:
        # Environment/transport failure: report what we have and return a
        # failed summary so the remaining tasks still run.
        print(f"[DEBUG] Episode error: {episode_err}", flush=True)
        log_end(success=False, steps=steps_taken, score=score, rewards=rewards)
        return {"task_id": task_id, "score": score, "reward": sum(rewards),
                "steps": steps_taken, "success": False}

    log_end(success=success, steps=steps_taken, score=score, rewards=rewards)
    return {"task_id": task_id, "score": score, "reward": sum(rewards),
            "steps": steps_taken, "success": success}
|
|
|
|
|
|
|
|
|
async def main() -> None:
    """Validate configuration, run every benchmark task, print a summary."""
    # Fail fast with setup instructions when the token is missing.
    if not HF_TOKEN:
        print(
            "ERROR: HF_TOKEN is not set.\n"
            "1. Go to https://huggingface.co/settings/tokens\n"
            "2. Create a Read token and copy it\n"
            "3. Set it: $env:HF_TOKEN='hf_xxxxxxxxxxxx' (PowerShell)\n"
            " export HF_TOKEN='hf_xxxxxxxxxxxx' (bash)\n"
            "4. Run: python inference.py",
            file=sys.stderr,
        )
        sys.exit(1)

    # Echo the effective configuration so evaluator logs show what was used.
    print(f"API_BASE_URL : {API_BASE_URL}", flush=True)
    print(f"MODEL_NAME : {MODEL_NAME}", flush=True)
    print(f"ENV_BASE_URL : {ENV_BASE_URL}", flush=True)
    print("", flush=True)

    llm_client = OpenAI(base_url=API_BASE_URL, api_key=HF_TOKEN)

    env = DataCleaningEnv(base_url=ENV_BASE_URL)
    await env.connect()

    summaries = []
    try:
        for task_id in TASK_IDS:
            summaries.append(await run_episode(env, llm_client, task_id))
            print("", flush=True)
    finally:
        # Best-effort shutdown; a close failure must not mask episode errors.
        try:
            await env.close()
        except Exception:
            pass

    # Fixed-width summary table, one row per task.
    divider = "=" * 56
    print(divider, flush=True)
    print(f"{'Task':<10} {'Score':>7} {'Reward':>9} {'Steps':>6} {'Pass':>5}")
    print("-" * 56, flush=True)
    for row in summaries:
        verdict = "YES" if row["success"] else "NO"
        print(
            f"{row['task_id']:<10} {row['score']:>7.4f} {row['reward']:>9.4f} "
            f"{row['steps']:>6} {verdict:>4}",
            flush=True,
        )
    print(divider, flush=True)
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run all tasks sequentially under one event loop.
    asyncio.run(main())