from __future__ import annotations import json import os from pathlib import Path from typing import Any from langgraph.checkpoint.memory import InMemorySaver from langgraph.graph import END, START, StateGraph from langgraph.runtime import Runtime from langgraph.store.memory import InMemoryStore from langgraph.types import Command from typing_extensions import TypedDict from verifiedx import BoundaryDeniedError, init_verifiedx, install_langgraph ROOT = Path(__file__).resolve().parent ARTIFACTS = ROOT / "artifacts" ARTIFACTS.mkdir(parents=True, exist_ok=True) class State(TypedDict, total=False): request: str memory_saved: bool memory_key: str update_workflow_after_memory: bool workflow_id: str workflow_reason: str workflow_status: str class Context(TypedDict): operator: str STATE: dict[str, Any] = { "memory_writes": [], "workflow_updates": [], } def route(state: State): text = str(state.get("request") or "").lower() if "remember" in text or "preference" in text: return Command(goto="save_memory") if "workflow" in text or "awaiting_human" in text: return Command(goto="update_workflow") return Command(goto="noop") def save_memory(state: State, runtime: Runtime[Context]) -> dict[str, Any]: operator = runtime.context["operator"] namespace = ("crm", operator) key = "cust_1001.preference" runtime.store.search(namespace, query="cust_1001") runtime.store.put( namespace, key, { "customer_id": "cust_1001", "preference": "sms", "note": state.get("request"), }, ) STATE["memory_writes"].append( { "namespace": list(namespace), "key": key, "operator": operator, } ) if "workflow" in str(state.get("request") or "").lower(): return Command( update={ "memory_saved": True, "memory_key": key, "update_workflow_after_memory": True, }, goto="update_workflow", ) return {"memory_saved": True, "memory_key": key} def update_workflow(state: State, runtime: Runtime[Context]): runtime.store.get(("workflow", runtime.context["operator"]), "wf-1002") STATE["workflow_updates"].append( { "workflow_id": "wf-1002", "status": "awaiting_human", "reason": "billing verification missing", } ) return Command( update={ "workflow_id": "wf-1002", "workflow_status": "awaiting_human", "workflow_reason": "billing verification missing", }, goto=END, ) def noop(state: State) -> dict[str, Any]: return {"request": state.get("request")} def build_graph(): builder = StateGraph(State, context_schema=Context) builder.add_node("route", route) builder.add_node("save_memory", save_memory) builder.add_node("update_workflow", update_workflow) builder.add_node("noop", noop) builder.add_edge(START, "route") builder.add_edge("save_memory", END) builder.add_edge("noop", END) return builder.compile(checkpointer=InMemorySaver(), store=InMemoryStore()) def scenario_prompt(name: str) -> str: prompts = { "safe_memory_write": ( "CASE-STARTER-2001: Save the durable preference that customer cust_1001 prefers SMS notifications." ), "blocked_graph_update": ( "CASE-STARTER-2002: Save the durable preference that customer cust_1001 prefers SMS notifications, " "then update workflow WF-1002 to awaiting_human because billing verification is missing." ), } return prompts[name] def latest_debug_records() -> list[dict[str, Any]]: path = ARTIFACTS / "verifiedx_diagnostics.jsonl" if not path.exists(): return [] records = [] for line in path.read_text(encoding="utf-8").splitlines(): line = line.strip() if not line: continue try: records.append(json.loads(line)) except json.JSONDecodeError: continue return records def summarize_debug_records(records: list[dict[str, Any]]) -> dict[str, Any]: boundary_diagnostics = [ record for record in records if record.get("kind") == "verifiedx_boundary_diagnostic" ] return { "diagnostic_record_count": len(records), "boundary_outcomes": [ str((record.get("stored_decision") or record.get("decision") or {}).get("outcome") or "").strip() for record in boundary_diagnostics if str((record.get("stored_decision") or record.get("decision") or {}).get("outcome") or "").strip() ], "boundary_raw_names": [ str((((record.get("request_payload") or {}).get("decision_context") or {}).get("pending_action") or {}).get("raw_name") or "").strip() for record in boundary_diagnostics if str((((record.get("request_payload") or {}).get("decision_context") or {}).get("pending_action") or {}).get("raw_name") or "").strip() ], } def run_scenario(graph, *, name: str) -> dict[str, Any]: config = {"configurable": {"thread_id": f"starter-langgraph-{name}"}} state = {"request": scenario_prompt(name)} before_count = len(latest_debug_records()) try: result = graph.invoke(state, config=config, context={"operator": "ops_001"}) new_records = latest_debug_records()[before_count:] return { "scenario": name, "status": "ok", "result": result, "diagnostics": summarize_debug_records(new_records), } except BoundaryDeniedError as exc: new_records = latest_debug_records()[before_count:] return { "scenario": name, "status": "blocked", "error": str(exc), "loopback": exc.loopback, "tool_result": exc.tool_result(), "diagnostics": summarize_debug_records(new_records), } def main() -> None: if not os.environ.get("VERIFIEDX_API_KEY"): raise RuntimeError("VERIFIEDX_API_KEY is required") os.environ.setdefault("VERIFIEDX_BASE_URL", "https://api.verifiedx.me") os.environ.setdefault("VERIFIEDX_DEBUG_DIR", str(ARTIFACTS)) os.environ.setdefault("VERIFIEDX_DEBUG_DECISIONS", "1") os.environ.setdefault("VERIFIEDX_DEBUG_FETCH_DECISIONS", "1") os.environ.setdefault("VERIFIEDX_AGENT_ID", "starter-langgraph") os.environ.setdefault("VERIFIEDX_SOURCE_SYSTEM", "starter-langgraph") for artifact_name in ("scenario_report.json", "verifiedx_diagnostics.jsonl"): artifact_path = ARTIFACTS / artifact_name if artifact_path.exists(): artifact_path.unlink() vx = init_verifiedx() install_langgraph(verifiedx=vx) graph = build_graph() scenario_results = [ run_scenario(graph, name="safe_memory_write"), run_scenario(graph, name="blocked_graph_update"), ] output = { "state": STATE, "scenario_results": scenario_results, "diagnostics": summarize_debug_records(latest_debug_records()), } output_path = ARTIFACTS / "scenario_report.json" output_path.write_text(json.dumps(output, indent=2), encoding="utf-8") print(json.dumps(output, indent=2)) if __name__ == "__main__": main()