def _scenario(name, tool, **kwargs):
    """Bundle a scenario label, the tool callable to invoke, and its call arguments."""
    return {"name": name, "tool": tool, "kwargs": kwargs}


# Each entry pairs a descriptive name with a tool callable and the keyword
# arguments that will be unpacked into it when the scenario is executed.
scenarios = [
    # --- database calls via research_db ---
    _scenario(
        "Safe database read",
        research_db,
        table="customers",
        operation="select",
        type="select",
        sensitivity="medium",
    ),
    _scenario(
        "Blocked destructive database action",
        research_db,
        table="customers",
        operation="drop",
        type="drop_table",
        sensitivity="critical",
    ),
    # --- outbound email via research_email ---
    _scenario(
        "External correspondence requiring approval",
        research_email,
        to="[email protected]",
        recipient_domain="example.com",
        subject="Quarterly highlights",
        body="Sharing a non-sensitive quarterly summary.",
        type="send_email",
        sensitivity="medium",
    ),
    _scenario(
        "External message denied as approval was declined",
        research_email,
        to="[email protected]",
        recipient_domain="example.com",
        subject="Confidential strategy",
        body="This contains private strategic details.",
        type="send_email",
        sensitivity="critical",
    ),
    # --- shell commands via ops_shell ---
    _scenario(
        "Secure sandbox shell task",
        ops_shell,
        command="echo Agent governance is active",
        type="shell_exec",
        sensitivity="low",
    ),
    _scenario(
        "Risky shell command prevented",
        ops_shell,
        command="rm -rf /content/something",
        type="shell_exec",
        sensitivity="critical",
    ),
    # --- database call via shadow_db ---
    _scenario(
        "Untrusted agent barred from sensitive information",
        shadow_db,
        table="executive_compensation",
        operation="select",
        type="select",
        sensitivity="critical",
    ),
    # --- money movement via finance_transfer ---
    _scenario(
        "Monetary transfer needing clearance",
        finance_transfer,
        amount=2500,
        destination="vendor-123",
        type="transfer_money",
        sensitivity="high",
    ),
    _scenario(
        "Substantial monetary transfer dismissed",
        finance_transfer,
        amount=15000,
        destination="vendor-999",
        type="transfer_money",
        sensitivity="critical",
    ),
]
def _run_scenario(entry):
    """Invoke one scenario's tool with its kwargs and return an outcome record.

    A normal return is recorded as "completed" together with the tool's
    output; any Exception raised by the call is recorded as
    "blocked_or_in_review" together with the exception text.
    """
    try:
        output = entry["tool"](**entry["kwargs"])
    except Exception as exc:
        # The exception is captured in the record instead of propagating, so
        # the remaining scenarios still run.
        return {
            "scenario": entry["name"],
            "status": "blocked_or_in_review",
            "error": str(exc),
        }
    return {
        "scenario": entry["name"],
        "status": "completed",
        "output": output,
    }


# One outcome record per scenario, in scenario order.
results = []
for entry in scenarios:
    results.append(_run_scenario(entry))
# Pull the audit log into a DataFrame and show a fixed subset of its columns.
audit_dataframe = audit_log.to_dataframe()

columns_to_show = [
    "timestamp", "agent_name", "tool_name", "decision",
    "matched_rule", "severity", "reason", "record_hash",
]
audit_view = audit_dataframe[columns_to_show]
display(audit_view)
# Field names of a test case, in the order they appear in each row below.
_CASE_FIELDS = ("name", "identity", "tool_name", "action", "expected")

# One row per policy check: the identity and tool being evaluated, the action
# payload handed to the engine, and the decision string the check expects.
_CASE_ROWS = [
    (
        "drop_table must be prevented",
        research_agent,
        "query_database",
        {"type": "drop_table", "sensitivity": "critical", "autonomous": True},
        "deny",
    ),
    (
        "Basic select permitted",
        research_agent,
        "query_database",
        {"type": "select", "sensitivity": "low", "autonomous": True},
        "allow",
    ),
    (
        "External email should require approval",
        research_agent,
        "send_email",
        {
            "type": "send_email",
            "recipient_domain": "example.com",
            "sensitivity": "medium",
            "autonomous": True,
        },
        "require_approval",
    ),
    (
        "Low trust limited data access",
        unknown_agent,
        "query_database",
        {"type": "select", "sensitivity": "critical", "autonomous": True},
        "deny",
    ),
    (
        "Terminal command needs isolation",
        ops_agent,
        "shell_exec",
        {
            "type": "shell_exec",
            "command": "echo hello",
            "sensitivity": "low",
            "autonomous": True,
        },
        "sandbox",
    ),
]

test_cases = [dict(zip(_CASE_FIELDS, row)) for row in _CASE_ROWS]
def _check_case(spec):
    """Evaluate one test case with the engine and report expected vs. actual decision."""
    verdict = engine.evaluate(
        identity=spec["identity"],
        tool_name=spec["tool_name"],
        action=spec["action"],
    )
    return {
        "test": spec["name"],
        "expected": spec["expected"],
        "actual": verdict.decision,
        # A case passes when the engine's decision equals the expected string.
        "passed": verdict.decision == spec["expected"],
        "matched_rule": verdict.matched_rule,
    }


# Tabulate the outcome of every policy test case and show the table.
test_outcomes = [_check_case(spec) for spec in test_cases]
results_table = pd.DataFrame(test_outcomes)
display(results_table)
# Kill-switch demonstration: turn the switch on, attempt an otherwise
# low-sensitivity read, then turn the switch off again.
# NOTE(review): presumably the tool call raises while the kill switch is
# active -- confirm against the engine/tool wrapper defined earlier in the file.
engine.activate_kill_switch()
try:
    kill_switch_output = research_db(
        table="customers",
        operation="select",
        type="select",
        sensitivity="low",
    )
except Exception as error:
    # Report the outcome instead of discarding it, so the demonstration shows
    # what happened to the call.
    print(f"Call rejected while kill switch was active: {error}")
else:
    # The call going through with the switch on is worth surfacing.
    print(
        "WARNING: call completed while kill switch was active: "
        f"{kill_switch_output!r}"
    )
finally:
    # Always release the kill switch, even if the attempt is interrupted by
    # something that is not an Exception subclass (e.g. KeyboardInterrupt),
    # so later code does not run with the switch still engaged.
    engine.deactivate_kill_switch()
# Re-read the audit log so the summaries include everything logged so far.
audit_dataframe = audit_log.to_dataframe()

# Row counts per (decision, severity) pair, largest first. dropna=False keeps
# groups whose key value is missing.
decision_severity_sizes = audit_dataframe.groupby(
    ["decision", "severity"], dropna=False
).size()
summary_stats = decision_severity_sizes.reset_index(name="total").sort_values(
    "total", ascending=False
)
display(summary_stats)

# Row counts per (agent, decision) pair, ordered by agent name and then by
# descending count within each agent.
agent_decision_sizes = audit_dataframe.groupby(["agent_name", "decision"]).size()
agent_analysis = agent_decision_sizes.reset_index(name="total").sort_values(
    ["agent_name", "total"], ascending=[True, False]
)
display(agent_analysis)
def _show_bar_chart(counts, title, x_label, y_label):
    """Plot a Series of counts as a bar chart on a new 8x5 figure and show it."""
    plt.figure(figsize=(8, 5))
    counts.plot(kind="bar")
    plt.title(title)
    plt.xlabel(x_label)
    plt.ylabel(y_label)
    plt.xticks(rotation=30)
    plt.tight_layout()
    plt.show()


# Number of audit rows per decision value.
decision_breakdown = audit_dataframe["decision"].value_counts()
_show_bar_chart(
    decision_breakdown,
    "Governance Verdicts throughout All Agent Tasks",
    "Decision Type",
    "Frequency",
)

# Number of audit rows per severity value; missing severities are counted
# under the label "none".
severity_split = audit_dataframe["severity"].fillna("none").value_counts()
_show_bar_chart(
    severity_split,
    "Governance Occurrences by Urgency",
    "Severity Level",
    "Occurrence",
)
# Build a directed graph linking agent -> tool -> decision -> rule for every
# audit row, then draw it.
G = nx.DiGraph()
for _, row in audit_dataframe.iterrows():
    agent_node = f"Agent: {row['agent_name']}"
    tool_node = f"Tool: {row['tool_name']}"
    decision_node = f"Decision: {row['decision']}"
    if pd.notna(row["matched_rule"]):
        rule_node = f"Rule: {row['matched_rule']}"
    else:
        # Rows without a rule all share this single placeholder node.
        rule_node = "No matched rule"

    typed_nodes = (
        (agent_node, "agent"),
        (tool_node, "tool"),
        (decision_node, "decision"),
        (rule_node, "rule"),
    )
    for node, kind in typed_nodes:
        G.add_node(node, node_type=kind)

    labelled_edges = (
        (agent_node, tool_node, "calls"),
        (tool_node, decision_node, "produces"),
        (decision_node, rule_node, "matched"),
    )
    for source, target, relation in labelled_edges:
        G.add_edge(source, target, relation=relation)

plt.figure(figsize=(14, 9))
# A fixed seed is passed to the layout call.
coord_map = nx.spring_layout(G, seed=42, k=0.8)
nx.draw_networkx_nodes(G, coord_map, node_size=1800)
nx.draw_networkx_edges(G, coord_map, arrows=True, arrowstyle="->", arrowsize=15)
nx.draw_networkx_labels(G, coord_map, font_size=8)
plt.title("Agent Governance Mapping: Participants, Tools, Verdicts, and Policy Rules")
plt.axis("off")
plt.tight_layout()
plt.show()
# Directory that receives every exported artifact; created if it is missing.
SAVE_PATH = "/content/agt_tutorial_outputs"
os.makedirs(SAVE_PATH, exist_ok=True)


def _output_file(filename):
    """Return the path of *filename* inside SAVE_PATH."""
    return os.path.join(SAVE_PATH, filename)


audit_json_file = _output_file("tamper_evident_audit_log.json")
audit_csv_file = _output_file("governance_audit_log.csv")
policy_backup_file = _output_file("advanced_agent_policy.yaml")
test_file = _output_file("policy_test_results.csv")

# Audit records as JSON: each record is converted with asdict, and values
# json cannot encode natively are written via str().
with open(audit_json_file, "w") as json_out:
    serialized_records = [asdict(record) for record in audit_log.records]
    json.dump(serialized_records, json_out, indent=2, default=str)

# Tabular exports of the audit log and the policy test results, without the
# DataFrame index column.
audit_dataframe.to_csv(audit_csv_file, index=False)
results_table.to_csv(test_file, index=False)
shutil.copy(POLICY_PATH, policy_backup_file)
Subscribe to Updates
Get the latest tech insights from TechnologiesDigest.com on AI, innovation, and the future of digital technology.
Trending
- Debunking the Myths: The Truth Behind Federal Retirement Benefits
- Dell XPS 13 (2026) vs MacBook Neo: A Budget Showdown — My Honest Pick Is Here
- “Trump’s Immigration Crackdown: Hidden Catalyst for Stablecoin Boom, Bitcoin ATMs, and the New Digital Dollar Surge”
- Rerankers Aren’t Magic Either: When the Cross-Encoder Layer Is Worth the Cost
- StepFun Unveils Step 3.7 Flash: The 198B MoE VL Model Built for Coding Agents and Search Automation
- The Golden Escape: How to Lock In Tax-Free Gains by Turning TSP Profits Into Hard Assets
- 9 Google Search Alternatives That Actually Show Real Links Again
- Secure and Govern AI Agents with Microsoft Agent Governance Toolkit: Policies, Approvals, Audit Logs, and Risk Controls



