A guide to coding and validating end-to-end partitioned data pipelines in Dagster with machine learning integration
In this tutorial, we use Dagster to build an end-to-end, partitioned data pipeline. We set up a custom CSV-based IOManager to persist assets through cleaning, feature engineering, and model training, define daily partitions for data generation, and process synthetic sales data. Along the way, we add a data quality asset check that verifies nulls, ranges, and categorical values, and we ensure that metadata and outputs are stored in a structured way. The focus throughout is hands-on implementation, showing how raw data ingestion, transformation, quality checking, and machine learning can be integrated into a single repeatable workflow.
import sys, subprocess, json, os
subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "dagster", "pandas", "scikit-learn"])
import numpy as np, pandas as pd
from pathlib import Path
from dagster import (
asset, AssetCheckResult, asset_check, Definitions, materialize, Output,
DailyPartitionsDefinition, IOManager, io_manager
)
from sklearn.linear_model import LinearRegression
BASE = Path("/content/dagstore"); BASE.mkdir(parents=True, exist_ok=True)
START = "2025-08-01"
We first install the required libraries, Dagster, Pandas, and scikit-learn, to provide a complete toolset in Colab. We then import the required modules, set up NumPy and Pandas for data processing, and define a base directory and a start date to organize our pipeline outputs.
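Before wiring up any assets, an optional sanity check like the following (our own addition, not part of the original tutorial) confirms the installed versions and the artifact directory:

# Optional sanity check (assumed addition, not in the original tutorial):
# confirm installed versions and the artifact directory before running the pipeline.
import dagster, sklearn, pandas as pd
from pathlib import Path

BASE = Path("/content/dagstore")
print("Dagster:", dagster.__version__)
print("scikit-learn:", sklearn.__version__, "| pandas:", pd.__version__)
print("Artifacts will be written under:", BASE.resolve())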
class CSVIOManager(IOManager):
    def __init__(self, base: Path): self.base = base
    def _path(self, key, ext): return self.base / f"{'_'.join(key.path)}.{ext}"
    def handle_output(self, context, obj):
        if isinstance(obj, pd.DataFrame):
            p = self._path(context.asset_key, "csv"); obj.to_csv(p, index=False)
            context.log.info(f"Saved {context.asset_key} -> {p}")
        else:
            p = self._path(context.asset_key, "json"); p.write_text(json.dumps(obj, indent=2))
            context.log.info(f"Saved {context.asset_key} -> {p}")
    def load_input(self, context):
        k = context.upstream_output.asset_key; p = self._path(k, "csv")
        df = pd.read_csv(p); context.log.info(f"Loaded {k} <- {p} rows={len(df)}")
        return df

@io_manager
def csv_io_manager(_):
    return CSVIOManager(BASE)

daily = DailyPartitionsDefinition(start_date=START)
We define a custom CSVIOManager to save asset outputs as CSV or JSON files and reload them when needed. We then register it with Dagster as csv_io_manager and set up a daily partitioning scheme so that our pipeline can process data for each date independently.
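As a small optional sketch (our own illustration, not part of the original tutorial), we can exercise the IO manager's path logic and the partition definition in isolation to see how asset keys map to files and how daily partition keys are generated:

# Illustrative sketch (assumed, not from the original tutorial): inspect how the
# CSV IO manager maps an asset key to a file path and list a few daily partition keys.
from dagster import AssetKey

mgr = CSVIOManager(BASE)
print(mgr._path(AssetKey(["raw_sales"]), "csv"))  # e.g. /content/dagstore/raw_sales.csv
print(daily.get_partition_keys()[:3])             # daily keys starting at START, if any exist yet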
@asset(partitions_def=daily, description="Synthetic raw sales with noise & occasional nulls.")
def raw_sales(context) -> Output[pd.DataFrame]:
    rng = np.random.default_rng(42)
    n = 200; day = context.partition_key
    x = rng.normal(100, 20, n); promo = rng.integers(0, 2, n); noise = rng.normal(0, 10, n)
    sales = 2.5 * x + 30 * promo + noise + 50
    x[rng.choice(n, size=max(1, n // 50), replace=False)] = np.nan
    df = pd.DataFrame({"date": day, "units": x, "promo": promo, "sales": sales})
    meta = {"rows": n, "null_units": int(df["units"].isna().sum()), "head": df.head().to_markdown()}
    return Output(df, metadata=meta)

@asset(description="Clean nulls, clip outliers for robust downstream modeling.")
def clean_sales(context, raw_sales: pd.DataFrame) -> Output[pd.DataFrame]:
    df = raw_sales.dropna(subset=["units"]).copy()
    lo, hi = df["units"].quantile([0.01, 0.99]); df["units"] = df["units"].clip(lo, hi)
    meta = {"rows": len(df), "units_min": float(df.units.min()), "units_max": float(df.units.max())}
    return Output(df, metadata=meta)

@asset(description="Feature engineering: interactions & standardized columns.")
def features(context, clean_sales: pd.DataFrame) -> Output[pd.DataFrame]:
    df = clean_sales.copy()
    df["units_sq"] = df["units"] ** 2; df["units_promo"] = df["units"] * df["promo"]
    for c in ["units", "units_sq", "units_promo"]:
        mu, sigma = df[c].mean(), df[c].std(ddof=0) or 1.0
        df[f"z_{c}"] = (df[c] - mu) / sigma
    return Output(df, metadata={"rows": len(df), "cols": list(df.columns)})
We create three core assets for the pipeline. First, raw_sales generates synthetic daily sales data with noise and occasional missing values, simulating real-world imperfections. Next, clean_sales drops the null rows and clips outliers to stabilize the dataset, while recording metadata about ranges and row counts. Finally, features performs feature engineering, adding interaction and standardized columns to prepare the data for downstream modeling.
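To make the data-generation step concrete, here is a small standalone sketch (our own illustration, not part of the original tutorial) that reproduces the synthetic recipe used in raw_sales on a handful of rows:

# Standalone illustration (assumed, not from the original tutorial): the same
# synthetic recipe as raw_sales, on 5 rows, so we can eyeball units/promo/sales.
import numpy as np, pandas as pd

rng = np.random.default_rng(42)
x = rng.normal(100, 20, 5); promo = rng.integers(0, 2, 5); noise = rng.normal(0, 10, 5)
sales = 2.5 * x + 30 * promo + noise + 50
print(pd.DataFrame({"units": x, "promo": promo, "sales": sales}).round(2))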
@asset_check(asset=clean_sales, description="No nulls; promo in {0,1}; units within clipped bounds.")
def clean_sales_quality(clean_sales: pd.DataFrame) -> AssetCheckResult:
    nulls = int(clean_sales.isna().sum().sum())
    promo_ok = bool(set(clean_sales["promo"].unique()).issubset({0, 1}))
    units_ok = bool(clean_sales["units"].between(clean_sales["units"].min(), clean_sales["units"].max()).all())
    passed = bool((nulls == 0) and promo_ok and units_ok)
    return AssetCheckResult(
        passed=passed,
        metadata={"nulls": nulls, "promo_ok": promo_ok, "units_ok": units_ok},
    )

@asset(description="Train a tiny linear regressor; emit R^2 and coefficients.")
def tiny_model_metrics(context, features: pd.DataFrame) -> dict:
    X = features[["z_units", "z_units_sq", "z_units_promo", "promo"]].values
    y = features["sales"].values
    model = LinearRegression().fit(X, y)
    return {"r2_train": float(model.score(X, y)),
            **{n: float(c) for n, c in zip(["z_units", "z_units_sq", "z_units_promo", "promo"], model.coef_)}}
We strengthen the pipeline with validation and modeling. The clean_sales_quality asset check verifies that there are no nulls, that the promo field contains only 0/1 values, and that the cleaned units stay within a valid range, enforcing data integrity. Then, tiny_model_metrics trains a simple linear regression on the engineered features and returns key metrics such as the training R² and coefficients, giving us a lightweight but complete modeling step.
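As a quick illustration (our own sketch, not part of the original tutorial), the same validation logic can be applied to a toy frame to see the kind of pass/fail signal and metadata the asset check reports:

# Illustrative sketch (assumed): mirror the clean_sales_quality logic on a toy frame.
import pandas as pd

toy = pd.DataFrame({"units": [90.0, 110.0], "promo": [0, 1], "sales": [260.0, 360.0]})
nulls = int(toy.isna().sum().sum())
promo_ok = set(toy["promo"].unique()).issubset({0, 1})
print({"nulls": nulls, "promo_ok": promo_ok, "passed": (nulls == 0) and promo_ok})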
defs = Definitions(
    assets=[raw_sales, clean_sales, features, tiny_model_metrics, clean_sales_quality],
    resources={"io_manager": csv_io_manager}
)

if __name__ == "__main__":
    run_day = os.environ.get("RUN_DATE") or START
    print("Materializing everything for:", run_day)
    result = materialize(
        [raw_sales, clean_sales, features, tiny_model_metrics, clean_sales_quality],
        partition_key=run_day,
        resources={"io_manager": csv_io_manager},
    )
    print("Run success:", result.success)
    for fname in ["raw_sales.csv", "clean_sales.csv", "features.csv", "tiny_model_metrics.json"]:
        f = BASE / fname
        if f.exists():
            print(fname, "->", f.stat().st_size, "bytes")
            if fname.endswith(".json"):
                print("Metrics:", json.loads(f.read_text()))
We register our assets and the IO manager in a Definitions object and then materialize the entire DAG for the selected partition key in one run. We persist the CSV/JSON artifacts to /content/dagstore and print a quick success flag, along with the saved file sizes and the model metrics, for immediate verification.
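As a final, optional sketch (an assumption on our part, not shown in the original tutorial), the same asset graph can be materialized for another partition key, which is how the pipeline would be backfilled day by day:

# Illustrative sketch (assumed): re-run the same asset graph for another daily
# partition; the key must be a valid date in the daily partitions definition.
other_day = "2025-08-02"
result = materialize(
    [raw_sales, clean_sales, features, tiny_model_metrics, clean_sales_quality],
    partition_key=other_day,
    resources={"io_manager": csv_io_manager},
)
print(f"Run for {other_day} success:", result.success)

If we save the script to a file, the same Definitions object can also be explored interactively in the Dagster UI, for example via dagster dev -f <your_file>.py (filename placeholder).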
In summary, we materialize all assets and checks in a single Dagster run, confirm data quality, and train a regression model whose metrics are stored for inspection. We keep the pipeline modular: each asset persists its output as CSV or JSON and ensures compatibility by converting metadata values to supported types. This tutorial demonstrates how we can combine partitioning, asset definitions, and checks to build a robust and repeatable workflow, giving us a practical framework to scale to more complex real-world pipelines.
Sana Hassan, a consulting intern at Marktechpost and a dual-degree student at IIT Madras, is passionate about applying technology and AI to address real-world challenges. With a keen interest in solving practical problems, he brings a fresh perspective to the intersection of AI and real-life solutions.