In this tutorial, we design an end-to-end, production-style analytics and modeling pipeline that uses Vaex to work efficiently on millions of rows without materializing data in memory. We generate a realistic, large-scale dataset, engineer rich behavioral and city-level features using lazy expressions and approximate statistics, and aggregate insights at scale. We then integrate Vaex with scikit-learn to train and evaluate a predictive model, demonstrating how Vaex can act as the backbone for high-performance exploratory analysis and machine-learning workflows.
# Install pinned Vaex packages (notebook shell command).
!pip -q install "vaex==4.19.0" "vaex-core==4.19.0" "vaex-ml==0.19.0" "vaex-viz==0.6.0" "vaex-hdf5==0.15.0" "pyarrow>=14" "scikit-learn>=1.3"
import os, time, json, numpy as np, pandas as pd
import vaex
import vaex.ml
from vaex.ml.sklearn import Predictor
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score, average_precision_score
# Report the environment so results can be reproduced.
print("Python:", __import__("sys").version.split()[0])
print("vaex:", vaex.__version__)
print("numpy:", np.__version__)
print("pandas:", pd.__version__)
# Synthetic customer data: 2M rows, fixed seed for reproducibility.
rng = np.random.default_rng(7)
n = 2_000_000
cities = np.array(["Montreal","Toronto","Vancouver","Calgary","Ottawa","Edmonton","Quebec City","Winnipeg"], dtype=object)
city = rng.choice(cities, size=n, replace=True, p=np.array([0.16,0.18,0.12,0.10,0.10,0.10,0.10,0.14]))
age = rng.integers(18, 75, size=n, endpoint=False).astype("int32")
tenure_m = rng.integers(0, 180, size=n, endpoint=False).astype("int32")
tx = rng.poisson(lam=22, size=n).astype("int32")
base_income = rng.lognormal(mean=10.6, sigma=0.45, size=n).astype("float64")
# Per-city income multiplier, aligned row by row with the sampled cities.
city_mult = pd.Series({"Montreal":0.92,"Toronto":1.05,"Vancouver":1.10,"Calgary":1.02,"Ottawa":1.00,"Edmonton":0.98,"Quebec City":0.88,"Winnipeg":0.90}).reindex(city).to_numpy()
income = (base_income * city_mult * (1.0 + 0.004*(age-35)) * (1.0 + 0.0025*np.minimum(tenure_m,120))).astype("float64")
income = np.clip(income, 18_000, 420_000)
noise = rng.normal(0, 1, size=n).astype("float64")
score_latent = (
    0.55*np.log1p(income/1000.0)
    + 0.28*np.log1p(tx)
    + 0.18*np.sqrt(np.maximum(tenure_m,0)/12.0 + 1e-9)
    - 0.012*(age-40)
    + 0.22*(city == "Vancouver").astype("float64")
    + 0.15*(city == "Toronto").astype("float64")
    + 0.10*(city == "Ottawa").astype("float64")
    + 0.65*noise
)
# Center the latent score on its 70th percentile, then map it to a probability.
p = 1.0/(1.0 + np.exp(-(score_latent - np.quantile(score_latent, 0.70))))
target = (rng.random(n) < p).astype("int8")
df = vaex.from_arrays(city=city, age=age, tenure_m=tenure_m, tx=tx, income=income, target=target)
# Virtual columns: stored as expressions and evaluated lazily.
df["income_k"] = df.income / 1000.0
df["tenure_y"] = df.tenure_m / 12.0
df["log_income"] = df.income.log1p()
df["tx_per_year"] = df.tx / (df.tenure_y + 0.25)
df["value_score"] = (0.35*df.log_income + 0.20*df.tx_per_year + 0.10*df.tenure_y - 0.015*df.age)
df["is_new"] = df.tenure_m < 6
df["is_senior"] = df.age >= 60
print("\nRows:", len(df), "Columns:", len(df.get_column_names()))
print(df[["city","age","tenure_m","tx","income","income_k","value_score","target"]].head(5))
We generate a large, realistic synthetic dataset and initialize a Vaex DataFrame to work lazily on millions of rows. We engineer core numerical features directly as expressions so no intermediate data is materialized. We validate the setup by inspecting the schema, row counts, and a small sample of computed values.
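To make the idea of a lazily evaluated expression concrete, here is a minimal NumPy sketch. It is not Vaex's actual implementation: `chunked_mean` is a hypothetical helper, and the chunk size and toy arrays are assumptions chosen for illustration. The point is that a derived column can be treated as a recipe and evaluated one slice at a time, so the full derived array never has to exist in memory at once.

```python
import numpy as np

def chunked_mean(expr, columns, chunk_size=100_000):
    """Mean of expr(**columns), computed slice by slice (hypothetical helper)."""
    n = len(next(iter(columns.values())))
    total = 0.0
    for start in range(0, n, chunk_size):
        stop = min(start + chunk_size, n)
        # Only this slice of the derived column is materialized.
        chunk = {name: col[start:stop] for name, col in columns.items()}
        total += float(np.sum(expr(**chunk)))
    return total / n

rng = np.random.default_rng(0)
income = rng.uniform(18_000, 420_000, size=250_000)
tenure_m = rng.integers(0, 180, size=250_000)

# A "virtual column": just a recipe, evaluated only when a result is needed.
def log_income_plus_tenure(income, tenure_m):
    return np.log1p(income) + 0.10 * (tenure_m / 12.0)

lazy_mean = chunked_mean(log_income_plus_tenure, {"income": income, "tenure_m": tenure_m})
eager_mean = float(np.mean(log_income_plus_tenure(income, tenure_m)))
```

Both values agree up to floating-point rounding; the chunked version simply trades one large temporary array for several small ones.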
# Encode city names as integer ids so they can be used as bin indices.
encoder = vaex.ml.LabelEncoder(features=["city"])
df = encoder.fit_transform(df)
city_map = encoder.labels_["city"]
inv_city_map = {v:k for k,v in city_map.items()}
n_cities = len(city_map)
# One bin per city id: limits of [-0.5, n_cities-0.5] center each bin on an integer.
p95_income_k_by_city = df.percentile_approx("income_k", 95, binby="label_encoded_city", shape=n_cities, limits=[-0.5, n_cities-0.5])
p50_value_by_city = df.percentile_approx("value_score", 50, binby="label_encoded_city", shape=n_cities, limits=[-0.5, n_cities-0.5])
avg_income_k_by_city = df.mean("income_k", binby="label_encoded_city", shape=n_cities, limits=[-0.5, n_cities-0.5])
target_rate_by_city = df.mean("target", binby="label_encoded_city", shape=n_cities, limits=[-0.5, n_cities-0.5])
n_by_city = df.count(binby="label_encoded_city", shape=n_cities, limits=[-0.5, n_cities-0.5])
# Flatten every result to a 1-D array of length n_cities.
p95_income_k_by_city = np.asarray(p95_income_k_by_city).reshape(-1)
p50_value_by_city = np.asarray(p50_value_by_city).reshape(-1)
avg_income_k_by_city = np.asarray(avg_income_k_by_city).reshape(-1)
target_rate_by_city = np.asarray(target_rate_by_city).reshape(-1)
n_by_city = np.asarray(n_by_city).reshape(-1)
city_table = pd.DataFrame({
    "city_id": np.arange(n_cities),
    "city": [inv_city_map[i] for i in range(n_cities)],
    "n": n_by_city.astype("int64"),
    "avg_income_k": avg_income_k_by_city,
    "p95_income_k": p95_income_k_by_city,
    "median_value_score": p50_value_by_city,
    "target_rate": target_rate_by_city
}).sort_values(["target_rate","p95_income_k"], ascending=False)
print("\nCity summary:")
print(city_table.to_string(index=False))
# Join the city-level aggregates back onto every row, then derive relative features.
df_city_features = vaex.from_pandas(city_table[["city","p95_income_k","avg_income_k","median_value_score","target_rate"]], copy_index=False)
df = df.join(df_city_features, on="city", rsuffix="_city")
df["income_vs_city_p95"] = df.income_k / (df.p95_income_k + 1e-9)
df["value_vs_city_median"] = df.value_score - df.median_value_score
We encode the categorical city data and compute scalable, approximate per-city statistics using binning-based operations. We assemble these aggregates into a city-level table and join them back to the main dataset. We then derive relative features that compare each record against its city context.
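The `binby`, `shape`, and `limits` arguments used above amount to placing one bin around each integer city code. As a rough NumPy analogue (a sketch on toy data, not how Vaex computes these statistics internally), `np.bincount` yields the same kind of per-group counts and means. Indexing the group result by the code array then plays the role of the join. The toy values and the use of the mean rather than the 95th percentile are assumptions made to keep the example short.

```python
import numpy as np

codes = np.array([0, 1, 1, 2, 0, 2, 2, 1])   # toy encoded city ids
income_k = np.array([50.0, 80.0, 100.0, 60.0, 70.0, 90.0, 30.0, 60.0])
n_groups = 3

# Per-group count and sum in a single pass each; the mean follows.
counts = np.bincount(codes, minlength=n_groups)
sums = np.bincount(codes, weights=income_k, minlength=n_groups)
means = sums / counts

# Broadcast each group's statistic back to its rows (the role of the join).
row_group_mean = means[codes]
income_vs_group_mean = income_k / (row_group_mean + 1e-9)
```

Here `means` holds one value per city, and `income_vs_group_mean` expresses each row relative to its own city, in the same spirit as `income_vs_city_p95`.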
features_num = [
    "age","tenure_y","tx","income_k","log_income","tx_per_year","value_score",
    "p95_income_k","avg_income_k","median_value_score","target_rate",
    "income_vs_city_p95","value_vs_city_median"
]
# Standardize numeric features; outputs are new columns prefixed with "z_".
scaler = vaex.ml.StandardScaler(features=features_num, with_mean=True, with_std=True, prefix="z_")
df = scaler.fit_transform(df)
features = ["z_"+f for f in features_num] + ["label_encoded_city"]
df_train, df_test = df.split_random([0.80, 0.20], random_state=42)
model = LogisticRegression(max_iter=250, solver="lbfgs", n_jobs=None)
vaex_model = Predictor(model=model, features=features, target="target", prediction_name="pred")
t0 = time.time()
vaex_model.fit(df=df_train)
fit_s = time.time() - t0
# Add the "pred" column to the held-out split and score it.
df_test = vaex_model.transform(df_test)
y_true = df_test["target"].to_numpy()
y_pred = df_test["pred"].to_numpy()
auc = roc_auc_score(y_true, y_pred)
ap = average_precision_score(y_true, y_pred)
print("\nModel:")
print("fit_seconds:", round(fit_s, 3))
print("test_auc:", round(float(auc), 4))
print("test_avg_precision:", round(float(ap), 4))
We standardize all numeric features using Vaex's ML utilities and prepare a consistent feature vector for modeling. We split the dataset without loading the entire dataset into memory. We train a logistic regression model through Vaex's sklearn wrapper and evaluate its predictive performance.
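For intuition about the ROC AUC reported above: it equals the probability that a randomly chosen positive is scored above a randomly chosen negative, with ties counted as one half. The helper below, `pairwise_auc`, is a hypothetical pure-Python illustration on toy data. It is quadratic in the number of rows and is only meant to show what the metric measures, not to replace `roc_auc_score`.

```python
def pairwise_auc(y_true, y_score):
    """AUC as the fraction of (positive, negative) pairs ranked correctly."""
    pos = [s for y, s in zip(y_true, y_score) if y == 1]
    neg = [s for y, s in zip(y_true, y_score) if y == 0]
    wins = 0.0
    for p in pos:
        for q in neg:
            if p > q:
                wins += 1.0      # positive ranked above negative
            elif p == q:
                wins += 0.5      # a tie counts as half a win
    return wins / (len(pos) * len(neg))

# Toy labels and scores (illustrative values only).
y_true_toy = [0, 0, 1, 1, 0, 1]
y_score_toy = [0.1, 0.4, 0.35, 0.8, 0.2, 0.4]
auc_toy = pairwise_auc(y_true_toy, y_score_toy)
```

Because ties count as one half, scoring with hard class labels instead of continuous scores collapses many pairs into ties, so the metric is most informative when the predictions are continuous.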
# Decile boundaries of the predicted scores; open-ended at both extremes.
deciles = np.linspace(0, 1, 11)
cuts = np.quantile(y_pred, deciles)
cuts[0] = -np.inf
cuts[-1] = np.inf
# Assign each prediction to a bucket 0..9 using the nine interior cut points.
bucket = np.digitize(y_pred, cuts[1:-1], right=True).astype("int32")
df_test_local = vaex.from_arrays(y_true=y_true.astype("int8"), y_pred=y_pred.astype("float64"), bucket=bucket)
lift = df_test_local.groupby(by="bucket", agg={"n": vaex.agg.count(), "rate": vaex.agg.mean("y_true"), "avg_pred": vaex.agg.mean("y_pred")}).sort("bucket")
lift_pd = lift.to_pandas_df()
# Lift = bucket positive rate divided by the overall positive rate.
baseline = float(df_test_local["y_true"].mean())
lift_pd["lift"] = lift_pd["rate"] / (baseline + 1e-12)
print("\nDecile lift table:")
print(lift_pd.to_string(index=False))
We analyze model behavior by segmenting predictions into deciles and computing lift metrics. We calculate baseline rates and compare them across score buckets to assess ranking quality. We summarize the results in a compact lift table that reflects real-world model diagnostics.
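The bucketing and lift arithmetic can be checked by hand on a small example. The sketch below repeats the same `np.quantile` and `np.digitize` steps on 100 evenly spaced toy scores, with the top 20 rows labeled positive. These toy inputs and the `_toy` variable names are assumptions for illustration and are not drawn from the model above.

```python
import numpy as np

# 100 distinct toy scores; the 20 highest-scored rows are the positives.
y_pred_toy = np.linspace(0.01, 1.0, 100)
y_true_toy = (np.arange(100) >= 80).astype("int8")

cuts_toy = np.quantile(y_pred_toy, np.linspace(0, 1, 11))
# Nine interior cut points give bucket ids 0 (lowest scores) to 9 (highest).
bucket_toy = np.digitize(y_pred_toy, cuts_toy[1:-1], right=True)

bucket_sizes = np.bincount(bucket_toy, minlength=10)
bucket_rate = np.bincount(bucket_toy, weights=y_true_toy, minlength=10) / bucket_sizes
baseline_toy = float(y_true_toy.mean())
lift_toy = bucket_rate / (baseline_toy + 1e-12)
```

With a baseline rate of 0.2, a bucket made up entirely of positives has a lift of about 5, while a bucket with no positives has a lift of 0. A well-ranked model shows lift rising steadily toward the top decile.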
out_dir = "/content/vaex_artifacts"
os.makedirs(out_dir, exist_ok=True)
parquet_path = os.path.join(out_dir, "customers_vaex.parquet")
state_path = os.path.join(out_dir, "vaex_pipeline.json")
base_cols = ["city","label_encoded_city","age","tenure_m","tenure_y","tx","income","income_k","value_score","target"]
export_cols = base_cols + ["z_"+f for f in features_num]
# Export a 500k-row sample; remove any stale file first.
df_export = df[export_cols].sample(n=500_000, random_state=1)
if os.path.exists(parquet_path):
    os.remove(parquet_path)
df_export.export_parquet(parquet_path)
# Persist everything needed to rebuild the preprocessing later.
pipeline_state = {
    "vaex_version": vaex.__version__,
    "encoder_labels": {k: {str(kk): int(vv) for kk,vv in v.items()} for k,v in encoder.labels_.items()},
    "scaler_mean": [float(x) for x in scaler.mean_],
    "scaler_std": [float(x) for x in scaler.std_],
    "features_num": features_num,
    "export_cols": export_cols,
}
with open(state_path, "w") as f:
    json.dump(pipeline_state, f)
# Reopen the Parquet file and rebuild the engineered features from scratch.
df_reopen = vaex.open(parquet_path)
df_reopen["income_k"] = df_reopen.income / 1000.0
df_reopen["tenure_y"] = df_reopen.tenure_m / 12.0
df_reopen["log_income"] = df_reopen.income.log1p()
df_reopen["tx_per_year"] = df_reopen.tx / (df_reopen.tenure_y + 0.25)
df_reopen["value_score"] = (0.35*df_reopen.log_income + 0.20*df_reopen.tx_per_year + 0.10*df_reopen.tenure_y - 0.015*df_reopen.age)
df_city_features = vaex.from_pandas(city_table[["city","p95_income_k","avg_income_k","median_value_score","target_rate"]], copy_index=False)
df_reopen = df_reopen.join(df_city_features, on="city", rsuffix="_city")
df_reopen["income_vs_city_p95"] = df_reopen.income_k / (df_reopen.p95_income_k + 1e-9)
df_reopen["value_vs_city_median"] = df_reopen.value_score - df_reopen.median_value_score
# Restore the encoder and scaler parameters from the saved JSON state.
with open(state_path, "r") as f:
    st = json.load(f)
labels_city = {k: int(v) for k,v in st["encoder_labels"]["city"].items()}
df_reopen["label_encoded_city"] = df_reopen.city.map(labels_city, default_value=-1)
for i, feat in enumerate(st["features_num"]):
    mean_i = st["scaler_mean"][i]
    # Guard against division by zero for constant features.
    std_i = st["scaler_std"][i] if st["scaler_std"][i] != 0 else 1.0
    df_reopen["z_"+feat] = (df_reopen[feat] - mean_i) / std_i
df_reopen = vaex_model.transform(df_reopen)
print("\nArtifacts written:")
print(parquet_path)
print(state_path)
print("\nReopened parquet predictions (head):")
print(df_reopen[["city","income_k","value_score","pred","target"]].head(8))
print("\nDone.")
We export a large, feature-complete sample to Parquet and persist the full preprocessing state for reproducibility. We reload the data and deterministically rebuild all engineered features from the saved metadata. We run inference on the reloaded dataset to confirm that the pipeline remains stable and deployable end-to-end.
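The save-and-restore pattern above does not depend on Vaex and can be sketched with the standard library alone. In the example below, the toy data, the `apply_scaler` helper, and the use of the population standard deviation are assumptions made for illustration. The idea is that the fitted means and standard deviations are plain numbers, so a JSON round trip reproduces the same standardized values in a later process.

```python
import json
import statistics

# Toy training data (illustrative values only).
train = {"age": [25.0, 35.0, 45.0, 55.0], "tx": [10.0, 20.0, 30.0, 40.0]}

# "Fit": record the per-feature mean and population standard deviation.
state = {
    "features": list(train),
    "mean": [statistics.fmean(train[f]) for f in train],
    "std": [statistics.pstdev(train[f]) for f in train],
}

payload = json.dumps(state)      # what would be written to disk
restored = json.loads(payload)   # what a later process would read back

def apply_scaler(row, st):
    """Standardize one row using saved statistics (hypothetical helper)."""
    out = {}
    for name, mean, std in zip(st["features"], st["mean"], st["std"]):
        std = std if std != 0 else 1.0   # guard against constant features
        out["z_" + name] = (row[name] - mean) / std
    return out

z = apply_scaler({"age": 40.0, "tx": 40.0}, restored)
```

Storing the feature order alongside the statistics matters: the means and standard deviations are positional lists, so they are only meaningful when matched against the same ordered feature names.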
In conclusion, we demonstrated how Vaex enables fast, memory-efficient data processing while still supporting advanced feature engineering, aggregation, and model integration. We showed that approximate statistics, lazy computation, and out-of-core execution allow us to scale cleanly from analysis to deployment-ready artifacts. By exporting reproducible features and persisting the full pipeline state, we closed the loop from raw data to inference, illustrating how Vaex fits naturally into modern large-data Python workflows.



