In this tutorial, we implement an end-to-end Practical Byzantine Fault Tolerance (PBFT) simulator using asyncio. We model a realistic distributed network with asynchronous message passing, configurable delays, and Byzantine nodes that deliberately deviate from the protocol. By explicitly implementing the pre-prepare, prepare, and commit phases, we explore how PBFT achieves consensus under adversarial conditions while respecting the theoretical 3f+1 bound. We also instrument the system to measure consensus latency and success rates as the number of malicious nodes increases, allowing us to empirically observe the limits of Byzantine fault tolerance.
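Before diving into the simulator, it helps to pin down the arithmetic behind the 3f+1 bound. The short sketch below is illustrative only (it is not part of the simulator code): it computes the maximum tolerable fault count f and the 2f+1 quorum size for a few cluster sizes.

```python
# Quorum arithmetic behind PBFT's 3f+1 bound (illustrative sketch).
def max_faults(n: int) -> int:
    # Largest f such that n >= 3f + 1.
    return (n - 1) // 3

def quorum(n: int) -> int:
    # Prepare/commit quorum size used by PBFT: 2f + 1.
    return 2 * max_faults(n) + 1

for n in (4, 7, 10, 13):
    print(f"n={n}: f={max_faults(n)}, quorum={quorum(n)}")
# e.g. n=10 tolerates f=3 faults with quorums of 7
```

These are the same quantities the simulator computes later via pbft_params and the node quorum helpers.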
import asyncio
import random
import time
import hashlib
from dataclasses import dataclass, field
from typing import Dict, Set, Tuple, Optional, List
import matplotlib.pyplot as plt

PREPREPARE = "PREPREPARE"
PREPARE = "PREPARE"
COMMIT = "COMMIT"

@dataclass(frozen=True)
class Msg:
    typ: str
    view: int
    seq: int
    digest: str
    sender: int

@dataclass
class NetConfig:
    min_delay_ms: int = 5
    max_delay_ms: int = 40
    drop_prob: float = 0.0
    reorder_prob: float = 0.0

We establish the simulator's foundation by importing the required libraries and defining the core PBFT message types. We formalize network messages and parameters using dataclasses to ensure structured, consistent communication. We also define constants representing the three PBFT phases used throughout the system.
class Network:
    def __init__(self, cfg: NetConfig):
        self.cfg = cfg
        self.nodes: Dict[int, "Node"] = {}

    def register(self, node: "Node"):
        self.nodes[node.nid] = node

    async def send(self, dst: int, msg: Msg):
        if random.random() < self.cfg.drop_prob:
            return
        d = random.uniform(self.cfg.min_delay_ms, self.cfg.max_delay_ms) / 1000.0
        await asyncio.sleep(d)
        if random.random() < self.cfg.reorder_prob:
            await asyncio.sleep(random.uniform(0.0, 0.02))
        await self.nodes[dst].inbox.put(msg)

    async def broadcast(self, src: int, msg: Msg):
        tasks = []
        for nid in self.nodes.keys():
            tasks.append(asyncio.create_task(self.send(nid, msg)))
        await asyncio.gather(*tasks)

We implement an asynchronous network layer that simulates real-world message delivery with delays, reordering, and potential drops. We register nodes dynamically and use asyncio tasks to broadcast messages across the simulated network. We model non-deterministic communication behavior that directly impacts consensus latency and robustness.
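As a standalone sanity check of the drop model (separate from the Network class, with illustrative numbers), we can estimate what fraction of messages survives a channel with a given drop_prob:

```python
import asyncio
import random

async def lossy_send(rng: random.Random, drop_prob: float) -> bool:
    # Mirror the drop check used by the simulated network layer.
    if rng.random() < drop_prob:
        return False
    await asyncio.sleep(0)  # stand-in for a random delivery delay
    return True

async def survival_rate(trials: int, drop_prob: float, seed: int = 0) -> float:
    rng = random.Random(seed)  # seeded for reproducibility
    delivered = sum([await lossy_send(rng, drop_prob) for _ in range(trials)])
    return delivered / trials

rate = asyncio.run(survival_rate(2000, 0.1))
print(round(rate, 2))  # close to 0.9
```

With drop_prob = 0.1, roughly 90% of sends survive, which is why even modest loss rates mostly just add latency rather than breaking quorums outright.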
@dataclass
class NodeConfig:
    n: int
    f: int
    primary_id: int = 0
    view: int = 0
    timeout_s: float = 2.0

class Node:
    def __init__(self, nid: int, net: Network, cfg: NodeConfig, byzantine: bool = False):
        self.nid = nid
        self.net = net
        self.cfg = cfg
        self.byzantine = byzantine
        self.inbox: asyncio.Queue[Msg] = asyncio.Queue()
        self.preprepare_seen: Dict[int, str] = {}
        self.prepare_votes: Dict[Tuple[int, str], Set[int]] = {}
        self.commit_votes: Dict[Tuple[int, str], Set[int]] = {}
        self.committed: Dict[int, str] = {}
        self.running = True

    @property
    def f(self) -> int:
        return self.cfg.f

    def _q_prepare(self) -> int:
        return 2 * self.f + 1

    def _q_commit(self) -> int:
        return 2 * self.f + 1

    @staticmethod
    def digest_of(payload: str) -> str:
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()

We define the configuration and internal state of each PBFT node participating in the protocol. We initialize data structures for tracking pre-prepare, prepare, and commit votes while supporting both honest and Byzantine behavior. We also implement quorum threshold logic and deterministic digest generation for request validation.
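As a quick illustration of the digest helper, SHA-256 maps any payload to a fixed 64-character hex string, and equal payloads always hash identically. This is what lets replicas compare compact digests instead of full request payloads:

```python
import hashlib

def digest_of(payload: str) -> str:
    # Same construction as Node.digest_of in the simulator.
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()

a = digest_of("tx: pay Alice->Bob 5")
b = digest_of("tx: pay Alice->Bob 5")
c = digest_of("tx: pay Alice->Bob 6")
print(len(a), a == b, a == c)  # 64 True False
```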
    async def propose(self, payload: str, seq: int):
        if self.nid != self.cfg.primary_id:
            raise ValueError("Only the primary can propose in this simplified simulator.")
        if not self.byzantine:
            dig = self.digest_of(payload)
            msg = Msg(PREPREPARE, self.cfg.view, seq, dig, self.nid)
            await self.net.broadcast(self.nid, msg)
            return
        # Byzantine primary: equivocate by sending a different digest to each peer.
        for dst in self.net.nodes.keys():
            variant = f"{payload}::to={dst}::salt={random.randint(0, 10**9)}"
            dig = self.digest_of(variant)
            msg = Msg(PREPREPARE, self.cfg.view, seq, dig, self.nid)
            await self.net.send(dst, msg)

    async def handle_preprepare(self, msg: Msg):
        seq = msg.seq
        dig = msg.digest
        if self.byzantine:
            if random.random() < 0.5:
                return
            fake_dig = dig if random.random() < 0.5 else self.digest_of(dig + "::fake")
            out = Msg(PREPARE, msg.view, seq, fake_dig, self.nid)
            await self.net.broadcast(self.nid, out)
            return
        if seq not in self.preprepare_seen:
            self.preprepare_seen[seq] = dig
            out = Msg(PREPARE, msg.view, seq, dig, self.nid)
            await self.net.broadcast(self.nid, out)

    async def handle_prepare(self, msg: Msg):
        seq, dig = msg.seq, msg.digest
        key = (seq, dig)
        voters = self.prepare_votes.setdefault(key, set())
        voters.add(msg.sender)
        if self.byzantine:
            return
        if self.preprepare_seen.get(seq) != dig:
            return
        if len(voters) >= self._q_prepare():
            out = Msg(COMMIT, msg.view, seq, dig, self.nid)
            await self.net.broadcast(self.nid, out)

    async def handle_commit(self, msg: Msg):
        seq, dig = msg.seq, msg.digest
        key = (seq, dig)
        voters = self.commit_votes.setdefault(key, set())
        voters.add(msg.sender)
        if self.byzantine:
            return
        if self.preprepare_seen.get(seq) != dig:
            return
        if seq in self.committed:
            return
        if len(voters) >= self._q_commit():
            self.committed[seq] = dig

We implement the core PBFT protocol logic, including proposal handling and the pre-prepare and prepare phases. We explicitly model Byzantine equivocation by allowing malicious nodes to send conflicting digests to different peers. We advance the protocol to the commit phase once the required prepare quorum is reached.
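The reason a 2f+1 quorum defeats equivocation is a counting argument: with n = 3f+1 replicas, any two quorums of size 2f+1 overlap in at least f+1 nodes, so at least one honest node sits in both, and an honest node never votes for two different digests at the same sequence number. A small standalone check of that arithmetic:

```python
def quorum_overlap(f: int) -> int:
    n = 3 * f + 1
    q = 2 * f + 1
    # By inclusion-exclusion, any two quorums share at least 2q - n nodes.
    return 2 * q - n

for f in range(1, 6):
    # An overlap of f+1 guarantees at least one honest node in both quorums,
    # since at most f of the shared nodes can be Byzantine.
    assert quorum_overlap(f) == f + 1
print("quorum intersection >= f+1 for all tested f")
```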
    async def run(self):
        while self.running:
            msg = await self.inbox.get()
            if msg.typ == PREPREPARE:
                await self.handle_preprepare(msg)
            elif msg.typ == PREPARE:
                await self.handle_prepare(msg)
            elif msg.typ == COMMIT:
                await self.handle_commit(msg)

    def stop(self):
        self.running = False

def pbft_params(n: int) -> int:
    return (n - 1) // 3

async def run_single_consensus(
    n: int,
    malicious: int,
    net_cfg: NetConfig,
    payload: str = "tx: pay Alice->Bob 5",
    seq: int = 1,
    timeout_s: float = 2.0,
    seed: Optional[int] = None
) -> Dict[str, object]:
    if seed is not None:
        random.seed(seed)
    f_max = pbft_params(n)
    f = f_max
    net = Network(net_cfg)
    cfg = NodeConfig(n=n, f=f, primary_id=0, view=0, timeout_s=timeout_s)
    mal_set = set(random.sample(range(n), k=min(malicious, n)))
    nodes: List[Node] = []
    for i in range(n):
        node = Node(i, net, cfg, byzantine=(i in mal_set))
        net.register(node)
        nodes.append(node)
    tasks = [asyncio.create_task(node.run()) for node in nodes]
    t0 = time.perf_counter()
    await nodes[cfg.primary_id].propose(payload, seq)
    honest = [node for node in nodes if not node.byzantine]
    target = max(1, len(honest))
    committed_honest = 0
    latency = None

    async def poll_commits():
        nonlocal committed_honest, latency
        while True:
            committed_honest = sum(1 for node in honest if seq in node.committed)
            if committed_honest >= target:
                latency = time.perf_counter() - t0
                return
            await asyncio.sleep(0.005)

    try:
        await asyncio.wait_for(poll_commits(), timeout=timeout_s)
        success = True
    except asyncio.TimeoutError:
        success = False
        latency = None
    for node in nodes:
        node.stop()
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    digest_set = set(node.committed.get(seq) for node in honest if seq in node.committed)
    agreed = (len(digest_set) == 1) if success else False
    return {
        "n": n,
        "f": f,
        "malicious": malicious,
        "mal_set": mal_set,
        "success": success,
        "latency_s": latency,
        "honest_committed": committed_honest,
        "honest_total": len(honest),
        "agreed_digest": agreed,
    }

We complete the PBFT state machine by processing commit messages and finalizing decisions once commit quorums are satisfied. We run the node event loop to continuously process incoming messages asynchronously. We also include lifecycle controls to safely stop nodes after each experiment run.
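The liveness measurement hinges on the asyncio.wait_for pattern wrapped around the polling coroutine: if honest nodes never reach the target commit count, the poller is cancelled at the deadline and the trial is recorded as a failure. A minimal standalone sketch of that pattern, using a toy flag in place of commit counts:

```python
import asyncio

async def poll_until(flag: dict, interval: float = 0.005):
    # Poll a shared condition, as run_single_consensus does with commit counts.
    while not flag["done"]:
        await asyncio.sleep(interval)

async def trial(finishes: bool) -> bool:
    flag = {"done": False}
    if finishes:
        # Simulate consensus completing shortly after the trial starts.
        asyncio.get_running_loop().call_later(0.01, flag.__setitem__, "done", True)
    try:
        await asyncio.wait_for(poll_until(flag), timeout=0.1)
        return True   # condition observed before the deadline
    except asyncio.TimeoutError:
        return False  # deadline hit: record the trial as failed

print(asyncio.run(trial(True)), asyncio.run(trial(False)))  # True False
```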
async def latency_sweep(
    n: int = 10,
    max_malicious: Optional[int] = None,
    trials_per_point: int = 5,
    timeout_s: float = 2.0,
    net_cfg: Optional[NetConfig] = None,
    seed: int = 7
):
    if net_cfg is None:
        net_cfg = NetConfig(min_delay_ms=5, max_delay_ms=35, drop_prob=0.0, reorder_prob=0.05)
    if max_malicious is None:
        max_malicious = n
    results = []
    random.seed(seed)
    for m in range(0, max_malicious + 1):
        latencies = []
        successes = 0
        agreements = 0
        for t in range(trials_per_point):
            out = await run_single_consensus(
                n=n,
                malicious=m,
                net_cfg=net_cfg,
                timeout_s=timeout_s,
                seed=seed + 1000 * m + t
            )
            results.append(out)
            if out["success"]:
                successes += 1
                latencies.append(out["latency_s"])
                if out["agreed_digest"]:
                    agreements += 1
        avg_lat = sum(latencies) / len(latencies) if latencies else None
        print(
            f"malicious={m:2d} | success={successes}/{trials_per_point} "
            f"| avg_latency={avg_lat if avg_lat is not None else 'NA'} "
            f"| digest_agreement={agreements}/{successes if successes else 1}"
        )
    return results
def plot_latency(results: List[Dict[str, object]], trials_per_point: int):
    by_m = {}
    for r in results:
        m = r["malicious"]
        by_m.setdefault(m, []).append(r)
    xs, ys = [], []
    success_rate = []
    for m in sorted(by_m.keys()):
        group = by_m[m]
        lats = [g["latency_s"] for g in group if g["latency_s"] is not None]
        succ = sum(1 for g in group if g["success"])
        xs.append(m)
        ys.append(sum(lats) / len(lats) if lats else float("nan"))
        success_rate.append(succ / len(group))
    plt.figure()
    plt.plot(xs, ys, marker="o")
    plt.xlabel("Number of malicious (Byzantine) nodes")
    plt.ylabel("Consensus latency (seconds), avg over successes")
    plt.title("PBFT Simulator: Latency vs Malicious Nodes")
    plt.grid(True)
    plt.show()
    plt.figure()
    plt.plot(xs, success_rate, marker="o")
    plt.xlabel("Number of malicious (Byzantine) nodes")
    plt.ylabel("Success rate")
    plt.title("PBFT Simulator: Success Rate vs Malicious Nodes")
    plt.ylim(-0.05, 1.05)
    plt.grid(True)
    plt.show()
async def main():
    n = 10
    trials = 6
    f = pbft_params(n)
    print(f"n={n} => PBFT theoretical max f = floor((n-1)/3) = {f}")
    print("Theory: safety/liveness typically assumed when malicious <= f and timing assumptions hold.\n")
    results = await latency_sweep(
        n=n,
        max_malicious=min(n, f + 6),
        trials_per_point=trials,
        timeout_s=2.0,
        net_cfg=NetConfig(min_delay_ms=5, max_delay_ms=35, drop_prob=0.0, reorder_prob=0.05),
        seed=11
    )
    plot_latency(results, trials)

await main()  # top-level await works in notebooks; in a script, use asyncio.run(main())

We orchestrate large-scale experiments by sweeping across different numbers of malicious nodes and collecting latency statistics. We aggregate results to analyze consensus success rates and visualize system behavior using plots. We run the complete experiment pipeline and observe how PBFT degrades as the number of Byzantine faults approaches and exceeds the theoretical limit.
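The aggregation step is plain arithmetic over the per-trial records. A minimal standalone version of it, using made-up trial dicts in the same shape as the records run_single_consensus returns:

```python
# Aggregate success rate and average latency from trial records
# shaped like run_single_consensus results (values here are made up).
trials = [
    {"success": True,  "latency_s": 0.10},
    {"success": True,  "latency_s": 0.14},
    {"success": False, "latency_s": None},  # timed-out trial: no latency
]

lats = [t["latency_s"] for t in trials if t["latency_s"] is not None]
success_rate = sum(1 for t in trials if t["success"]) / len(trials)
avg_latency = sum(lats) / len(lats) if lats else float("nan")
print(round(success_rate, 2), round(avg_latency, 2))  # 0.67 0.12
```

Note that failed trials contribute to the success rate but are excluded from the latency average, which is why the latency curve only reflects runs that actually reached consensus.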
In conclusion, we gained hands-on insight into how PBFT behaves beyond textbook guarantees and how adversarial pressure affects both latency and liveness in practice. We observed how quorum thresholds enforce safety, why consensus breaks down once Byzantine nodes exceed the tolerated bound, and how asynchronous networks amplify these effects. This implementation provides a practical foundation for experimenting with more advanced distributed-systems concepts, such as view changes, leader rotation, or authenticated messaging. It helps us build intuition for the design trade-offs that underpin modern blockchain and distributed trust systems.