ResearchHarness / agent_base /console_utils.py
black-yt's picture
Sync Space agent core
e2a452d
import argparse
import json
import os
from pathlib import Path
import shutil
import sys
import unicodedata
from typing import Any, Optional
ANSI_RESET = "\033[0m"
ANSI_COLORS = {
"header": "\033[36m",
"assistant": "\033[32m",
"tool": "\033[33m",
"runtime": "\033[34m",
"user": "\033[35m",
"error": "\033[31m",
}
def _char_display_width(char: str) -> int:
if unicodedata.combining(char):
return 0
if unicodedata.category(char) in {"Cc", "Cf"}:
return 0
return 2 if unicodedata.east_asian_width(char) in {"F", "W"} else 1
def _display_width(text: str) -> int:
return sum(_char_display_width(char) for char in str(text))
def _truncate_display(text: str, width: int) -> str:
if _display_width(text) <= width:
return text
suffix = "..."
target = max(0, width - _display_width(suffix))
out = []
used = 0
for char in text:
char_width = _char_display_width(char)
if used + char_width > target:
break
out.append(char)
used += char_width
return "".join(out) + suffix
def _pad_display(text: str, width: int) -> str:
return text + " " * max(0, width - _display_width(text))
def _last_soft_break(chars: list[str]) -> int:
for index in range(len(chars) - 1, 0, -1):
if chars[index].isspace() and "".join(chars[:index]).strip():
return index
return -1
class ConsoleEventPrinter:
def __init__(self, *, model_name: str, workspace_root: Path, prompt: str):
self.model_name = model_name
self.workspace_root = workspace_root
self.prompt = prompt.strip()
self._printed_any = False
self._use_color = (
"NO_COLOR" not in os.environ
and os.environ.get("TERM") != "dumb"
and (sys.stdout.isatty() or bool(os.environ.get("FORCE_COLOR") or os.environ.get("CLICOLOR_FORCE")))
)
def print_header(self) -> None:
self._print_box(
"ResearchHarness CLI",
f"Model: {self.model_name}\nWorkspace Root: {self.workspace_root}\n\nPrompt:\n{self.prompt}",
"header",
)
def reset_rounds(self) -> None:
self._printed_any = False
def _paint(self, text: str, color_key: str) -> str:
if not self._use_color:
return text
return f"{ANSI_COLORS.get(color_key, '')}{text}{ANSI_RESET}"
def _terminal_width(self) -> int:
return max(60, min(110, shutil.get_terminal_size((100, 20)).columns))
def _wrap_line(self, line: str, width: int) -> list[str]:
expanded = line.expandtabs(2)
if expanded == "":
return [""]
chunks: list[str] = []
current: list[str] = []
current_width = 0
for char in expanded:
char_width = _char_display_width(char)
if current and current_width + char_width > width:
break_at = _last_soft_break(current)
if break_at > 0:
chunks.append("".join(current[:break_at]).rstrip())
current = list("".join(current[break_at + 1 :]).lstrip())
current_width = _display_width("".join(current))
else:
chunks.append("".join(current))
current = []
current_width = 0
current.append(char)
current_width += char_width
if current:
chunks.append("".join(current))
return chunks or [""]
def _print_box(self, title: str, body: str, color_key: str = "runtime") -> None:
width = self._terminal_width()
inner_width = width - 4
title_text = f" {_truncate_display(title.strip(), width - 6)} "
top = "+" + title_text + "-" * max(0, width - 2 - _display_width(title_text)) + "+"
bottom = "+" + "-" * (width - 2) + "+"
if self._printed_any:
print()
print(self._paint(top, color_key))
for raw_line in str(body or "").splitlines() or [""]:
for line in self._wrap_line(raw_line, inner_width):
padded = _pad_display(line, inner_width)
print(f"{self._paint('|', color_key)} {padded} {self._paint('|', color_key)}")
print(self._paint(bottom, color_key))
self._printed_any = True
def _title(self, label: str, turn_index: int) -> str:
return f"{label} | round {turn_index}" if turn_index > 0 else label
def _format_tool_call(self, tool_name: str, tool_args: Any) -> str:
try:
tool_args_text = json.dumps(tool_args, ensure_ascii=False, indent=2)
except TypeError:
tool_args_text = str(tool_args)
return f"- {tool_name}\n{tool_args_text}"
def handle_event(self, row: dict[str, Any]) -> None:
role = str(row.get("role", ""))
turn_index = int(row.get("turn_index", 0) or 0)
text = str(row.get("text", ""))
capture_type = str(row.get("capture_type", ""))
tool_names = row.get("tool_names") if isinstance(row.get("tool_names"), list) else []
tool_arguments = row.get("tool_arguments") if isinstance(row.get("tool_arguments"), list) else []
finish_reason = str(row.get("finish_reason", ""))
error = str(row.get("error", ""))
if capture_type and not text.strip():
return
if role == "system":
return
if role == "user":
if turn_index == 0:
return
self._print_box(self._title("Runtime Message", turn_index), text, "user")
return
if role == "assistant":
lines: list[str] = []
if tool_names:
if text.strip():
lines.append(text)
else:
suffix = f" finish_reason={finish_reason}" if finish_reason else ""
lines.append(f"(no text; native tool-calls only.{suffix})")
lines.append("")
lines.append("Assistant Tool Calls:")
for idx, tool_name in enumerate(tool_names):
tool_args = tool_arguments[idx] if idx < len(tool_arguments) else {}
lines.append(self._format_tool_call(str(tool_name), tool_args))
elif text.strip():
lines.append(text)
else:
suffix = f" finish_reason={finish_reason}" if finish_reason else ""
lines.append(f"(empty assistant output.{suffix})")
if error:
lines.append("")
lines.append(f"Assistant Error: {error}")
self._print_box(self._title("Assistant", turn_index), "\n".join(lines), "error" if error else "assistant")
return
if role == "tool":
tool_name = str(tool_names[0]) if tool_names else "Tool"
lines = [text]
if error:
lines.extend(["", f"{tool_name} Error: {error}"])
self._print_box(self._title(f"{tool_name} Result", turn_index), "\n".join(lines), "error" if error else "tool")
return
if role == "runtime":
lines = [text]
if error:
lines.extend(["", f"Runtime Error: {error}"])
self._print_box(self._title("Runtime", turn_index), "\n".join(lines), "error" if error else "runtime")
def main(argv: Optional[list[str]] = None) -> int:
parser = argparse.ArgumentParser(description="Show a minimal example of the CLI console event formatter.")
parser.parse_args(argv)
printer = ConsoleEventPrinter(model_name="demo-model", workspace_root=Path("."), prompt="demo question")
printer.print_header()
printer.handle_event(
{
"role": "assistant",
"turn_index": 1,
"text": "",
"tool_names": ["Read"],
"tool_arguments": [{"path": "demo.txt"}],
"termination": "",
"error": "",
}
)
return 0
if __name__ == "__main__":
raise SystemExit(main())