Pulse/scripts/eval/render_model_matrix.py

#!/usr/bin/env python3
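"""Render the Pulse Assistant model matrix as a markdown table.

Reads eval report JSON files from a directory, keeps the most recent result per
model and scenario, and either prints the table or writes it into a doc between
the MODEL_MATRIX_START/END markers.

Example invocations (the doc path below is illustrative, not a real file):

    python Pulse/scripts/eval/render_model_matrix.py tmp/eval-reports
    python Pulse/scripts/eval/render_model_matrix.py tmp/eval-reports --write-doc docs/some-doc.md
"""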
import argparse
import datetime as dt
import json
from pathlib import Path

# Scenario names as they appear in reports, mapped to their table column labels.
SCENARIO_COLUMNS = {
    "Quick Smoke Test": "Smoke",
    "Read-Only Infrastructure": "Read-only",
}

# Models excluded from the matrix by exact id.
EXCLUDED_MODEL_IDS = {
    "openai:gpt-5.2-pro",
}

# Models excluded when any of these keywords appears in the lowercased id.
EXCLUDED_MODEL_KEYWORDS = (
    "codex",
    "embedding",
    "image",
    "vision",
    "video",
    "audio",
    "speech",
    "tts",
    "transcribe",
    "rerank",
    "moderation",
    "realtime",
)

# Error substrings that mark a run as transient (rate limits, quota, wrong
# endpoint); such reports are skipped rather than shown as failures.
TRANSIENT_ERROR_MARKERS = (
    "rate limit",
    "resource has been exhausted",
    "quota",
    "429",
    "not a chat model",
    "v1/chat/completions endpoint",
)
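
# Report shape this script relies on (inferred from the fields read below):
#   {
#     "model": "provider:model-name",
#     "generated_at": "<ISO-8601 timestamp or epoch seconds>",
#     "result": {
#       "ScenarioName": "...",
#       "Passed": true,
#       "Duration": <nanoseconds>,
#       "Steps": [{"InputTokens": 0, "OutputTokens": 0, "Error": "", "RawEvents": [...]}]
#     }
#   }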

def parse_time(value):
    """Parse a report timestamp (epoch seconds or ISO-8601) into an aware UTC datetime."""
    if not value:
        return None
    if isinstance(value, (int, float)):
        return dt.datetime.fromtimestamp(value, dt.timezone.utc)
    if isinstance(value, str):
        text = value.strip()
        if text.endswith("Z"):
            text = text[:-1] + "+00:00"
        try:
            parsed = dt.datetime.fromisoformat(text)
        except ValueError:
            return None
        # Treat offset-naive timestamps as UTC so all parsed datetimes stay comparable.
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=dt.timezone.utc)
        return parsed
    return None

def normalize_scenario(name):
    """Map a reported scenario name onto a known SCENARIO_COLUMNS key, or None."""
    if not name:
        return None
    for key in SCENARIO_COLUMNS:
        if name == key:
            return key
    for key in SCENARIO_COLUMNS:
        if key.lower() in name.lower():
            return key
    return None

def load_reports(report_dir):
    """Return the sorted JSON report paths in report_dir (empty list if it is missing)."""
    report_dir = Path(report_dir)
    if not report_dir.exists():
        return []
    return sorted(report_dir.glob("*.json"))

def build_matrix(report_paths):
    """Collapse reports into {model: {"scenarios": {...}, "last_run": datetime}}.

    Keeps only the most recent result per model and scenario, skipping excluded
    models and runs that hit transient errors.
    """
    records = {}
    for path in report_paths:
        try:
            payload = json.loads(path.read_text())
        except Exception:
            continue
        model = (payload.get("model") or "").strip()
        if not model:
            continue
        if should_exclude_model(model):
            continue
        generated_at = parse_time(payload.get("generated_at"))
        result = payload.get("result") or {}
        scenario = normalize_scenario(result.get("ScenarioName"))
        if not scenario:
            continue
        if has_transient_error(result):
            continue
        passed = bool(result.get("Passed"))
        duration_ns = int(result.get("Duration") or 0)
        tokens = 0
        for step in result.get("Steps") or []:
            tokens += int(step.get("InputTokens") or 0)
            tokens += int(step.get("OutputTokens") or 0)
        model_entry = records.setdefault(model, {"scenarios": {}, "last_run": None})
        existing = model_entry["scenarios"].get(scenario)
        # Replace the stored result when there is none yet, or when this report
        # carries a timestamp and is newer than (or the first timestamped) entry.
        should_replace = (
            existing is None
            or (generated_at and existing["generated_at"] is None)
            or (generated_at and existing["generated_at"] and generated_at > existing["generated_at"])
        )
        if should_replace:
            model_entry["scenarios"][scenario] = {
                "passed": passed,
                "generated_at": generated_at,
                "duration_ns": duration_ns,
                "tokens": tokens,
            }
        if generated_at:
            last_run = model_entry["last_run"]
            if last_run is None or generated_at > last_run:
                model_entry["last_run"] = generated_at
    return records

def should_exclude_model(model_id):
    """Return True for models excluded by exact id or by keyword match."""
    if model_id in EXCLUDED_MODEL_IDS:
        return True
    lowered = model_id.lower()
    for keyword in EXCLUDED_MODEL_KEYWORDS:
        if keyword and keyword in lowered:
            return True
    return False

def has_transient_error(result):
    """Return True if any step error or error event matches TRANSIENT_ERROR_MARKERS."""
    steps = result.get("Steps") or []
    for step in steps:
        error_text = str(step.get("Error") or "").lower()
        if error_text and contains_any(error_text, TRANSIENT_ERROR_MARKERS):
            return True
        for event in step.get("RawEvents") or []:
            if event.get("Type") != "error":
                continue
            data = event.get("Data")
            if isinstance(data, (dict, list)):
                text = json.dumps(data)
            else:
                text = str(data or "")
            if contains_any(text.lower(), TRANSIENT_ERROR_MARKERS):
                return True
    return False

def format_status(passed):
    """Render a table cell marker: pass, fail, or blank when the scenario was not run."""
    if passed is True:
        return "✅"
    if passed is False:
        return "❌"
    return ""

def render_table(records):
    """Render the per-model results as a markdown table, sorted by provider then model name."""
    header = ["Model", "Smoke", "Read-only", "Time (matrix)", "Tokens (matrix)", "Last run (UTC)"]
    rows = [header, ["---"] * len(header)]

    def sort_key(model_id):
        if ":" in model_id:
            provider, name = model_id.split(":", 1)
        else:
            provider, name = "", model_id
        return (provider, name)

    for model_id in sorted(records.keys(), key=sort_key):
        entry = records[model_id]
        scenarios = entry["scenarios"]
        last_run = entry["last_run"]
        last_run_text = last_run.strftime("%Y-%m-%d") if last_run else ""
        smoke = scenarios.get("Quick Smoke Test")
        readonly = scenarios.get("Read-Only Infrastructure")
        # Time and token columns aggregate across both scenarios.
        total_duration = 0
        total_tokens = 0
        for scenario in (smoke, readonly):
            if scenario:
                total_duration += int(scenario.get("duration_ns") or 0)
                total_tokens += int(scenario.get("tokens") or 0)
        duration_text = format_duration(total_duration) if total_duration else ""
        tokens_text = f"{total_tokens:,}" if total_tokens else ""
        rows.append([
            model_id,
            format_status(smoke["passed"] if smoke else None),
            format_status(readonly["passed"] if readonly else None),
            duration_text,
            tokens_text,
            last_run_text,
        ])
    if len(rows) == 2:
        rows.append(["_No results yet_", "", "", "", "", ""])
    return "\n".join("| " + " | ".join(row) + " |" for row in rows)

def format_duration(ns):
    """Format a nanosecond duration as seconds, minutes, or hours."""
    if not ns:
        return ""
    seconds = int(ns / 1_000_000_000)
    if seconds < 60:
        return f"{seconds}s"
    minutes = seconds // 60
    rem = seconds % 60
    if minutes < 60:
        return f"{minutes}m {rem}s"
    hours = minutes // 60
    rem_m = minutes % 60
    return f"{hours}h {rem_m}m"

def contains_any(text, markers):
    for marker in markers:
        if marker and marker in text:
            return True
    return False

def update_doc(doc_path, table):
    """Replace the content between the MODEL_MATRIX_START/END markers in doc_path."""
    doc_path = Path(doc_path)
    content = doc_path.read_text()
    start = "<!-- MODEL_MATRIX_START -->"
    end = "<!-- MODEL_MATRIX_END -->"
    if start not in content or end not in content:
        raise RuntimeError("MODEL_MATRIX markers not found in doc")
    prefix, rest = content.split(start, 1)
    _, suffix = rest.split(end, 1)
    next_content = prefix + start + "\n" + table + "\n" + end + suffix
    doc_path.write_text(next_content)

def main():
    parser = argparse.ArgumentParser(description="Render the Pulse Assistant model matrix table.")
    parser.add_argument("report_dir", nargs="?", default="tmp/eval-reports", help="Directory with eval report JSON files.")
    parser.add_argument("--write-doc", default="", help="Path to doc file to update in-place.")
    args = parser.parse_args()
    reports = load_reports(args.report_dir)
    records = build_matrix(reports)
    table = render_table(records)
    if args.write_doc:
        update_doc(args.write_doc, table)
    else:
        print(table)

if __name__ == "__main__":
    main()