# Introduction
The typical data scientist devotes approximately 45% of their work hours to data cleaning and preparation — not to modeling, not to generating insights, and not to tasks that demand real judgment. This figure keeps surfacing in industry surveys because it remains stubbornly accurate. The activities consuming that time — profiling columns, identifying null values, running the same exploratory data analysis (EDA) scripts, searching through hyperparameter grids, and writing identical monitoring checks — are predictable enough to follow well-defined rules.
That predictability is exactly what makes them suitable for agent-driven automation. Agentic workflows don’t eliminate the role of the data scientist. Instead, they take on the procedural burden so you can concentrate on the evaluative side: judging whether a model is reasonable, determining whether a feature carries real signal, and deciding whether a finding justifies a business action. Platforms such as Databricks have already begun integrating agentic data science features into their core systems, with their Agent framework purpose-built to “shrink the gap between asking a question and getting an answer.” This is where production data teams are headed.
This article walks through five practical agentic workflows, each aligned with a key phase of a data science pipeline. Every workflow includes a realistic scenario, working code examples, and the architectural choices that matter when running in production.
# Prerequisites
All five workflows require Python 3.10 or later and working knowledge of pandas, scikit-learn, and basic large language model (LLM) API interactions. Additional package dependencies are noted within each workflow section. For the tool-calling patterns, you’ll need either an OpenAI API key or a local serving endpoint (Ollama, vLLM) that provides an OpenAI-compatible API.
```bash
# Core packages used across all workflows
pip install openai pandas numpy scipy scikit-learn lightgbm shap pydantic
```
# Workflow 1: Automated Exploratory Data Analysis Agent
What it replaces: The manual routine of loading data, computing summary statistics, visualizing distributions, checking for missing values, spotting outliers, and documenting findings. Every new dataset triggers the same script, just with different column names.
What the agent does instead: It ingests the dataset, executes a comprehensive profile, categorizes detected problems by severity, and generates a structured Markdown report. A human then reviews the output and decides on next steps. The agent takes care of everything leading up to that human checkpoint.
// Architecture
The agent follows the Reasoning and Acting (ReAct) pattern with two tools: profile_dataset computes per-column summary statistics, and flag_issues ranks problems by severity. The agent then merges both outputs into a unified report through a single language model call. The critical architectural choice lies in how the agent processes the flag_issues results: it evaluates which issues are genuinely actionable before reporting, so the final output is a prioritized list rather than an unfiltered data dump. In the simplified code pattern below, the two tools run in a fixed order rather than being selected by the model at each step.
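In a full ReAct loop, the model itself requests tool calls, and your code executes them. A minimal sketch of that plumbing, assuming OpenAI-style function-tool schemas: `TOOLS` describes the tool to the model, and `dispatch_tool_call` (a hypothetical helper, as is the trimmed-down `profile_dataset` stand-in) runs whichever tool the model names and returns the result as JSON text.

```python
import json
from typing import Any, Callable

import pandas as pd


def profile_dataset(df: pd.DataFrame) -> dict:
    """Hypothetical, trimmed-down stand-in for the profile_dataset tool."""
    return {col: {"null_rate": float(df[col].isnull().mean())} for col in df.columns}


# OpenAI-style function-tool schema: the model only sees the name, description,
# and parameter schema. The DataFrame itself never leaves the Python process.
TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "profile_dataset",
            "description": "Compute per-column summary statistics for the loaded dataset.",
            "parameters": {"type": "object", "properties": {}, "required": []},
        },
    },
]

# Maps tool names the model may request to the Python functions that implement them
REGISTRY: dict[str, Callable[..., Any]] = {"profile_dataset": profile_dataset}


def dispatch_tool_call(name: str, arguments_json: str, df: pd.DataFrame) -> str:
    """Execute one tool call requested by the model and return the result as JSON text."""
    if name not in REGISTRY:
        # Report the problem back to the model instead of crashing the loop
        return json.dumps({"error": f"unknown tool {name!r}"})
    kwargs = json.loads(arguments_json or "{}")
    return json.dumps(REGISTRY[name](df, **kwargs))


df = pd.DataFrame({"age": [31, None, 45, 52], "plan": ["pro", "basic", None, None]})
print(dispatch_tool_call("profile_dataset", "{}", df))
```

In the loop, each entry in `response.choices[0].message.tool_calls` carries `function.name` and `function.arguments` (a JSON string), which map directly onto `dispatch_tool_call`; the returned text goes back to the model as a `"tool"` role message with the matching `tool_call_id`.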
// Code Pattern
```python
# eda_agent.py
# Prerequisites: pip install openai pandas scipy
# Run: python eda_agent.py
import json
from dataclasses import dataclass

import pandas as pd
from openai import OpenAI
from scipy import stats

client = OpenAI()  # Uses OPENAI_API_KEY env var


@dataclass
class ColumnIssue:
    column: str
    issue_type: str  # null_rate | skewness | dtype | high_correlation
    severity: str    # low | medium | high
    detail: str


def profile_dataset(df: pd.DataFrame) -> dict:
    """
    Generate per-column statistics.
    In production, swap this for ydata-profiling for richer output.
    """
    profile = {}
    for col in df.columns:
        col_stats = {
            "dtype": str(df[col].dtype),
            "null_rate": float(df[col].isnull().mean()),
            "n_unique": int(df[col].nunique()),
        }
        if pd.api.types.is_numeric_dtype(df[col]):
            col_stats["skewness"] = float(df[col].skew())
            col_stats["mean"] = float(df[col].mean())
            col_stats["std"] = float(df[col].std())
        elif df[col].dtype == "object":
            non_null = df[col].dropna()
            numeric_coerced = pd.to_numeric(non_null, errors="coerce")
            col_stats["looks_numeric"] = bool(len(non_null) > 0 and numeric_coerced.notna().mean() > 0.9)
        profile[col] = col_stats
    return profile


def flag_issues(profile: dict) -> list[ColumnIssue]:
    """
    Flag data quality issues from a column profile.
    Severity tiers: high = needs immediate attention, medium = worth reviewing.
    """
    issues = []
    for col, stats_dict in profile.items():
        null_rate = stats_dict.get("null_rate", 0.0)
        if null_rate > 0.15:
            issues.append(ColumnIssue(col, "null_rate", "high",
                                      f"{null_rate:.0%} of values are missing"))
        elif null_rate > 0.05:
            issues.append(ColumnIssue(col, "null_rate", "medium",
                                      f"{null_rate:.0%} of values are missing"))

        skewness = abs(stats_dict.get("skewness", 0.0))
        if skewness > 5.0:
            issues.append(ColumnIssue(col, "skewness", "high",
                                      f"Extreme skew={skewness:.1f} -- consider log transform"))
        elif skewness > 2.0:
            issues.append(ColumnIssue(col, "skewness", "medium",
                                      f"Moderate skew={skewness:.1f}"))

        # Object columns with all-numeric values are likely miscoded
        if stats_dict["dtype"] == "object" and stats_dict.get("looks_numeric", False):
            issues.append(ColumnIssue(col, "dtype", "medium",
                                      "Numeric values stored as strings"))
    return issues


def run_eda_agent(df: pd.DataFrame, dataset_description: str) -> str:
    """
    Run the EDA agent.
    The two tools run in a fixed order (profile, then flag); a single LLM call
    then prioritizes the flagged issues and writes a structured report.
    """
    profile = profile_dataset(df)
    issues = flag_issues(profile)

    # Format issues for the agent, one per line
    issues_text = "\n".join(
        f"- [{i.severity.upper()}] {i.column}: {i.issue_type} -- {i.detail}"
        for i in issues
    ) or "No issues detected."

    # Report sections 2-4 are an assumed template; adjust them to your team's format.
    prompt = f"""You are a senior data scientist reviewing a dataset for a data science project.
Dataset: {dataset_description}
Shape: {df.shape[0]} rows x {df.shape[1]} columns
Column profile (summary stats):
{json.dumps(profile, indent=2)}
Detected issues:
{issues_text}
Write a structured EDA report with these sections:
1. DATASET OVERVIEW -- shape, dtypes, and overall completeness
2. HIGH PRIORITY ISSUES -- only genuinely actionable issues, most severe first
3. MEDIUM PRIORITY ISSUES -- observations worth reviewing
4. RECOMMENDED NEXT STEPS -- a short numbered list of cleaning actions"""

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.1,
    )
    return response.choices[0].message.content


if __name__ == "__main__":
    # Small illustrative dataset; replace with your own DataFrame
    demo = pd.DataFrame({
        "order_value": [12.5, 8.0, 950.0, 15.2, None, 11.1, 9.8, 13.4],
        "zip_code": ["10001", "94105", "60601", None, "73301", "10001", "94105", "02139"],
    })
    print(run_eda_agent(demo, "Sample order data with a numeric-looking text column"))
```
A second, more compact variant skips the separate tool functions and sends GPT-4o-mini a column-level summary of any pandas DataFrame directly. It checks data quality, surfaces key statistical findings, and produces a structured report in four sections:
- Overall quality assessment, summarizing the dataset's health
- High-priority issues that need attention
- Medium-priority issues worth keeping an eye on
- Recommended next steps for preparing the data for analysis
This shows how to write an EDA agent that can be plugged into your existing workflow as a callable library function.
// Code Pattern
```python
# agent_eda.py
# Prerequisites: pip install openai pandas numpy
# Run: OPENAI_API_KEY=your_key python agent_eda.py
import json

import numpy as np
import pandas as pd
from openai import OpenAI

client = OpenAI()


def run_eda_agent(df: pd.DataFrame, context: str) -> str:
    """Generate a structured EDA summary for a given DataFrame."""
    # Build compact column-level summaries for the prompt
    col_summaries = {}
    for col in df.columns:
        series = df[col]
        info = {
            "dtype": str(series.dtype),
            "null_pct": round(float(series.isnull().mean()) * 100, 1),
            "nunique": int(series.nunique()),
        }
        if pd.api.types.is_numeric_dtype(series):
            info.update({
                "mean": round(float(series.mean()), 2),
                "std": round(float(series.std()), 2),
                "skew": round(float(series.skew()), 2),
                "min": round(float(series.min()), 2),
                "max": round(float(series.max()), 2),
                "is_unique": bool(series.is_unique),
            })
        elif pd.api.types.is_object_dtype(series):
            non_null = series.dropna()
            info["sample_values"] = [str(v) for v in non_null.head(10).tolist()]
            # Convert to str first: object columns can hold non-string values
            # (e.g. ints mixed with None), where .str.len() would raise
            info["max_length"] = int(non_null.astype(str).str.len().max()) if len(non_null) > 0 else 0
        col_summaries[col] = info

    prompt = f"""You are a senior data scientist examining a new dataset.
Context: {context}
Dataset shape: {df.shape}
Column summaries:
{json.dumps(col_summaries, indent=2, default=str)}
Provide a concise assessment with exactly these sections:
OVERALL QUALITY ASSESSMENT:
<1-2 sentences summarizing the overall health of this dataset>
HIGH PRIORITY ISSUES:
<List any data quality issues that need attention. Format as a bulleted list.>
MEDIUM PRIORITY ISSUES:
<List less urgent observations worth keeping an eye on. Format as a bulleted list.>
RECOMMENDED NEXT STEPS:
<Provide a numbered list of 4 specific actions to prepare this data for analysis>
Keep the response brief. Focus on practical guidance rather than completeness."""

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.1,
    )
    return response.choices[0].message.content


# ── Demo run ───────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Generate sample retail transaction data
    np.random.seed(0)
    row_count = 4000
    transactions = pd.DataFrame({
        "revenue": np.random.lognormal(mean=5, sigma=1.5, size=row_count),
        "customer_age": np.random.uniform(18, 75, row_count).astype(int),
        "created_at": pd.date_range("2023-06-01", periods=row_count, freq="min").astype(str),
        "region_code": np.random.choice(["US", "EU", "APAC", None], size=row_count, p=[0.45, 0.3, 0.15, 0.1]),
        "session_count": np.where(np.random.rand(row_count) < 0.25, None, np.random.randint(1, 60, row_count)),
    })
    quality_report = run_eda_agent(transactions, "E-commerce transaction data with user session tracking")
    print(quality_report)
```
# Workflow 2: Automated Feature Engineering Agent
Below is a Python script that uses an LLM-powered agent to perform automated feature engineering and selection for a classification task. The agent proposes feature candidates, evaluates their importance, prunes low-importance ones, and produces a human-readable summary of the results.
```python
# feature_agent.py
# Prerequisites: pip install openai scikit-learn pandas numpy
# Run: python feature_agent.py
import json
from dataclasses import dataclass

import numpy as np
import pandas as pd
from openai import OpenAI
from sklearn.ensemble import RandomForestClassifier
from sklearn.inspection import permutation_importance
from sklearn.model_selection import train_test_split

client = OpenAI()

# ── Data structures ───────────────────────────────────────────────────────────
@dataclass
class FeatureSelection:
    kept: dict
    pruned: dict
    scores: dict

# ── Helper: turn the LLM's formula strings into actual columns ────────────────
def build_candidate_features(df: pd.DataFrame, candidates: dict) -> pd.DataFrame:
    """
    Evaluate each {name: expression} candidate with DataFrame.eval.
    Formulas that fail, or that don't produce one value per row, are skipped.
    Note: eval should not receive untrusted input in production; validate or
    sandbox LLM-generated formulas before running them.
    """
    built = {}
    for name, expr in candidates.items():
        try:
            values = df.eval(expr)
        except Exception as exc:  # One bad formula must not crash the whole run
            print(f"Skipping {name!r} ({expr!r}): {exc}")
            continue
        if isinstance(values, pd.Series) and len(values) == len(df):
            built[name] = values
    return pd.DataFrame(built, index=df.index)

# ── Tool: Evaluate and prune features ─────────────────────────────────────────
def evaluate_and_prune(
    df: pd.DataFrame,
    candidates: dict,
    target_col: str = "churned",
    threshold: float = 0.05,
) -> FeatureSelection:
    """
    Materialize the candidate features, train a RandomForest on them, compute
    permutation importance, and drop any feature whose importance falls below `threshold`.
    """
    X = build_candidate_features(df, candidates)
    if X.empty:
        raise ValueError("None of the proposed feature formulas could be evaluated.")
    y = df[target_col].copy()

    # Encode any non-numeric results (e.g. a formula that returns plan_tier unchanged)
    for col in X.select_dtypes(exclude=["number", "bool"]).columns:
        X[col] = X[col].astype("category").cat.codes
    # Division by zero produces inf, which RandomForest rejects
    X = X.replace([np.inf, -np.inf], np.nan).fillna(0)
    feature_names = list(X.columns)

    X_train, X_val, y_train, y_val = train_test_split(
        X, y, test_size=0.25, random_state=42, stratify=y
    )
    clf = RandomForestClassifier(n_estimators=200, random_state=42, n_jobs=-1)
    clf.fit(X_train, y_train)
    imp = permutation_importance(
        clf, X_val, y_val, n_repeats=10, random_state=42, n_jobs=-1
    )
    scores = {f: float(s) for f, s in zip(feature_names, imp.importances_mean)}
    kept = {f: round(s, 4) for f, s in scores.items() if s >= threshold}
    pruned = {f: round(s, 4) for f, s in scores.items() if s < threshold}
    return FeatureSelection(kept=kept, pruned=pruned, scores=scores)

# ── Tool: Generate feature candidates via LLM ─────────────────────────────────
def generate_feature_candidates(
    column_descriptions: dict,
    target: str = "churned",
    task_type: str = "classification",
    n_candidates: int = 15,
) -> dict:
    """
    Ask the LLM to propose engineered feature definitions given a set of
    base column descriptions. Returns a dict of {name: formula_string}.
    """
    desc_text = "\n".join(f"  - {col}: {d}" for col, d in column_descriptions.items())
    prompt = f"""You are a feature-engineering assistant for a {task_type} task.
Target variable: {target}
Available columns:
{desc_text}
Propose {n_candidates} engineered features as a JSON object mapping each
feature_name to an arithmetic expression over the column names that
pandas DataFrame.eval can evaluate, e.g. "monthly_spend / (days_since_login + 1)".
Return ONLY the JSON object, no commentary."""
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.4,
        response_format={"type": "json_object"},  # Forces parseable JSON output
    )
    return json.loads(response.choices[0].message.content)

# ── Tool: Explain the selection outcome ───────────────────────────────────────
def explain_selection(
    kept: dict,
    pruned: dict,
    scores: dict,
) -> str:
    """Ask the LLM to summarise which features survived and why."""
    prompt = f"""You are a data-science reviewer. A feature selection run produced the following results.
Features KEPT (above threshold):
{json.dumps({f: round(scores.get(f, 0), 4) for f in kept}, indent=2)}
Features PRUNED (below threshold):
{json.dumps({f: round(scores.get(f, 0), 4) for f in pruned}, indent=2)}
Write a 3-5 sentence summary of the selection outcome.
Note any surprising prunings or unexpectedly high-importance features.
Suggest one additional feature worth testing based on what survived."""
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.3,
    )
    return response.choices[0].message.content


if __name__ == "__main__":
    column_descriptions = {
        "days_since_login": "Number of days since the customer last logged in",
        "plan_tier": "Subscription tier: basic, pro, or enterprise",
        "support_tickets_90d": "Number of support tickets opened in the last 90 days",
        "monthly_spend": "Customer's average monthly spend in USD",
    }
    candidates = generate_feature_candidates(
        column_descriptions, target="churned", task_type="classification", n_candidates=10
    )

    # In production, load real customer data here
    np.random.seed(42)
    n = 3000
    df = pd.DataFrame({
        "days_since_login": np.random.randint(0, 90, n),
        "plan_tier": np.random.choice(["basic", "pro", "enterprise"], n),
        "support_tickets_90d": np.random.poisson(1.5, n),
        "monthly_spend": np.random.exponential(80, n),
        "churned": np.random.binomial(1, 0.15, n),
    })

    # evaluate_and_prune returns a FeatureSelection dataclass, not a tuple
    selection = evaluate_and_prune(df, candidates, target_col="churned")
    summary = explain_selection(selection.kept, selection.pruned, selection.scores)
    print(summary)
```
**How to run:** `python feature_agent.py`
**Real scenario:** Customer churn prediction with 12 input columns, including days_since_login, plan_tier, support_tickets_90d, and monthly_spend. The agent proposes 15 candidates, including spend_per_day, tickets_per_spend_ratio, and login_recency_x_plan. After evaluation, 9 survive the importance threshold. The explanation calls out that tickets_per_spend_ratio has the highest importance score (0.18), noting that "customers spending more who are also raising support tickets are a particularly high churn risk," which becomes a finding worth sharing with the product team.
# Workflow 3: Agentic Hyperparameter Optimization
What it replaces: Grid search (exhaustive but wasteful), random search (efficient but undirected), and manual Bayesian optimization setup (powerful yet laden with boilerplate). All of these approaches frame hyperparameter tuning as a search problem. An agent, by contrast, treats it as a reasoning problem.
What the agent does instead: It proposes a hyperparameter configuration, evaluates it by training the model, examines the metric trend across iterations, pinpoints which parameters are driving improvement, and adjusts the search direction accordingly — all without explicit instruction. It arrives at a strong configuration in far fewer iterations than grid or random search.
// Architecture
One agent, one tool: train_and_evaluate. The tool accepts a Pydantic-validated hyperparameter config, trains the model with 5-fold cross-validation, and returns the AUC, training time, and the train/validation overfitting gap. At each step the agent receives the full trial history and reasons about what to try next. Convergence is signalled when the last three AUC scores differ by less than 0.005.
This design draws directly from published research on agentic hyperparameter tuning, which demonstrated that LLM-guided search can outperform Bayesian optimization on mid-sized classification tasks by 5–12% in fewer iterations.
// Code Pattern
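```python
# hp_agent.py
# Prerequisites: pip install openai scikit-learn pydantic pandas numpy
# Run: python hp_agent.py
import json
import time
from dataclasses import dataclass

from openai import OpenAI
from pydantic import BaseModel, Field, ValidationError
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import cross_val_score

client = OpenAI()

# ── Pydantic schema for structured tool input ─────────────────────────────────
# The model must return valid hyperparameters. Pydantic catches invalid values
# before the training job starts, saving wasted compute on bad configs.
class HyperparamConfig(BaseModel):
    n_estimators: int = Field(..., ge=10, le=1000, description="Number of trees")
    max_depth: int = Field(..., ge=1, le=50, description="Max tree depth")
    min_samples_split: int = Field(..., ge=2, le=50, description="Min samples to split")
    max_features: float = Field(..., gt=0, le=1.0, description="Fraction of features per split")


@dataclass
class TrialResult:
    iteration: int
    config: dict
    val_auc: float
    train_auc: float
    train_time_s: float

    @property
    def overfit_gap(self) -> float:
        return round(self.train_auc - self.val_auc, 4)


def train_and_evaluate(config: dict, X, y) -> TrialResult:
    """
    Train a RandomForest with the given config and return cross-validated metrics.
    This is the tool the agent calls on each iteration.
    """
    params = HyperparamConfig(**config)  # Raises ValidationError before any training
    clf = RandomForestClassifier(
        n_estimators=params.n_estimators,
        max_depth=params.max_depth,
        min_samples_split=params.min_samples_split,
        max_features=params.max_features,
        random_state=42,
        n_jobs=-1,
    )
    t0 = time.time()
    val_scores = cross_val_score(clf, X, y, cv=5, scoring="roc_auc")
    clf.fit(X, y)  # Refit on all data to measure the train/validation gap
    train_auc = roc_auc_score(y, clf.predict_proba(X)[:, 1])
    return TrialResult(
        iteration=0,  # Set by the caller
        config=params.model_dump(),
        val_auc=round(float(val_scores.mean()), 4),
        train_auc=round(float(train_auc), 4),
        train_time_s=round(time.time() - t0, 2),
    )


def detect_convergence(results: list[TrialResult], window: int = 3, tol: float = 0.005) -> bool:
    """Stop when the last `window` AUC scores span less than `tol`."""
    if len(results) < window:
        return False
    recent = [r.val_auc for r in results[-window:]]
    return (max(recent) - min(recent)) < tol


def propose_next_config(trial_history: list[TrialResult]) -> dict:
    """
    Ask the agent to propose the next hyperparameter configuration,
    reasoning from the full trial history.
    """
    history_text = "\n".join(
        f"Trial {r.iteration}: config={r.config}, val_AUC={r.val_auc}, "
        f"overfit_gap={r.overfit_gap}, time={r.train_time_s}s"
        for r in trial_history
    )
    # Assumed prompt wording; the bounds mirror HyperparamConfig so proposals validate.
    prompt = f"""You are optimizing a RandomForest classifier. Your goal is to maximize val_AUC.
Trial history:
{history_text}
Parameter bounds:
- n_estimators: integer, 10 to 1000
- max_depth: integer, 1 to 50
- min_samples_split: integer, 2 to 50
- max_features: float, greater than 0 and at most 1.0
Identify which parameters moved val_AUC, watch for a growing overfit_gap,
and propose ONE new configuration that has not been tried yet.
Return ONLY a JSON object with the keys n_estimators, max_depth,
min_samples_split, and max_features."""
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.2,
        response_format={"type": "json_object"},
    )
    return json.loads(response.choices[0].message.content)


def run_hp_agent(X, y, max_iters: int = 10) -> TrialResult:
    """Propose, evaluate, and repeat until convergence or the iteration budget runs out."""
    # Assumed seed configuration; any valid config works as a starting point.
    config = {"n_estimators": 100, "max_depth": 8, "min_samples_split": 2, "max_features": 0.5}
    history: list[TrialResult] = []
    for i in range(1, max_iters + 1):
        if history:
            config = propose_next_config(history)
        try:
            result = train_and_evaluate(config, X, y)
        except ValidationError as exc:
            # Pydantic rejected the proposal before any compute was spent
            print(f"Trial {i}: rejected {config} ({exc.error_count()} validation errors)")
            continue
        result.iteration = i
        history.append(result)
        print(f"Trial {i}: val_AUC={result.val_auc}, overfit_gap={result.overfit_gap}")
        if detect_convergence(history):
            print("Converged.")
            break
    return max(history, key=lambda r: r.val_auc)


if __name__ == "__main__":
    # Synthetic mid-sized classification problem for the demo
    X, y = make_classification(n_samples=2000, n_features=20, n_informative=8, random_state=42)
    best = run_hp_agent(X, y, max_iters=10)
    print(f"Best config: {best.config} (val_AUC={best.val_auc})")
```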
# Workflow 4: Automated Model Monitoring and Drift Detection Agent
What it replaces: The manual process of periodically inspecting feature distributions, crafting per-column threshold rules, maintaining dashboard alerts that become outdated, and only discovering model performance drops after they have already impacted business metrics.
What the agent does instead: Operates on a recurring schedule against incoming batch data, calculates drift statistics for each feature using the Population Stability Index (PSI) and the Kolmogorov-Smirnov (KS) test, assigns a severity level to the drift, and then takes different actions based on that severity: mild drift raises an alert, while severe drift initiates a retraining pipeline call.
// Architecture
A scheduled agent centered around a single tool, compute_drift_stats, which calculates PSI and the KS test for every column and then classifies the outcome by severity. One language model call then determines the appropriate response: a passing check is simply logged, mild drift generates a drafted notification for the data science team, and severe drift produces both an alert and a trigger for a retraining directed acyclic graph (DAG), delivered through Slack or the Airflow REST API. The pivotal design choice is this branching response logic: the agent manages the routing rather than relying on a rigid if/else chain.
PSI interpretation: values under 0.1 indicate stability, values between 0.1 and 0.25 signal mild drift that warrants monitoring, and values above 0.25 represent significant drift that should prompt retraining. PSI is the conventional metric for detecting population shifts in production machine learning systems and has been a staple of financial risk modeling for decades, well before LLMs emerged.
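To make those thresholds concrete, here is a small worked sketch of the PSI formula on hand-picked bucket shares. The bucket percentages are illustrative values, not real data, and the `psi` and `classify` helpers are simplified stand-ins that take pre-bucketed percentages rather than raw feature values.

```python
import math


def psi(baseline_pct: list[float], current_pct: list[float], eps: float = 1e-6) -> float:
    """PSI = sum over buckets of (current% - baseline%) * ln(current% / baseline%)."""
    total = 0.0
    for b, c in zip(baseline_pct, current_pct):
        b, c = max(b, eps), max(c, eps)  # Empty buckets would otherwise hit log(0)
        total += (c - b) * math.log(c / b)
    return total


def classify(value: float) -> str:
    """Thresholds: <0.1 stable, 0.1-0.25 mild drift, >0.25 severe drift."""
    if value < 0.10:
        return "stable"
    if value < 0.25:
        return "mild_drift"
    return "severe_drift"


# Illustrative bucket shares (not real data) over four equal-width buckets
baseline = [0.25, 0.25, 0.25, 0.25]
scenarios = {
    "small shift": [0.20, 0.25, 0.25, 0.30],
    "moderate shift": [0.10, 0.25, 0.25, 0.40],
    "large shift": [0.05, 0.15, 0.30, 0.50],
}
for name, current in scenarios.items():
    value = psi(baseline, current)
    print(f"{name}: PSI = {value:.3f} -> {classify(value)}")
```

The three cases come out at roughly 0.020, 0.208, and 0.555, one in each band. Because every term is weighted by the log ratio, a bucket that empties out (0.25 down to 0.05 in the last case) contributes far more than a bucket that gains the same share.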
// Code Pattern
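```python
# drift_agent.py
# Prerequisites: pip install openai pandas scipy numpy
# Run: python drift_agent.py
import json
from dataclasses import asdict, dataclass

import numpy as np
import pandas as pd
from openai import OpenAI
from scipy.stats import ks_2samp

client = OpenAI()


@dataclass
class FeatureDrift:
    feature: str
    psi: float
    ks_stat: float
    ks_pvalue: float
    severity: str  # stable | mild_drift | severe_drift


def compute_psi(baseline: np.ndarray, current: np.ndarray, buckets: int = 10) -> float:
    """
    Population Stability Index between baseline and current distributions.
    PSI = sum((current_% - baseline_%) * ln(current_% / baseline_%))
    Values: <0.1 stable | 0.1-0.25 mild | >0.25 severe
    """
    # Shared equal-width buckets over the combined range. np.histogram also copes
    # with a constant column (min == max), which would give a zero bucket width.
    value_range = (min(baseline.min(), current.min()), max(baseline.max(), current.max()))

    def bucket_freqs(data: np.ndarray) -> np.ndarray:
        counts, _ = np.histogram(data, bins=buckets, range=value_range)
        return np.maximum(counts / len(data), 1e-6)  # Avoid log(0)

    b_freq = bucket_freqs(baseline)
    c_freq = bucket_freqs(current)
    return round(float(np.sum((c_freq - b_freq) * np.log(c_freq / b_freq))), 4)


def classify_drift(psi: float) -> str:
    if psi < 0.10:
        return "stable"
    if psi < 0.25:
        return "mild_drift"
    return "severe_drift"


def compute_drift_stats(
    baseline_df: pd.DataFrame,
    current_df: pd.DataFrame,
    numeric_cols: list[str],
) -> list[FeatureDrift]:
    """Compute PSI and KS test for each numeric feature."""
    results = []
    for col in numeric_cols:
        b = baseline_df[col].dropna().values
        c = current_df[col].dropna().values
        psi = compute_psi(b, c)
        ks_stat, ks_pvalue = ks_2samp(b, c)
        results.append(FeatureDrift(
            feature=col,
            psi=psi,
            ks_stat=round(float(ks_stat), 4),
            ks_pvalue=round(float(ks_pvalue), 6),
            severity=classify_drift(psi),
        ))
    return results


def run_monitoring_agent(
    baseline_df: pd.DataFrame,
    current_df: pd.DataFrame,
    numeric_cols: list[str],
    model_name: str = "churn_model_v3",
) -> str:
    """Compute drift for every feature, then let the LLM choose the response."""
    drift = compute_drift_stats(baseline_df, current_df, numeric_cols)
    severities = {d.severity for d in drift}
    overall = ("severe_drift" if "severe_drift" in severities
               else "mild_drift" if "mild_drift" in severities else "stable")
    prompt = f"""You are the monitoring agent for the production model {model_name}.
Overall drift severity: {overall}
Per-feature drift statistics (PSI and KS test):
{json.dumps([asdict(d) for d in drift], indent=2)}
Respond according to the severity:
- stable: write a one-line log entry.
- mild_drift: draft a short notification for the data science team naming the drifting features.
- severe_drift: draft an alert AND state that the retraining pipeline should be triggered.
Respond with:
SEVERITY:
ACTION:
MESSAGE:"""
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.1,
    )
    # Hypothetical dispatch point: post MESSAGE to Slack, or trigger the
    # retraining DAG through the Airflow REST API, based on ACTION.
    return response.choices[0].message.content


if __name__ == "__main__":
    # Synthetic demo: monthly_spend shifts upward, the other features stay put
    rng = np.random.default_rng(7)
    n = 5000
    baseline = pd.DataFrame({
        "days_since_login": rng.integers(0, 90, n),
        "monthly_spend": rng.exponential(80, n),
        "support_tickets_90d": rng.poisson(1.5, n),
    })
    current = pd.DataFrame({
        "days_since_login": rng.integers(0, 90, n),
        "monthly_spend": rng.exponential(120, n),
        "support_tickets_90d": rng.poisson(1.5, n),
    })
    print(run_monitoring_agent(baseline, current, list(baseline.columns)))
```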
# Workflow 5: Automated Pipeline Orchestration and Self-Repair
What it replaces: Reacting to an Airflow failure alert, digging into log files, manually scanning the stack trace, diagnosing whether the issue calls for a code modification, a configuration tweak, or simply a retry, hand-crafting the fix, retrying the task, and then worrying about whether downstream tasks might fail for the same reason.
What the agent does instead: Interprets the failure log, categorizes the error, evaluates if it can be resolved automatically, applies the resolution when possible, and otherwise escalates to a human engineer with a comprehensive, well-organized incident report.
// Architecture
A meta-agent that wraps around your existing orchestration layer. When an Airflow task fails, the orchestrator forwards the task identifier, error log, and task definition to the agent. The agent leverages a single tool, parse_pipeline_error, to categorize the failure through deterministic rules. After classification, a single large language model invocation determines whether the error can be corrected automatically and generates either a fix description or a structured incident report for manual investigation, depending on the categorization outcome.
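Before the orchestrator can act on the agent's decision, the reply (requested as labeled ACTION, FIX_DESCRIPTION, ESCALATION_REPORT, and NEXT_STEP sections) has to be split back into fields. A minimal sketch, assuming the model follows the `LABEL: value` layout; `parse_agent_response` is a hypothetical helper, tolerance for markdown bold around labels is a design assumption, and the sample reply is illustrative rather than real model output.

```python
import re

LABELS = ("ACTION", "FIX_DESCRIPTION", "ESCALATION_REPORT", "NEXT_STEP")

# Matches "ACTION: ..." and also "**ACTION:** ..." in case the model adds markdown
LABEL_RE = re.compile(
    r"^[*#\s]*(ACTION|FIX_DESCRIPTION|ESCALATION_REPORT|NEXT_STEP)[*\s]*:[*\s]*(.*)$"
)


def parse_agent_response(text: str) -> dict[str, str]:
    """Split a labeled reply into {label: value}; missing sections map to ""."""
    sections = {label: "" for label in LABELS}
    current = None
    for line in text.splitlines():
        match = LABEL_RE.match(line.strip())
        if match:
            current = match.group(1)
            sections[current] = match.group(2).strip()
        elif current is not None and line.strip():
            # Continuation line: append to the section currently being read
            sections[current] = f"{sections[current]}\n{line.strip()}".strip()
    return sections


reply = """ACTION: auto_fix
FIX_DESCRIPTION: Rename txn_date_utc to transaction_date in the ingestion step.
ESCALATION_REPORT: N/A
NEXT_STEP: Re-run ingest_crm_daily."""
parsed = parse_agent_response(reply)
print(parsed["ACTION"], "->", parsed["NEXT_STEP"])
```

Defaulting missing sections to an empty string lets the orchestrator treat an unparseable reply as "escalate" rather than crashing.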
// Code Pattern
```python
# pipeline_healer.py
# Prerequisites: pip install openai pandas
# Run: python pipeline_healer.py
import re
from dataclasses import dataclass
from typing import Optional

from openai import OpenAI

client = OpenAI()


@dataclass
class PipelineError:
    task_id: str
    error_type: str  # schema_mismatch | null_violation | timeout | unknown
    column: Optional[str]
    detail: str
    auto_fixable: bool


def parse_pipeline_error(log_line: str, task_id: str) -> PipelineError:
    """
    Categorize a task failure log entry into a structured error classification.
    Auto-fixable errors are those that can be repaired without human involvement.
    """
    lowered = log_line.lower()
    if "KeyError" in log_line or ("column" in lowered and "not found" in lowered):
        # Capture the first quoted identifier, e.g. 'transaction_date'
        col_match = re.search(r"['\"](\w+)['\"]", log_line)
        col = col_match.group(1) if col_match else None
        return PipelineError(task_id, "schema_mismatch", col, log_line.strip(), auto_fixable=True)
    if "IntegrityError" in log_line or ("null" in lowered and "violate" in lowered):
        return PipelineError(task_id, "null_violation", None, log_line.strip(), auto_fixable=True)
    if "TimeoutError" in log_line or "timed out" in lowered:
        return PipelineError(task_id, "timeout", None, log_line.strip(), auto_fixable=False)
    return PipelineError(task_id, "unknown", None, log_line.strip(), auto_fixable=False)


def run_self_healing_agent(
    task_id: str,
    error_log: str,
    task_definition: str,
) -> str:
    """
    Run the self-healing agent against a failed pipeline task.
    It categorizes the error, selects a remediation strategy, and produces
    either an auto-fix description or a structured escalation report.
    """
    error = parse_pipeline_error(error_log, task_id)
    # The deterministic classification, not the LLM, decides whether auto-fixing is allowed
    permission = (
        "You are permitted to apply an automatic fix for this error type."
        if error.auto_fixable
        else "This error requires human review -- you are not permitted to auto-fix it."
    )
    prompt = f"""You are a data pipeline reliability engineer.
A pipeline task has failed and you must decide how to respond.
Task: {task_id}
Task definition: {task_definition}
Error type: {error.error_type}
Column affected: {error.column or 'N/A'}
Auto-fixable: {error.auto_fixable}
Full error: {error.detail}
{permission}
Respond with:
ACTION:
FIX_DESCRIPTION:
ESCALATION_REPORT:
NEXT_STEP: """
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.1,
    )
    return response.choices[0].message.content


if __name__ == "__main__":
    # Scenario: CRM export added a new column and changed a date format
    result = run_self_healing_agent(
        task_id="ingest_crm_daily",
        error_log="KeyError: 'transaction_date' column not found in source dataframe. "
                  "Available columns: ['txn_date_utc', 'customer_id', 'amount_usd', 'product_sku']",
        task_definition="Reads daily CRM export, extracts transaction_date and customer_id, "
                        "joins with product catalog, writes to feature store.",
    )
    print(result)
```
**How to run:**
```bash
python pipeline_healer.py
```
**Real scenario:** A daily feature pipeline crashes at 2 AM because an upstream CRM system updated its export schema — renaming `transaction_date` to `txn_date_utc` and adding three new columns. The agent reads the error log, pinpoints the schema mismatch on `transaction_date`, and generates an auto-fix: it renames the column in the ingestion step and registers the three new columns in the schema definition as nullable. It logs the fix, re-runs the failed task, and sends the on-call engineer a summary that reads: *"Schema fix applied automatically. Source renamed `transaction_date → txn_date_utc`. Three new nullable columns were added to the schema. Task retriggered at 02:14."* The engineer reviews the change in the morning instead of being woken up.
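The auto-fix in this scenario boils down to mapping the renamed source column back to the name the task expects and recording columns the schema has not seen yet. A minimal pandas sketch, assuming the agent's FIX_DESCRIPTION has already been turned into a rename dictionary; `apply_schema_fix` is a hypothetical helper, and registering the new columns as nullable in an actual schema registry is left out.

```python
import pandas as pd


def apply_schema_fix(
    df: pd.DataFrame, renames: dict[str, str], expected: list[str]
) -> tuple[pd.DataFrame, list[str]]:
    """Hypothetical helper: map renamed source columns back to expected names.

    Returns the repaired frame plus columns the source added that the schema
    does not know about yet (candidates to register as nullable).
    """
    fixed = df.rename(columns=renames)
    still_missing = [c for c in expected if c not in fixed.columns]
    if still_missing:
        # The rename did not resolve the failure: escalate rather than guess
        raise KeyError(f"columns still missing after fix: {still_missing}")
    new_columns = [c for c in fixed.columns if c not in expected]
    return fixed, new_columns


# Column names come from the error log above; the row values are illustrative.
export = pd.DataFrame({
    "txn_date_utc": ["2024-03-01T00:00:00Z"],
    "customer_id": [101],
    "amount_usd": [19.99],
    "product_sku": ["SKU-1"],
})
fixed, new_columns = apply_schema_fix(
    export,
    renames={"txn_date_utc": "transaction_date"},
    expected=["transaction_date", "customer_id"],
)
print(list(fixed.columns), new_columns)
```

Raising instead of silently continuing keeps the "auto-fix or escalate" split intact: if the proposed rename does not cover every column the task needs, the failure goes to a human.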
---
# Wrapping Up
The five workflows aren't standalone tools — they form a connected pipeline:
- The **EDA agent** makes sense of the data.
- The **feature engineering agent** enhances it.
- The **hyperparameter agent** tunes the model built on those features.
- The **monitoring agent** keeps an eye on the model in production.
- The **self-healing agent** safeguards the pipeline that feeds data to all the others.
**Roll them out in stages.** Start with monitoring: it delivers immediate value on any existing pipeline without requiring changes to your modeling code. Add the EDA agent next for any new dataset you bring in. The feature engineering and hyperparameter agents come after you've established a baseline model worth improving.

None of these workflows operates without human oversight on the decisions that matter. The EDA agent flags issues — **you** decide what to do about them. The feature agent proposes candidates — **you** decide the importance threshold. The hyperparameter agent searches — **you** decide the parameter bounds and convergence criteria. The monitoring agent detects drift — **you** decide the severity thresholds that trigger retraining. The self-healing agent applies fixes — **you** review them before they merge into production.
That division is the whole point. Agents handle the procedural heavy lifting. You retain the evaluative heavy lifting. The result is a pipeline that's faster, more consistent, and easier to maintain — because the parts that break are now detected and often repaired before you ever have to look at them.
---
**[Shittu Olumide](https://www.linkedin.com/in/olumide-shittu/)** is a software engineer and technical writer passionate about leveraging cutting-edge technologies to craft compelling narratives, with a keen eye for detail and a knack for simplifying complex concepts. You can also find Shittu on [Twitter](https://twitter.com/Shittu_Olumide_).



