In this tutorial, we build an advanced multi-agent communication system using a structured message bus architecture powered by LangGraph and Pydantic. We define a strict ACP-style message schema that lets agents communicate through a shared state rather than calling each other directly, enabling modularity, traceability, and production-grade orchestration. We implement three specialized agents, a Planner, an Executor, and a Validator, that coordinate through structured messages, persistent state, and routing logic. We also integrate SQLite-based persistence to provide durable memory across executions, and we visualize the agent communication flow to understand how messages propagate through the system.
!pip -q install -U "pydantic==2.12.3"
!pip -q install -U langgraph langchain-core networkx matplotlib
!pip -q install -U langgraph-checkpoint-sqlite

import os
import json
import uuid
import sqlite3
from datetime import datetime, timezone
from typing import Any, Dict, List, Literal, Optional, Tuple

from pydantic import BaseModel, Field
import networkx as nx
import matplotlib.pyplot as plt
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.sqlite import SqliteSaver

# Closed vocabularies for who may send/receive and what kind of message it is.
Role = Literal["planner", "executor", "validator", "user", "system"]
MsgType = Literal["task", "plan", "result", "validation", "error", "control"]

class ACPMessage(BaseModel):
    msg_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    # UTC timestamp in ISO 8601 form with a trailing "Z".
    ts: str = Field(default_factory=lambda: datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"))
    sender: Role
    receiver: Role
    msg_type: MsgType
    content: str
    meta: Dict[str, Any] = Field(default_factory=dict)
    trace: Dict[str, Any] = Field(default_factory=dict)

def acp_log_path() -> str:
    os.makedirs("acp_logs", exist_ok=True)
    return os.path.join("acp_logs", "acp_messages.jsonl")

def append_acp_log(m: ACPMessage) -> None:
    # One JSON document per line (JSONL), appended in arrival order.
    with open(acp_log_path(), "a", encoding="utf-8") as f:
        f.write(m.model_dump_json() + "\n")

We install and import the libraries required to build a structured multi-agent communication system. We define the ACP-style message schema with Pydantic, which enforces a strict, structured format for agent communication. We also implement structured logging that persists every message exchanged between agents, giving the system traceability and observability.
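Because the log is plain JSONL, it can be queried without loading the agents at all. The following is a minimal, standard-library sketch of that idea; `iter_acp_log` and `filter_acp_log` are hypothetical helper names that are not part of the tutorial's code, and the demo rows only mimic the shape of the records the schema above would produce.

```python
import json
import os
import tempfile
from typing import Any, Dict, Iterator, List, Optional


def iter_acp_log(path: str) -> Iterator[Dict[str, Any]]:
    """Yield one dict per non-empty JSONL line; a missing file yields nothing."""
    if not os.path.exists(path):
        return
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line:
                yield json.loads(line)


def filter_acp_log(
    path: str, sender: Optional[str] = None, msg_type: Optional[str] = None
) -> List[Dict[str, Any]]:
    """Return logged messages matching the given sender and/or msg_type."""
    return [
        row
        for row in iter_acp_log(path)
        if (sender is None or row.get("sender") == sender)
        and (msg_type is None or row.get("msg_type") == msg_type)
    ]


# Demo on a throwaway file (illustrative data, not real run output).
demo_path = os.path.join(tempfile.mkdtemp(), "acp_messages.jsonl")
demo_rows = [
    {"sender": "planner", "receiver": "executor", "msg_type": "plan", "content": "1. Interpret the goal"},
    {"sender": "executor", "receiver": "validator", "msg_type": "result", "content": "{}"},
    {"sender": "validator", "receiver": "user", "msg_type": "validation", "content": "ok"},
]
with open(demo_path, "w", encoding="utf-8") as f:
    for row in demo_rows:
        f.write(json.dumps(row) + "\n")

plans = filter_acp_log(demo_path, msg_type="plan")
from_executor = filter_acp_log(demo_path, sender="executor")
print(len(plans), len(from_executor))
```

Pointing `filter_acp_log` at the path returned by `acp_log_path()` should work the same way, assuming every line in that file is a complete JSON object.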
class BusState(BaseModel):
    goal: str = ""
    done: bool = False
    errors: List[str] = Field(default_factory=list)
    mailbox: List[ACPMessage] = Field(default_factory=list)
    edges: List[Tuple[str, str, str]] = Field(default_factory=list)
    active_role: Role = "user"
    step: int = 0

def bus_update(
    state: BusState,
    sender: Role,
    receiver: Role,
    msg_type: MsgType,
    content: str,
    meta: Optional[Dict[str, Any]] = None,
    trace: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    m = ACPMessage(
        sender=sender,
        receiver=receiver,
        msg_type=msg_type,
        content=content,
        meta=meta or {},
        trace=trace or {},
    )
    append_acp_log(m)
    # Build a new state dict; the receiver becomes the next active role.
    return {
        "goal": state.goal,
        "done": state.done,
        "errors": state.errors,
        "mailbox": state.mailbox + [m],
        "edges": state.edges + [(sender, receiver, msg_type)],
        "active_role": receiver,
        "step": state.step + 1,
    }

We define the shared state structure that acts as the centralized message bus for all agents. The BusState class stores the goal, the mailbox, routing information, and execution progress. The bus_update function creates a structured message, appends it to the log, and returns the updated shared state with the receiver set as the next active role.
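One detail of `bus_update` worth isolating is that it builds new lists (`state.mailbox + [m]`) instead of appending in place, so the state passed in is never modified. The sketch below reproduces only that pattern with plain dicts; `post` is a hypothetical stand-in for `bus_update`, with no Pydantic validation and no logging.

```python
from typing import Any, Dict


def post(state: Dict[str, Any], sender: str, receiver: str, msg_type: str, content: str) -> Dict[str, Any]:
    """Return a NEW state dict with one more message; the input dict is not mutated."""
    message = {"sender": sender, "receiver": receiver, "msg_type": msg_type, "content": content}
    return {
        **state,
        "mailbox": state["mailbox"] + [message],  # new list, not .append()
        "edges": state["edges"] + [(sender, receiver, msg_type)],
        "active_role": receiver,  # the receiver acts next
        "step": state["step"] + 1,
    }


s0 = {"goal": "demo", "done": False, "errors": [], "mailbox": [], "edges": [], "active_role": "planner", "step": 0}
s1 = post(s0, "planner", "executor", "plan", "1. Do the thing")
s2 = post(s1, "executor", "validator", "result", "{}")

print(s0["step"], s1["step"], s2["step"])
print(s2["active_role"], s2["edges"])
```

Keeping each intermediate state intact makes it possible to compare the state before and after any single message, which fits the traceability goal described above.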
def planner_agent(state_dict: Dict[str, Any]) -> Dict[str, Any]:
    state = BusState.model_validate(state_dict)
    goal = state.goal.strip()
    if not goal:
        # Nothing to plan: report the problem straight to the validator.
        return bus_update(state, "planner", "validator", "error", "No goal provided.", meta={"reason": "empty_goal"})
    plan = [
        "Interpret the goal and extract requirements.",
        "Decide an execution strategy with clear outputs.",
        "Ask Executor to produce the result.",
        "Ask Validator to check correctness + completeness.",
    ]
    plan_text = "\n".join([f"{i+1}. {p}" for i, p in enumerate(plan)])
    return bus_update(
        state,
        "planner",
        "executor",
        "plan",
        plan_text,
        meta={"goal": goal, "plan_steps": len(plan)},
        trace={"policy": "deterministic_planner_v1"},
    )

def executor_agent(state_dict: Dict[str, Any]) -> Dict[str, Any]:
    state = BusState.model_validate(state_dict)
    goal = state.goal.strip()
    # Find the most recent plan addressed to the executor.
    latest_plan = None
    for m in reversed(state.mailbox):
        if m.receiver == "executor" and m.msg_type == "plan":
            latest_plan = m.content
            break
    result = {
        "goal": goal,
        "assumptions": [
            "We can produce a concise, actionable output.",
            "We can validate via rule-based checks.",
        ],
        "output": f"Executed task for goal: {goal}",
        "deliverables": [
            "A clear summary",
            "A step-by-step action list",
            "Any constraints and edge cases",
        ],
        "plan_seen": bool(latest_plan),
    }
    result_text = json.dumps(result, indent=2)
    return bus_update(
        state,
        "executor",
        "validator",
        "result",
        result_text,
        meta={"artifact_type": "json", "bytes": len(result_text.encode("utf-8"))},
        trace={"policy": "deterministic_executor_v1"},
    )

We implement the Planner and Executor agents, which handle task planning and execution. The Planner interprets the goal and generates a structured execution plan, which it passes along the message bus. The Executor looks up the latest plan addressed to it, records whether it found one, and produces a structured JSON result artifact that downstream agents can validate.
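Both the Executor and the Validator locate their input by scanning the mailbox from newest to oldest. That lookup can be factored into one helper, sketched here over plain dicts; `latest_for` is a hypothetical name, and with the tutorial's `ACPMessage` objects the key lookups would become attribute access.

```python
from typing import Any, Dict, Iterable, List, Optional


def latest_for(
    mailbox: List[Dict[str, Any]], receiver: str, msg_types: Iterable[str]
) -> Optional[Dict[str, Any]]:
    """Return the newest message addressed to `receiver` whose type is in `msg_types`, or None."""
    wanted = set(msg_types)
    for message in reversed(mailbox):  # newest first
        if message["receiver"] == receiver and message["msg_type"] in wanted:
            return message
    return None


# Illustrative mailbox with two plans, so the "newest wins" rule is visible.
mailbox = [
    {"sender": "planner", "receiver": "executor", "msg_type": "plan", "content": "old plan"},
    {"sender": "executor", "receiver": "validator", "msg_type": "result", "content": "{}"},
    {"sender": "planner", "receiver": "executor", "msg_type": "plan", "content": "new plan"},
]

plan = latest_for(mailbox, "executor", ["plan"])
inbox = latest_for(mailbox, "validator", ["result", "error"])
nothing = latest_for(mailbox, "user", ["validation"])
print(plan["content"], inbox["msg_type"], nothing)
```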
def validator_agent(state_dict: Dict[str, Any]) -> Dict[str, Any]:
    state = BusState.model_validate(state_dict)
    goal = state.goal.strip()
    # Find the most recent result or error addressed to the validator.
    latest_result = None
    for m in reversed(state.mailbox):
        if m.receiver == "validator" and m.msg_type in ("result", "error"):
            latest_result = m
            break
    if latest_result is None:
        upd = bus_update(state, "validator", "planner", "error", "No result to validate.", meta={"reason": "missing_result"})
        upd["done"] = True
        upd["errors"] = state.errors + ["missing_result"]
        return upd
    if latest_result.msg_type == "error":
        upd = bus_update(
            state,
            "validator",
            "planner",
            "validation",
            f"Validation failed because upstream error occurred: {latest_result.content}",
            meta={"status": "fail"},
        )
        upd["done"] = True
        upd["errors"] = state.errors + [latest_result.content]
        return upd
    try:
        parsed = json.loads(latest_result.content)
    except Exception as e:
        upd = bus_update(
            state,
            "validator",
            "planner",
            "validation",
            f"Result is not valid JSON: {e}",
            meta={"status": "fail"},
        )
        upd["done"] = True
        upd["errors"] = state.errors + [f"invalid_json: {e}"]
        return upd
    # Rule-based checks on the parsed result.
    issues = []
    if parsed.get("goal") != goal:
        issues.append("Result.goal does not match input goal.")
    if "deliverables" not in parsed or not isinstance(parsed["deliverables"], list) or len(parsed["deliverables"]) == 0:
        issues.append("Missing or empty deliverables list.")
    if issues:
        upd = bus_update(
            state,
            "validator",
            "planner",
            "validation",
            "Validation failed:\n- " + "\n- ".join(issues),
            meta={"status": "fail", "issues": issues},
        )
        upd["done"] = True
        upd["errors"] = state.errors + issues
        return upd
    upd = bus_update(
        state,
        "validator",
        "user",
        "validation",
        "Validation passed ✅ Result looks consistent and complete.",
        meta={"status": "pass"},
    )
    upd["done"] = True
    upd["errors"] = state.errors
    return upd

def route_next(state_dict: Dict[str, Any]) -> str:
    # Stop once any agent marks the run as done.
    if state_dict.get("done", False):
        return END
    role = state_dict.get("active_role", "user")
    if role == "planner":
        return "planner"
    if role == "executor":
        return "executor"
    if role == "validator":
        return "validator"
    return END

We implement the Validator agent and the routing logic that controls the agent execution flow. The Validator inspects the execution result, checks that it is valid JSON, that its goal matches the input goal, and that it contains a non-empty deliverables list, and then emits a validation message. Every Validator path sets done to True, so the run ends after validation. The routing function reads done and active_role from the shared state to decide which agent executes next.
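To see how `done` and `active_role` drive control flow without involving LangGraph, the routing can be exercised in a small hand-written loop. This is only a sketch under stated assumptions: `END` here is a local stand-in string rather than the LangGraph import, the three `toy_*` agents are hypothetical and do nothing except hand control to the next role, and `run` is a simplified driver, not how LangGraph schedules nodes.

```python
from typing import Any, Callable, Dict, List, Tuple

END = "__end__"  # local stand-in for the END sentinel (assumption for this sketch)
AGENT_ROLES = {"planner", "executor", "validator"}


def route_next(state: Dict[str, Any]) -> str:
    """Same decision as the tutorial's router, written as a set lookup."""
    if state.get("done", False):
        return END
    role = state.get("active_role", "user")
    return role if role in AGENT_ROLES else END


def toy_planner(state: Dict[str, Any]) -> Dict[str, Any]:
    return {**state, "active_role": "executor", "step": state["step"] + 1}


def toy_executor(state: Dict[str, Any]) -> Dict[str, Any]:
    return {**state, "active_role": "validator", "step": state["step"] + 1}


def toy_validator(state: Dict[str, Any]) -> Dict[str, Any]:
    return {**state, "active_role": "user", "done": True, "step": state["step"] + 1}


AGENTS: Dict[str, Callable[[Dict[str, Any]], Dict[str, Any]]] = {
    "planner": toy_planner,
    "executor": toy_executor,
    "validator": toy_validator,
}


def run(state: Dict[str, Any], entry: str = "planner", max_steps: int = 10) -> Tuple[Dict[str, Any], List[str]]:
    """Call the entry agent, then keep following route_next until END."""
    visited: List[str] = []
    node = entry
    for _ in range(max_steps):  # guard against routing loops
        if node == END:
            break
        visited.append(node)
        state = AGENTS[node](state)
        node = route_next(state)
    return state, visited


final, visited = run({"active_role": "planner", "done": False, "step": 0})
print(visited, final["step"], final["done"])
```

The `max_steps` guard is an addition for the sketch: a router that can send control back to an earlier agent needs some bound if no agent ever sets `done`.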
graph = StateGraph(dict)
graph.add_node("planner", planner_agent)
graph.add_node("executor", executor_agent)
graph.add_node("validator", validator_agent)
graph.set_entry_point("planner")
# Every node uses the same router, so any agent can hand off to any other.
graph.add_conditional_edges("planner", route_next, {"planner": "planner", "executor": "executor", "validator": "validator", END: END})
graph.add_conditional_edges("executor", route_next, {"planner": "planner", "executor": "executor", "validator": "validator", END: END})
graph.add_conditional_edges("validator", route_next, {"planner": "planner", "executor": "executor", "validator": "validator", END: END})

os.makedirs("checkpoints", exist_ok=True)
db_path = "checkpoints/langgraph_bus.sqlite"
conn = sqlite3.connect(db_path, check_same_thread=False)
checkpointer = SqliteSaver(conn)
app = graph.compile(checkpointer=checkpointer)

def run_thread(goal: str, thread_id: str) -> BusState:
    init = BusState(goal=goal, active_role="planner", done=False).model_dump()
    final_state_dict = app.invoke(init, config={"configurable": {"thread_id": thread_id}})
    return BusState.model_validate(final_state_dict)

thread_id = "demo-thread-001"
goal = "Design an ACP-style message bus where planner/executor/validator coordinate through shared state."
final_state = run_thread(goal, thread_id)
print("Done:", final_state.done)
print("Steps:", final_state.step)
print("Errors:", final_state.errors)
print("\nLast 5 messages:")
for m in final_state.mailbox[-5:]:
    print(f"- [{m.msg_type}] {m.sender} -> {m.receiver}: {m.content[:80]}")

# Best-effort look inside the latest checkpoint stored for this thread.
snapshot = checkpointer.get_tuple({"configurable": {"thread_id": thread_id}})
cp = (snapshot.checkpoint if snapshot else {}) or {}
cv = cp.get("channel_values", {}) or {}
sv = cp.get("state", {}) or {}
vals = cv if isinstance(cv, dict) and len(cv) else sv if isinstance(sv, dict) else {}
print("\nCheckpoint keys:", list(cp.keys()))
if isinstance(cv, dict):
    print("channel_values keys:", list(cv.keys())[:30])
if isinstance(sv, dict):
    print("state keys:", list(sv.keys())[:30])
print("\nPersisted step (best-effort):", vals.get("step", "NOT_FOUND"))
print("Persisted active_role (best-effort):", vals.get("active_role", "NOT_FOUND"))
print("\nACP logs:", acp_log_path())
print("Checkpoint DB:", db_path)
# Static view: the intended hand-off order.
G = nx.DiGraph()
G.add_edge("planner", "executor")
G.add_edge("executor", "validator")
G.add_edge("validator", "user")
plt.figure(figsize=(6, 4))
pos = nx.spring_layout(G, seed=7)
nx.draw(G, pos, with_labels=True, node_size=1800, font_size=10, arrows=True)
plt.title("Orchestration Graph: Planner → Executor → Validator")
plt.show()

# Runtime view: one edge per message actually sent on the bus.
comm = nx.MultiDiGraph()
for (s, r, t) in final_state.edges:
    comm.add_edge(s, r, label=t)
plt.figure(figsize=(8, 5))
pos2 = nx.spring_layout(comm, seed=11)
nx.draw(comm, pos2, with_labels=True, node_size=1800, font_size=10, arrows=True)
plt.title("Communication Graph from Structured Message Bus (Runtime Edges)")
plt.show()

def tail_jsonl(path: str, n: int = 8) -> List[Dict[str, Any]]:
    if not os.path.exists(path):
        return []
    with open(path, "r", encoding="utf-8") as f:
        lines = f.readlines()[-n:]
    return [json.loads(x) for x in lines]

print("\nLast ACP log entries:")
for row in tail_jsonl(acp_log_path(), 6):
    print(f"{row['msg_type']:>10} | {row['sender']} -> {row['receiver']} | {row['ts']}")

We construct the LangGraph state graph, enable SQLite-based persistence, and execute the multi-agent workflow. We use a thread identifier so that agent state can be saved and recovered reliably across executions. We also visualize the orchestration and communication graphs and inspect the persisted logs, which shows how agents interact through the structured message bus.
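The communication graph stores each message type as a `label` edge attribute, but the plain `nx.draw` call does not render it. One way to prepare readable labels is to collapse the runtime `(sender, receiver, msg_type)` triples into a single string per sender/receiver pair. The sketch below does only that aggregation with the standard library; `edge_labels` is a hypothetical helper, and `runtime_edges` is illustrative data in the shape of `final_state.edges`, not output from a real run.

```python
from collections import Counter
from typing import Dict, List, Tuple


def edge_labels(edges: List[Tuple[str, str, str]]) -> Dict[Tuple[str, str], str]:
    """Collapse (sender, receiver, msg_type) triples into one label per sender/receiver pair."""
    counts = Counter(edges)
    grouped: Dict[Tuple[str, str], List[str]] = {}
    for (sender, receiver, msg_type), n in sorted(counts.items()):
        grouped.setdefault((sender, receiver), []).append(f"{msg_type} x{n}")
    return {pair: ", ".join(parts) for pair, parts in grouped.items()}


runtime_edges = [
    ("planner", "executor", "plan"),
    ("executor", "validator", "result"),
    ("validator", "user", "validation"),
    ("planner", "executor", "plan"),  # hypothetical repeat, to show counting
]
labels = edge_labels(runtime_edges)
for pair, text in labels.items():
    print(pair, "->", text)
```

The resulting dict is keyed by `(sender, receiver)`, which is the shape networkx's `draw_networkx_edge_labels` accepts through its `edge_labels` argument for a simple directed graph. Whether it renders cleanly on the `MultiDiGraph` used above is not something the tutorial shows, so building a plain `DiGraph` from the same pairs may be the safer route.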
In this tutorial, we designed and implemented a structured multi-agent communication framework using LangGraph's shared-state architecture and ACP-style message-bus principles. We enabled agents to operate independently while communicating through structured, persistent messages, which improves reliability, observability, and scalability. We logged every interaction, persisted agent state across executions, and visualized communication patterns to gain insight into agent coordination. This architecture gives us a foundation for robust, modular, production-ready multi-agent systems that can be extended with additional agents, LLM reasoning, memory systems, and more complex routing strategies.