This guide walks you through building a realistic Zero-Trust network simulation. We’ll model a micro-segmented environment using a directed graph and ensure every request undergoes continuous verification. We create a dynamic policy engine that combines ABAC-style permissions with device posture checks, MFA, path reachability, zone sensitivity, and real-time risk signals such as anomaly detection and volume-based indicators. Finally, we deploy this model via a Flask API and simulate mixed traffic, including insider lateral movement and data exfiltration attempts, to demonstrate how trust scoring, adaptive controls, and automated quarantines block malicious flows in real time.
!pip -q install networkx flask matplotlib

import math
import json
import time
import random
import hashlib
from dataclasses import dataclass, field
from typing import Dict, Any, List, Tuple, Optional

import networkx as nx
from flask import Flask, request, jsonify
import matplotlib.pyplot as plt


def _sigmoid(x: float) -> float:
    return 1.0 / (1.0 + math.exp(-x))


def _clamp(x: float, lo: float = 0.0, hi: float = 1.0) -> float:
    return max(lo, min(hi, x))


def _now_ts() -> float:
    return time.time()


def _stable_hash(s: str) -> int:
    # First 10 hex digits of SHA-256, so the value is identical across interpreter runs
    h = hashlib.sha256(s.encode("utf-8")).hexdigest()
    return int(h[:10], 16)


def _rand_choice_weighted(items: List[Any], weights: List[float]) -> Any:
    return random.choices(items, weights=weights, k=1)[0]


def _pretty(obj: Any) -> str:
    return json.dumps(obj, indent=2, sort_keys=False)

We begin by installing the necessary libraries and importing all dependencies required for graph modeling, risk scoring, and API handling. We define utility functions for trust normalization, hashing, timestamping, and weighted sampling to support deterministic simulations. We also prepare helper functions that simplify logging and structured output formatting throughout the tutorial.
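To see why these helpers matter for reproducibility, here is a minimal sketch. It restates the sigmoid and SHA-256 hash helpers so the block runs on its own; the `draw` function and the `"step-12"` salt are hypothetical names used only for illustration.

```python
import hashlib
import math
import random


def sigmoid(x: float) -> float:
    # Squashes any real-valued evidence total into the open interval (0, 1)
    return 1.0 / (1.0 + math.exp(-x))


def stable_hash(s: str) -> int:
    # Same construction as _stable_hash: first 10 hex digits of SHA-256
    return int(hashlib.sha256(s.encode("utf-8")).hexdigest()[:10], 16)


def draw(salt: str) -> float:
    # Hypothetical helper: seed a private RNG from the salt and draw once
    return random.Random(stable_hash(salt)).random()


print(sigmoid(-3.0), sigmoid(0.0), sigmoid(3.0))
print(draw("step-12") == draw("step-12"))
```

Negative evidence totals land below 0.5 and positive ones above it, which is what lets a weighted sum be read as a trust score. Seeding from a SHA-256 digest rather than the built-in `hash()` keeps per-step randomness identical from one interpreter run to the next.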
ZONES = ["public", "dmz", "app", "data", "admin"]
SENSITIVITY = {"public": 0.15, "dmz": 0.35, "app": 0.6, "data": 0.85, "admin": 0.95}
ASSETS = {
    "public": ["cdn", "landing", "status"],
    "dmz": ["api_gateway", "waf", "vpn"],
    "app": ["orders_svc", "billing_svc", "ml_inference", "inventory_svc"],
    "data": ["customer_db", "ledger_db", "feature_store"],
    "admin": ["iam", "siem", "backup_vault"]
}
ACTIONS = ["read", "write", "deploy", "admin", "exfiltrate"]
ROLES = ["customer", "employee", "analyst", "engineer", "admin", "secops"]
DEVICE_TYPES = ["managed_laptop", "managed_server", "byod_phone", "unknown_iot"]
NETWORK_CONTEXT = ["corp_lan", "corp_vpn", "public_wifi", "tor_exit"]


@dataclass
class RequestContext:
    user: str
    role: str
    device_id: str
    device_type: str
    device_posture: float
    mfa: bool
    source: str
    src_node: str
    dst_node: str
    action: str
    time_bucket: str
    geo_risk: float
    behavior_anomaly: float
    data_volume: float
    reason: str = ""


@dataclass
class Decision:
    allowed: bool
    trust_score: float
    rule_hits: List[str] = field(default_factory=list)
    controls: Dict[str, Any] = field(default_factory=dict)
    explanation: str = ""
    ts: float = field(default_factory=_now_ts)


@dataclass
class PrincipalState:
    user: str
    role: str
    base_risk: float
    last_seen_ts: float
    rolling_denies: int = 0
    rolling_allows: int = 0
    quarantined: bool = False
    compromise_score: float = 0.0


@dataclass
class DeviceState:
    device_id: str
    device_type: str
    owner: str
    posture: float
    attested: bool
    quarantined: bool = False


@dataclass
class FlowRecord:
    ts: float
    ctx: Dict[str, Any]
    decision: Dict[str, Any]

We define the core domain schema, including zones, assets, roles, device types, and contextual signals that shape our Zero-Trust environment. We formalize request, decision, principal, device, and flow record structures using dataclasses to maintain clarity and state integrity. This foundational data model enables continuous trust evaluation across identities, devices, and network paths.
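As a small illustration of why the schema uses `field(default_factory=...)`, the sketch below defines a trimmed-down stand-in for `Decision`. The class name `MiniDecision` is hypothetical and carries only a few of the real fields.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, List


@dataclass
class MiniDecision:  # hypothetical, trimmed-down stand-in for Decision
    allowed: bool
    trust_score: float
    rule_hits: List[str] = field(default_factory=list)
    controls: Dict[str, Any] = field(default_factory=dict)


first = MiniDecision(allowed=False, trust_score=0.31)
second = MiniDecision(allowed=True, trust_score=0.88)

# default_factory builds a fresh list per instance, so this append
# does not leak into `second`
first.rule_hits.append("abac_denied")

# asdict() converts the dataclass into plain dicts and lists, ready for JSON
print(json.dumps(asdict(first), indent=2))
print(second.rule_hits)
```

`asdict` returns a deep copy, so a log entry built from it is not affected if the original object changes later. That is one reason to consider it over `__dict__` when storing flow records.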
def build_microsegmented_graph(seed: int = 7) -> nx.DiGraph:
    random.seed(seed)
    G = nx.DiGraph()
    # One node per zone, then one node per asset linked to its zone
    for z in ZONES:
        G.add_node(f"zone:{z}", kind="zone", zone=z, sensitivity=SENSITIVITY[z])
    for z, assets in ASSETS.items():
        for a in assets:
            node = f"{z}:{a}"
            G.add_node(node, kind="asset", zone=z, sensitivity=SENSITIVITY[z] + random.uniform(-0.05, 0.05))
            G.add_edge(f"zone:{z}", node, kind="contains")
    allowed_paths = [
        ("public", "dmz"),
        ("dmz", "app"),
        ("app", "data"),
        ("admin", "app"),
        ("admin", "data"),
        ("admin", "dmz"),
        ("dmz", "admin")
    ]
    # Zone-level routes
    for src_z, dst_z in allowed_paths:
        G.add_edge(f"zone:{src_z}", f"zone:{dst_z}", kind="zone_route", base_allowed=True)
    # Randomly sampled service-to-service calls along the allowed zone routes
    for src_z, dst_z in allowed_paths:
        for src_a in ASSETS[src_z]:
            for dst_a in ASSETS[dst_z]:
                if random.random() < 0.45:
                    G.add_edge(f"{src_z}:{src_a}", f"{dst_z}:{dst_a}", kind="service_call", base_allowed=True)
    # Occasional self-loops on assets
    for z in ZONES:
        for a in ASSETS[z]:
            if random.random() < 0.35:
                G.add_edge(f"{z}:{a}", f"{z}:{a}", kind="self", base_allowed=True)
    return G


def draw_graph(G: nx.DiGraph, title: str = "Zero-Trust Microsegmented Network Graph") -> None:
    plt.figure(figsize=(14, 9))
    pos = nx.spring_layout(G, seed=42, k=0.35)
    kinds = nx.get_node_attributes(G, "kind")
    node_colors = []
    for n in G.nodes():
        if kinds.get(n) == "zone":
            node_colors.append(0.85)
        else:
            node_colors.append(G.nodes[n].get("sensitivity", 0.5))
    nx.draw_networkx_nodes(G, pos, node_size=350, node_color=node_colors)
    nx.draw_networkx_edges(G, pos, arrows=True, alpha=0.25)
    nx.draw_networkx_labels(G, pos, font_size=8)
    plt.title(title)
    plt.axis("off")
    plt.show()

We construct a micro-segmented directed network graph in which both zones and assets carry explicit sensitivity levels. We automatically create inter-zone and service-to-service data paths to mirror real enterprise traffic flows. We then render the full network topology so that segmentation boundaries and possible lateral movement paths are easy to spot.
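To make the idea of lateral movement paths concrete, the sketch below runs a breadth-first search over the zone-level routes listed in `allowed_paths`. It uses only the standard library rather than networkx, and the `reachable` helper is a hypothetical name, not part of the tutorial code. It also ignores the randomly sampled asset-level edges, so treat it as a zone-level approximation.

```python
from collections import deque
from typing import Dict, List, Set, Tuple

# Zone-to-zone routes copied from allowed_paths in build_microsegmented_graph
ZONE_ROUTES: List[Tuple[str, str]] = [
    ("public", "dmz"),
    ("dmz", "app"),
    ("app", "data"),
    ("admin", "app"),
    ("admin", "data"),
    ("admin", "dmz"),
    ("dmz", "admin"),
]


def reachable(routes: List[Tuple[str, str]], src: str) -> Set[str]:
    """Return every zone reachable from src over one or more directed hops."""
    adjacency: Dict[str, List[str]] = {}
    for a, b in routes:
        adjacency.setdefault(a, []).append(b)
    seen = {src}
    queue = deque([src])
    while queue:
        node = queue.popleft()
        for nxt in adjacency.get(node, []):
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    seen.discard(src)  # report only the other zones
    return seen


for zone in ["public", "dmz", "app", "data", "admin"]:
    print(zone, "->", sorted(reachable(ZONE_ROUTES, zone)))
```

In this zone-level view, `public` reaches `admin` and `data` through `dmz`, while `data` has no outbound routes at all. A multi-hop reachability test therefore accepts more source and destination pairs than the direct edges suggest, which is worth keeping in mind when interpreting path checks.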
class ZeroTrustPolicyEngine:
    def __init__(self, G: nx.DiGraph):
        self.G = G
        self.principals: Dict[str, PrincipalState] = {}
        self.devices: Dict[str, DeviceState] = {}
        self.flow_log: List[FlowRecord] = []
        self.blocked_edges: set = set()
        self.policy_version = "ztpe-v1.3"
        # ABAC table: role -> zone -> permitted actions
        self.role_perms = {
            "customer": {"public": {"read"}, "dmz": {"read"}},
            "employee": {"public": {"read"}, "dmz": {"read"}, "app": {"read", "write"}},
            "analyst": {"public": {"read"}, "dmz": {"read"}, "app": {"read"}, "data": {"read"}},
            "engineer": {"public": {"read"}, "dmz": {"read"}, "app": {"read", "write", "deploy"}, "data": {"read"}},
            "admin": {"public": {"read"}, "dmz": {"read", "write"}, "app": {"read", "write", "deploy", "admin"}, "data": {"read", "write", "admin"}, "admin": {"read", "write", "admin"}},
            "secops": {"public": {"read"}, "dmz": {"read", "write"}, "app": {"read", "admin"}, "data": {"read", "admin"}, "admin": {"read", "admin"}},
        }
        # Weights applied to each signal in the trust score
        self.w = {
            "role_fit": 1.4,
            "device_posture": 1.8,
            "mfa": 1.0,
            "network_context": 1.2,
            "time": 0.6,
            "geo_risk": 1.2,
            "behavior_anomaly": 2.2,
            "data_volume": 1.4,
            "principal_base_risk": 1.3,
            "principal_compromise": 2.0,
            "asset_sensitivity": 1.6,
            "path_validity": 1.5,
            "quarantine": 4.0,
        }
        self.thresholds = {
            "allow": 0.72,
            "step_up": 0.62,
            "rate_limit": 0.55,
            "deny": 0.0
        }
    def register_principal(self, user: str, role: str, base_risk: float) -> None:
        self.principals[user] = PrincipalState(
            user=user,
            role=role,
            base_risk=_clamp(base_risk),
            last_seen_ts=_now_ts()
        )

    def register_device(self, device_id: str, device_type: str, owner: str, posture: float, attested: bool) -> None:
        self.devices[device_id] = DeviceState(
            device_id=device_id,
            device_type=device_type,
            owner=owner,
            posture=_clamp(posture),
            attested=bool(attested)
        )

    def _asset_zone_and_sensitivity(self, node: str) -> Tuple[str, float]:
        if node.startswith("zone:"):
            z = node.split(":", 1)[1]
            return z, SENSITIVITY.get(z, 0.5)
        z = self.G.nodes[node].get("zone", "public")
        sens = float(self.G.nodes[node].get("sensitivity", SENSITIVITY.get(z, 0.5)))
        return z, _clamp(sens)

    def _base_abac_check(self, role: str, dst_zone: str, action: str) -> bool:
        return action in self.role_perms.get(role, {}).get(dst_zone, set())

    def _path_is_valid(self, src: str, dst: str) -> bool:
        if (src, dst) in self.blocked_edges:
            return False
        try:
            return nx.has_path(self.G, src, dst)
        except (nx.NetworkXError, nx.NodeNotFound):
            # NodeNotFound is raised for unknown nodes and is not a NetworkXError subclass
            return False

    def _network_context_risk(self, source: str) -> float:
        table = {"corp_lan": 0.1, "corp_vpn": 0.25, "public_wifi": 0.65, "tor_exit": 0.9}
        return table.get(source, 0.6)

    def _time_risk(self, time_bucket: str) -> float:
        return 0.15 if time_bucket == "business_hours" else 0.55
    def _compute_trust_score(self, ctx: RequestContext) -> Tuple[float, List[str], Dict[str, Any]]:
        rule_hits = []
        controls: Dict[str, Any] = {}
        principal = self.principals.get(ctx.user)
        device = self.devices.get(ctx.device_id)
        # Unknown identities and devices get pessimistic defaults
        if principal is None:
            rule_hits.append("unknown_principal")
            principal = PrincipalState(ctx.user, ctx.role, base_risk=0.85, last_seen_ts=_now_ts())
        if device is None:
            rule_hits.append("unknown_device")
            device = DeviceState(ctx.device_id, ctx.device_type, owner=ctx.user, posture=0.25, attested=False)
        src_zone, src_sens = self._asset_zone_and_sensitivity(ctx.src_node)
        dst_zone, dst_sens = self._asset_zone_and_sensitivity(ctx.dst_node)
        abac_ok = self._base_abac_check(ctx.role, dst_zone, ctx.action)
        if not abac_ok:
            rule_hits.append("abac_denied")
        path_ok = self._path_is_valid(ctx.src_node, ctx.dst_node)
        if not path_ok:
            rule_hits.append("invalid_path_or_blocked")
        if principal.quarantined or device.quarantined:
            rule_hits.append("quarantined")
            controls["auto_response"] = "deny_quarantine"
        if ctx.action == "exfiltrate":
            rule_hits.append("exfil_attempt")
        if dst_zone in ["admin", "data"] and not ctx.mfa:
            rule_hits.append("mfa_required_for_sensitive_zone")
            controls["step_up_mfa"] = True
        if device.owner != ctx.user:
            rule_hits.append("device_owner_mismatch")
        # Normalize every signal to [0, 1]
        net_r = self._network_context_risk(ctx.source)
        t_r = self._time_risk(ctx.time_bucket)
        role_fit = 1.0 if abac_ok else 0.0
        posture = _clamp(device.posture if device.attested else device.posture * 0.75)
        mfa = 1.0 if ctx.mfa else 0.0
        path_valid = 1.0 if path_ok else 0.0
        sens = _clamp(dst_sens)
        principal_risk = _clamp(principal.base_risk)
        compromise = _clamp(principal.compromise_score)
        anomaly = _clamp(ctx.behavior_anomaly)
        geo = _clamp(ctx.geo_risk)
        data_vol = _clamp(ctx.data_volume)
        quarantine_penalty = 1.0 if (principal.quarantined or device.quarantined) else 0.0
        owner_mismatch_penalty = 1.0 if (device.owner != ctx.user) else 0.0
        exfil_penalty = 1.0 if (ctx.action == "exfiltrate") else 0.0
        # Trust-building signals add to z, risk signals subtract from it
        z = 0.0
        z += self.w["role_fit"] * (role_fit - 0.5)
        z += self.w["device_posture"] * (posture - 0.5)
        z += self.w["mfa"] * (mfa - 0.5)
        z += self.w["path_validity"] * (path_valid - 0.5)
        z -= self.w["asset_sensitivity"] * (sens - 0.35)
        z -= self.w["network_context"] * (net_r - 0.25)
        z -= self.w["time"] * (t_r - 0.15)
        z -= self.w["geo_risk"] * (geo - 0.2)
        z -= self.w["behavior_anomaly"] * (anomaly - 0.1)
        z -= self.w["data_volume"] * (data_vol - 0.15)
        z -= self.w["principal_base_risk"] * (principal_risk - 0.2)
        z -= self.w["principal_compromise"] * (compromise - 0.0)
        z -= 2.0 * owner_mismatch_penalty
        z -= 2.5 * exfil_penalty
        z -= self.w["quarantine"] * quarantine_penalty
        trust = _sigmoid(z)
        # Lower trust switches on progressively more controls
        if trust < self.thresholds["rate_limit"]:
            controls["rate_limit"] = True
        if trust < self.thresholds["step_up"]:
            controls["step_up"] = bool(controls.get("step_up_mfa", False) or dst_zone in ["admin", "data"])
        if trust < self.thresholds["allow"]:
            controls["continuous_auth"] = True
        if "abac_denied" in rule_hits or "invalid_path_or_blocked" in rule_hits or "exfil_attempt" in rule_hits:
            controls["risk_signal"] = "policy_violation"
        if anomaly > 0.75 and sens

# Dynamic Zero-Trust Policy Engine Implementation
We implement a dynamic Zero-Trust policy engine that evaluates every incoming request using Attribute-Based Access Control (ABAC), contextual signals, behavioral anomaly indicators, and path validation. We compute a continuous trust score with a weighted risk model and use it to trigger adaptive controls such as step-up authentication, rate limiting, and quarantine. After each decision, we update the principal and device state so that verification stays continuous and the risk profile keeps evolving.
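As a worked example of the weighted risk model, the sketch below recomputes the score for two hand-picked requests. The weights, baselines, and thresholds are copied from the engine listing above; the function names and the two sample signal sets are assumptions made for illustration, and the fixed penalties for owner mismatch, exfiltration, and quarantine are left out to keep it short.

```python
import math
from typing import Dict, Tuple

# signal -> (weight, baseline), copied from the engine listing
TRUST_BUILDING: Dict[str, Tuple[float, float]] = {
    "role_fit": (1.4, 0.5), "device_posture": (1.8, 0.5),
    "mfa": (1.0, 0.5), "path_validity": (1.5, 0.5),
}
RISK: Dict[str, Tuple[float, float]] = {
    "asset_sensitivity": (1.6, 0.35), "network_context": (1.2, 0.25),
    "time": (0.6, 0.15), "geo_risk": (1.2, 0.2),
    "behavior_anomaly": (2.2, 0.1), "data_volume": (1.4, 0.15),
    "principal_base_risk": (1.3, 0.2), "principal_compromise": (2.0, 0.0),
}
THRESHOLDS = {"allow": 0.72, "step_up": 0.62, "rate_limit": 0.55}


def trust_score(signals: Dict[str, float]) -> float:
    # Trust-building signals raise z, risk signals lower it, sigmoid maps z to (0, 1)
    z = sum(w * (signals[k] - base) for k, (w, base) in TRUST_BUILDING.items())
    z -= sum(w * (signals[k] - base) for k, (w, base) in RISK.items())
    return 1.0 / (1.0 + math.exp(-z))


def adaptive_controls(trust: float, sensitive_dst: bool) -> Dict[str, bool]:
    # Simplified: the listing also turns step_up on when step_up_mfa was already set
    controls: Dict[str, bool] = {}
    if trust < THRESHOLDS["rate_limit"]:
        controls["rate_limit"] = True
    if trust < THRESHOLDS["step_up"]:
        controls["step_up"] = sensitive_dst
    if trust < THRESHOLDS["allow"]:
        controls["continuous_auth"] = True
    return controls


# Hypothetical sample requests
benign = {"role_fit": 1.0, "device_posture": 0.9, "mfa": 1.0, "path_validity": 1.0,
          "asset_sensitivity": 0.6, "network_context": 0.1, "time": 0.15, "geo_risk": 0.08,
          "behavior_anomaly": 0.06, "data_volume": 0.10,
          "principal_base_risk": 0.1, "principal_compromise": 0.0}
risky = {"role_fit": 0.0, "device_posture": 0.19, "mfa": 0.0, "path_validity": 1.0,
         "asset_sensitivity": 0.85, "network_context": 0.9, "time": 0.55, "geo_risk": 0.8,
         "behavior_anomaly": 0.9, "data_volume": 0.9,
         "principal_base_risk": 0.85, "principal_compromise": 0.0}

for name, sig in [("benign", benign), ("risky", risky)]:
    t = trust_score(sig)
    print(name, round(t, 4), adaptive_controls(t, sensitive_dst=True))
```

The benign request clears the `allow` threshold with no controls attached, while the risky one falls below every threshold and picks up all three. Because each risk term is measured against a baseline, a signal sitting below its baseline nudges trust up rather than down.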
The code uses a Python structure with a `ZeroTrustPolicyEngine` class containing multiple internal methods:
- `_compute_trust_score` calculates an aggregate trust value based on various risk factors.
- `evaluate` applies thresholds and additional adaptive controls before rendering a final decision.
- `_explain` constructs a human-readable explanation string for auditing or logging purposes.
- `_post_decision_updates` adjusts principal and device risk scores post-decision, triggering quarantines when necessary.
- `stats` compiles and returns aggregated operational metrics including allow/deny counts, top deny reasons, and entity states.
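The body of `_post_decision_updates` is not shown in this section, so the following is only a rough sketch of how such an update step could work. Every name and constant here (`MiniPrincipal`, `DENY_BUMP`, `QUARANTINE_AT`, `BLOCK_EDGE_AFTER`, `post_decision_update`) is a hypothetical choice for illustration and should not be read as the tutorial's actual values.

```python
from dataclasses import dataclass
from typing import Dict, Set, Tuple

Edge = Tuple[str, str]

# Hypothetical tuning constants, not taken from the tutorial
DENY_BUMP = 0.25        # compromise score added per denied request
ALLOW_DECAY = 0.05      # compromise score removed per allowed request
QUARANTINE_AT = 0.75    # quarantine once the score reaches this level
BLOCK_EDGE_AFTER = 3    # block an edge after this many denies on it


@dataclass
class MiniPrincipal:  # hypothetical stand-in for PrincipalState
    user: str
    rolling_denies: int = 0
    rolling_allows: int = 0
    compromise_score: float = 0.0
    quarantined: bool = False


def post_decision_update(p: MiniPrincipal, allowed: bool, edge: Edge,
                         edge_denies: Dict[Edge, int], blocked: Set[Edge]) -> None:
    if allowed:
        p.rolling_allows += 1
        p.compromise_score = max(0.0, p.compromise_score - ALLOW_DECAY)
        return
    # A deny raises suspicion on the principal and on the edge it tried to use
    p.rolling_denies += 1
    p.compromise_score = min(1.0, p.compromise_score + DENY_BUMP)
    edge_denies[edge] = edge_denies.get(edge, 0) + 1
    if edge_denies[edge] >= BLOCK_EDGE_AFTER:
        blocked.add(edge)
    if p.compromise_score >= QUARANTINE_AT:
        p.quarantined = True


mallory = MiniPrincipal("mallory")
edge = ("app:orders_svc", "data:customer_db")
edge_denies: Dict[Edge, int] = {}
blocked: Set[Edge] = set()
for _ in range(3):
    post_decision_update(mallory, False, edge, edge_denies, blocked)
print(mallory, blocked)
```

The design idea is that a single deny is treated as noise, while repeated denies accumulate until the principal is quarantined and the abused edge is closed. Allowed requests slowly decay the score so that a legitimate user can recover.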
Helper functions like `make_world` set up a simulation environment with sample users, devices, and assets, while `generate_request` produces synthetic request contexts for testing and demonstration under controlled conditions.

```python
import random
from typing import Dict, Any

from flask import Flask, request, jsonify

# The engine and world objects are created in the example usage further down.
# Helpers and constants defined earlier in the tutorial are reused here:
# - NETWORK_CONTEXT, ACTIONS, ASSETS
# - _rand_choice_weighted, _clamp, _stable_hash
# - RequestContext, ZeroTrustPolicyEngine

# Node IDs follow the "<zone>:<asset>" naming used by build_microsegmented_graph,
# so every entry exists in the graph.
by_zone = {z: [f"{z}:{a}" for a in assets] for z, assets in ASSETS.items()}

users = [
    ("alice", "employee"),
    ("bob", "employee"),
    ("eve", "admin"),
    ("mallory", "customer"),
    ("dave", "contractor"),  # "contractor" has no entry in role_perms, so ABAC denies every action
]

# Private RNG so request generation can be reseeded per step
rnd = random.Random(99)


def pick_user(seed_salt=None):
    if seed_salt:
        # Built-in hash() of a str varies between interpreter runs; _stable_hash does not
        rnd.seed(_stable_hash(seed_salt))
    return rnd.choice(users)


def user_device(username):
    # DeviceState stores the owning user in its `owner` field
    candidates = [d for d in engine.devices.values() if d.owner == username]
    if candidates:
        d = rnd.choice(candidates)
    else:
        # Fall back to any registered device, which triggers device_owner_mismatch
        d = rnd.choice(list(engine.devices.values()))
    return d.device_id, d.device_type, d.posture


def get_time_period():
    return "business_hours" if rnd.random() < 0.75 else "after_hours"
def generate_request(engine: ZeroTrustPolicyEngine, world: Dict[str, Any], kind="normal", seed_salt=None) -> RequestContext:
    if seed_salt:
        # Built-in hash() of a str varies between interpreter runs; _stable_hash does not
        rnd.seed(_stable_hash(seed_salt))
    # Baseline signals for ordinary traffic
    source = _rand_choice_weighted(NETWORK_CONTEXT, [0.45, 0.25, 0.22, 0.08])
    geo_risk = rnd.uniform(0.05, 0.35) + (0.25 if source in ["public_wifi", "tor_exit"] else 0.0)
    behavior_anomaly = rnd.uniform(0.02, 0.25)
    data_volume = rnd.uniform(0.02, 0.25)
    geo_risk = _clamp(geo_risk)
    behavior_anomaly = _clamp(behavior_anomaly)
    data_volume = _clamp(data_volume)

    if kind == "normal":
        username, role = pick_user(seed_salt)
        device_id, device_type, posture = user_device(username)
        src_zone_weights = [0.15, 0.55, 0.30]
        dst_zone_weights = [0.35, 0.45, 0.20]
        action_weights = [0.55, 0.28, 0.07, 0.08, 0.02]
        src_zone = _rand_choice_weighted(["public", "dmz", "app"], src_zone_weights)
        dst_zone = _rand_choice_weighted(["dmz", "app", "data"], dst_zone_weights)
        action = _rand_choice_weighted(ACTIONS, action_weights)
        src = rnd.choice(by_zone[src_zone])
        dst = rnd.choice(by_zone[dst_zone])
        mfa_required = dst_zone in ["data", "admin"]
        mfa_passed = mfa_required or random.random() < 0.55
        return RequestContext(
            user=username, role=role,
            device_id=device_id, device_type=device_type, device_posture=posture,
            mfa=mfa_passed, source=source,
            src_node=src, dst_node=dst,
            action=action,
            time_bucket=get_time_period(),
            geo_risk=geo_risk,
            behavior_anomaly=behavior_anomaly,
            data_volume=data_volume,
            reason="routine_access"
        )
    elif kind == "malicious_flow":
        # External actor on an unregistered device aiming at sensitive zones
        username, role = ("unknown_actor", "customer")
        device_id, device_type, posture = ("unknown-dev", "unknown_iot", 0.18)
        source = _rand_choice_weighted(["tor_exit", "public_wifi"], [0.65, 0.35])
        geo_risk = _clamp(rnd.uniform(0.6, 0.95))
        behavior_anomaly = _clamp(rnd.uniform(0.75, 0.98))
        data_volume = _clamp(rnd.uniform(0.75, 0.98))
        src = rnd.choice(by_zone["public"] + by_zone["dmz"])
        dst = rnd.choice(by_zone["data"] + by_zone["admin"])
        action = _rand_choice_weighted(["write", "admin", "exfiltrate"], [0.25, 0.25, 0.50])
        mfa_passed = False
        return RequestContext(
            user=username, role=role,
            device_id=device_id, device_type=device_type, device_posture=posture,
            mfa=mfa_passed, source=source,
            src_node=src, dst_node=dst,
            action=action,
            time_bucket="after_hours",
            geo_risk=geo_risk,
            behavior_anomaly=behavior_anomaly,
            data_volume=data_volume,
            reason="external_malicious_attempt"
        )
    elif kind == "insider_threat":
        # Known employee moving laterally toward data and admin assets
        username, role = ("mallory", "employee")
        device_id, device_type, posture = user_device(username)
        source = _rand_choice_weighted(["corp_vpn", "public_wifi"], [0.55, 0.45])
        geo_risk = _clamp(rnd.uniform(0.25, 0.65))
        behavior_anomaly = _clamp(rnd.uniform(0.55, 0.95))
        data_volume = _clamp(rnd.uniform(0.55, 0.95))
        src = rnd.choice(by_zone["app"] + by_zone["dmz"])
        dst = rnd.choice(by_zone["data"] + by_zone["admin"])
        action = _rand_choice_weighted(["read", "write", "exfiltrate", "admin"], [0.18, 0.22, 0.45, 0.15])
        mfa_passed = random.random() < 0.25
        return RequestContext(
            user=username, role=role,
            device_id=device_id, device_type=device_type, device_posture=posture,
            mfa=mfa_passed, source=source,
            src_node=src, dst_node=dst,
            action=action,
            time_bucket="after_hours",
            geo_risk=geo_risk,
            behavior_anomaly=behavior_anomaly,
            data_volume=data_volume,
            reason="insider_lateral_and_exfil"
        )
    raise ValueError(f"Unknown request type: {kind}")


def simulate_traffic(engine: ZeroTrustPolicyEngine, world: Dict[str, Any], steps: int = 60, seed: int = 99) -> Dict[str, Any]:
    random.seed(seed)
    total_results = {"allowed": 0, "denied": 0, "samples": []}
    for i in range(steps):
        if i in [12, 13, 14, 28, 29]:
            gen_kind = "malicious_flow"
        elif i in [18, 19, 20, 34, 35, 36, 50, 51]:
            gen_kind = "insider_threat"
        else:
            gen_kind = "normal"
        req_context = generate_request(engine, world, kind=gen_kind, seed_salt=str(i))
        decision = engine.evaluate(req_context)
        if decision.allowed:
            total_results["allowed"] += 1
        else:
            total_results["denied"] += 1
        # Save early examples and recent blocks
        if i < 10 or (not decision.allowed and len(total_results["samples"]) < 18):
            total_results["samples"].append({
                "ctx": req_context.__dict__,
                "decision": decision.__dict__
            })
    return total_results


def create_api_app(engine: ZeroTrustPolicyEngine, world: Dict[str, Any]) -> Flask:
    app = Flask(__name__)

    @app.get("/health")
    def check_health():
        return jsonify({"ok": True, "policy_version": engine.policy_version})

    @app.get("/graph")
    def get_network_graph():
        nodes = [{"id": n, **engine.G.nodes[n]} for n in engine.G.nodes()]
        edges = [{"src": u, "dst": v, **engine.G.edges[u, v]} for u, v in engine.G.edges()]
        return jsonify({
            "nodes": nodes,
            "edges": edges,
            "blocked_edges": list(map(list, engine.blocked_edges))
        })

    @app.post("/request")
    def handle_request():
        payload = request.get_json(force=True)
        ctx = RequestContext(**payload)
        decision = engine.evaluate(ctx)
        return jsonify({
            "allowed": decision.allowed,
            "trust_score": decision.trust_score,
            "rule_hits": decision.rule_hits,
            "controls": decision.controls,
            "explanation": decision.explanation
        })

    @app.post("/simulate")
    def run_traffic_simulation():
        payload = request.get_json(force=True) if request.data else {}
        steps = int(payload.get("steps", 50))
        seed = int(payload.get("seed", 123))
        results = simulate_traffic(engine, world, steps=steps, seed=seed)
        return jsonify({
            "steps": steps,
            "allowed": results["allowed"],
            "denied": results["denied"],
            "stats": engine.stats()
        })

    @app.get("/stats")
    def get_statistics():
        return jsonify(engine.stats())

    return app


# Example usage
G = build_microsegmented_graph(seed=7)
engine = ZeroTrustPolicyEngine(G)
world = make_world(engine, seed=13)
draw_graph(G, title="Zero-Trust Microsegmented Network (Zones + Assets + Directed Flows)")

# The Flask test client lets us call the API without starting a server
app = create_api_app(engine, world)
client = app.test_client()

print("== Health ==")
print(client.get("/health").json)

print("\n== Run simulation (mixture: normal + malicious flows + insider threat) ==")
sim_out = client.post("/simulate", json={"steps": 70, "seed": 2026}).json
print(_pretty({
    "allowed": sim_out["allowed"],
    "denied": sim_out["denied"],
    "blocked_edges_count": sim_out["stats"]["blocked_edges_count"]
}))

print("\n== Top deny reasons ==")
print(_pretty(sim_out["stats"]["deny_reasons_top"]))

print("\n== Principal risk snapshot (watch mallory) ==")
principals = sim_out["stats"]["principals"]
focus = {k: principals[k] for k in sorted(principals.keys()) if k in ["alice","bob","cathy","dan","eve","mallory","unknown_actor"]}
print(_pretty(focus))

print("\n== Example: send a direct insider exfil request via the policy API ==")
insider_ctx = generate_request(engine, world, kind="insider_threat", seed_salt="manual-1")
insider_ctx.action = "exfiltrate"
insider_ctx.mfa = False
insider_ctx.behavior_anomaly = 0.92
insider_ctx.data_volume = 0.88
insider_ctx.geo_risk = 0.62
resp = client.post("/request", json=insider_ctx.__dict__).json
print(_pretty(resp))

print("\n== Example: a legitimate admin read with MFA from corp_lan ==")
admin_ctx = RequestContext(
    user="dan", role="admin",
    device_id="dev-dan-lt", device_type="managed_laptop", device_posture=engine.devices["dev-dan-lt"].posture,
    mfa=True, source="corp_lan",
    src_node=random.choice(world["by_zone"]["admin"]),
    dst_node=random.choice(world["by_zone"]["data"]),
    action="read",
    time_bucket="business_hours",
    geo_risk=0.08,
    behavior_anomaly=0.06,
    data_volume=0.10,
    reason="admin_operational_access"
)
resp2 = client.post("/request", json=admin_ctx.__dict__).json
print(_pretty(resp2))

print("\n== Final stats ==")
final_stats = client.get("/stats").json
print(_pretty({
    "flows_total": final_stats["flows_total"],
    "flows_allow": final_stats["flows_allow"],
    "flows_deny": final_stats["flows_deny"],
    "blocked_edges_count": final_stats["blocked_edges_count"],
    "deny_reasons_top": final_stats["deny_reasons_top"]
}))

# Histogram of trust scores across every logged flow
scores = [r.decision["trust_score"] for r in engine.flow_log]
plt.figure(figsize=(9, 4))
plt.hist(scores, bins=18)
plt.title("Trust Score Distribution Across Simulated Flows")
plt.xlabel("trust_score")
plt.ylabel("count")
plt.show()

denied = [r for r in engine.flow_log if not r.decision["allowed"]]
print("\n== Recent denied explanations (last 6) ==")
for r in denied[-6:]:
    print("-", r.decision["explanation"])
```
We expose the policy engine through a Flask API and interact with it using a test client to keep the notebook self-contained. We run simulations, inspect trust distributions, analyze denial reasons, and observe quarantine and edge-blocking behavior. We conclude by visualizing trust score patterns and examining denied explanations to validate the Zero-Trust enforcement logic in action.
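One detail worth considering when exposing the engine over HTTP: building the context with `RequestContext(**payload)` raises a `TypeError` on a missing or unexpected key. The sketch below shows one way to validate a JSON payload against a dataclass first. `MiniContext` and `parse_context` are hypothetical names, and the class carries only a few of the real fields.

```python
from dataclasses import MISSING, dataclass, fields
from typing import Any, Dict


@dataclass
class MiniContext:  # hypothetical, trimmed-down stand-in for RequestContext
    user: str
    role: str
    action: str
    mfa: bool
    reason: str = ""


def parse_context(payload: Dict[str, Any]) -> MiniContext:
    """Build a MiniContext, reporting unknown and missing keys in one error."""
    known = {f.name for f in fields(MiniContext)}
    # A field is required when it has neither a default nor a default_factory
    required = {
        f.name for f in fields(MiniContext)
        if f.default is MISSING and f.default_factory is MISSING
    }
    unknown = set(payload) - known
    missing = required - set(payload)
    if unknown or missing:
        raise ValueError(
            f"unknown fields: {sorted(unknown)}, missing fields: {sorted(missing)}"
        )
    return MiniContext(**payload)


ctx = parse_context({"user": "dan", "role": "admin", "action": "read", "mfa": True})
print(ctx)

try:
    parse_context({"user": "dan", "is_root": True})
except ValueError as exc:
    print("rejected:", exc)
```

In a request handler, the `ValueError` could be caught and turned into a 400 response so that malformed input is reported to the caller instead of surfacing as a server error.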
In conclusion, we demonstrated how Zero Trust becomes a measurable, programmable system when identity, device state, network context, and behavior signals are evaluated together for every interaction. We saw the policy engine deny or step up risky requests, rate-limit low-trust activity, and dynamically block abusive edges to prevent repeated lateral movement and data theft. By combining graph-based segmentation with an evolving trust score and automated responses, we ended with a repeatable framework that we can extend with richer telemetry, better anomaly models, and environment-specific policies while keeping the core “never trust, always verify” loop intact.
The post How to Build a Dynamic Zero-Trust Network Simulation with Graph-Based Micro-Segmentation, Adaptive Policy Engine, and Insider Threat Detection appeared first on MarkTechPost.



