Spaces:
Running
on
Zero
Running
on
Zero
File size: 1,589 Bytes
0bdbec3 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
from __future__ import annotations
from typing import Dict, List
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from ..llm.gemini import call_gemini
from ..state import AppState, ScoreItem
def run(state: AppState) -> AppState:
images = list(state.get("images", []))
if not images:
state["scores"] = []
return state
max_workers = max(1, min(len(images), int(os.getenv("NNG_CONCURRENCY", "4"))))
results: List[ScoreItem | None] = [None] * len(images)
def _judge_one(i: int) -> tuple[int, Dict]:
im = images[i]
res: Dict = call_gemini("judge", image_path=im.path, spec=state.get("spec", {}))
return i, res
with ThreadPoolExecutor(max_workers=max_workers) as ex:
futures = [ex.submit(_judge_one, i) for i in range(len(images))]
for fut in as_completed(futures):
try:
i, res = fut.result()
im = images[i]
results[i] = {
"image_path": im.path,
"score": float(res.get("score", 0.0)),
"violations": list(res.get("violations", [])),
}
except Exception as e:
im = images[futures.index(fut)] if fut in futures else None
path = im.path if im else ""
results[i] = {
"image_path": path,
"score": 0.0,
"violations": [f"judge error: {e}"]
}
state["scores"] = [s for s in results if s is not None]
return state
|