How to Build a Data Observability Platform With Open-Source Tools (No Vendor Lock-In)
Stop paying six figures for data observability. Learn how to build a production-grade platform using open-source tools for anomaly detection, lineage tracking, and SLA monitoring.
The Problem With Data Observability Today
Data teams are expected to monitor data the same way SREs monitor applications. But while application observability has mature, standardized tools (Prometheus, Grafana, OpenTelemetry), data observability remains dominated by expensive vendors charging per-table pricing that scales painfully with your warehouse.
The core capabilities you actually need are not rocket science: metadata collection, statistical anomaly detection, lineage tracking, SLA monitoring, and alerting. Every one of these can be built with open-source tools and standard engineering practices.
I built a working implementation to prove it. The full source code is available at data-observability-platform, and this article walks through the architecture, the tradeoffs, and the lessons learned.
Architecture Overview
The platform follows a collector-processor-dashboard pattern:
[Metadata Collectors]      [Processing Layer]        [Presentation]

Airflow tasks      -->     Anomaly Detection   -->   Streamlit Dashboard
dbt artifacts      -->     Lineage Builder     -->   REST API
Database stats     -->     SLA Monitor         -->   Alert Channels
       |                          |                         |
       v                          v                         v
             [PostgreSQL - Central Metadata Store]
Everything flows into a central PostgreSQL database that stores metadata snapshots over time. This time-series approach to metadata is what enables trend analysis and anomaly detection.
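The storage model is deliberately simple: every collector run appends a timestamped snapshot row rather than updating in place, so history is always available for baselining. A minimal sketch of that append-only pattern, using in-memory SQLite in place of PostgreSQL (the table and column names here are illustrative, not the repo's actual schema):

```python
import sqlite3
from datetime import datetime, timezone

# In-memory SQLite stands in for the central PostgreSQL store.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE metric_snapshots (
        collected_at TEXT NOT NULL,   -- snapshot timestamp (UTC, ISO-8601)
        table_name   TEXT NOT NULL,   -- monitored warehouse table
        metric_name  TEXT NOT NULL,   -- e.g. row_count, null_pct
        metric_value REAL NOT NULL
    )
""")

def record_snapshot(table_name: str, metric_name: str, value: float) -> None:
    # Append-only: never overwrite, so history survives for trend analysis.
    conn.execute(
        "INSERT INTO metric_snapshots VALUES (?, ?, ?, ?)",
        (datetime.now(timezone.utc).isoformat(), table_name, metric_name, value),
    )

record_snapshot("gold.fact_revenue", "row_count", 10_500)
record_snapshot("gold.fact_revenue", "row_count", 10_750)

# The anomaly detectors later read this history back as a time series.
history = [
    row[0] for row in conn.execute(
        "SELECT metric_value FROM metric_snapshots "
        "WHERE table_name = ? AND metric_name = ? "
        "ORDER BY collected_at, rowid",
        ("gold.fact_revenue", "row_count"),
    )
]
```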
Metadata Collectors: The Foundation
Observability starts with collection. The platform implements three collector types, each running on its own schedule.
Airflow Metadata Collector
The Airflow collector pulls DAG run history, task durations, failure rates, and scheduling delays from the Airflow metadata database. It captures:
- Task execution duration (p50, p95, p99)
- DAG success/failure rates over sliding windows
- Scheduling lag (time between scheduled and actual start)
- SLA breach history
This data feeds directly into the anomaly detection engine. When a task that normally takes 3 minutes suddenly takes 45, you want to know before downstream consumers notice bad data.
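To make the signals concrete, here is a small dependency-free sketch of computing scheduling lag and a duration percentile from DAG-run records. The record shape is invented for illustration; the real collector reads these fields from Airflow's metadata database:

```python
from datetime import datetime
from statistics import quantiles

# Hypothetical run records; the actual collector queries Airflow's
# dag_run / task_instance tables for these fields.
runs = [
    {"scheduled": datetime(2024, 1, 1, 6, 0), "started": datetime(2024, 1, 1, 6, 2), "duration_s": 180},
    {"scheduled": datetime(2024, 1, 2, 6, 0), "started": datetime(2024, 1, 2, 6, 1), "duration_s": 195},
    {"scheduled": datetime(2024, 1, 3, 6, 0), "started": datetime(2024, 1, 3, 6, 5), "duration_s": 2700},
]

# Scheduling lag: gap between when Airflow meant to start and actually started.
lags_s = [(r["started"] - r["scheduled"]).total_seconds() for r in runs]

# Percentiles over task durations; p50/p95/p99 feed the anomaly detectors.
durations = sorted(r["duration_s"] for r in runs)
p50 = quantiles(durations, n=100)[49]  # 50th percentile cut point
```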
dbt Artifacts Collector
After every dbt run, the platform parses manifest.json, run_results.json, and catalog.json to extract:
- Model execution times and row counts
- Test pass/fail results with failure details
- Schema changes (new columns, type changes, dropped columns)
- Source freshness check results
The dbt collector is arguably the most valuable because dbt already knows about your data contracts (via tests and schema definitions). The observability platform just needs to track those signals over time.
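A minimal sketch of pulling per-model timings and test outcomes out of run_results.json. The payload below is trimmed to the fields the collector cares about; real artifacts carry many more:

```python
import json

# Trimmed-down run_results.json payload for illustration.
run_results = json.loads("""
{
  "results": [
    {"unique_id": "model.shop.fact_revenue", "status": "success", "execution_time": 12.4},
    {"unique_id": "test.shop.not_null_fact_revenue_amount", "status": "fail", "execution_time": 0.8}
  ]
}
""")

timings = {}       # model unique_id -> execution time in seconds
failed_tests = []  # unique_ids of failing tests

for result in run_results["results"]:
    uid = result["unique_id"]
    if uid.startswith("model."):
        timings[uid] = result["execution_time"]
    elif uid.startswith("test.") and result["status"] == "fail":
        failed_tests.append(uid)
```

Each parsed value is then written to the metadata store as a timestamped snapshot, same as every other collector.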
Database Statistics Collector
The database collector runs periodic profiling queries against your warehouse tables:
- Row counts and growth rates
- NULL percentages per column
- Distinct value counts and cardinality changes
- Min/max/mean for numeric columns
- Value distribution histograms for categorical columns
These statistics are stored as time-series data, enabling the anomaly detection engine to establish baselines and detect drift.
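The profiling queries themselves are plain SQL generated per column. A hedged sketch of the query builder (the function and its column handling are simplified; a production collector would also quote identifiers, batch columns, and sample very large tables):

```python
def build_profile_query(table: str, numeric_columns: list[str]) -> str:
    # One aggregate per statistic; a single scan profiles every column.
    parts = ["COUNT(*) AS row_count"]
    for col in numeric_columns:
        parts.append(
            f"AVG(CASE WHEN {col} IS NULL THEN 1.0 ELSE 0.0 END) AS {col}_null_pct"
        )
        parts.append(f"MIN({col}) AS {col}_min")
        parts.append(f"MAX({col}) AS {col}_max")
        parts.append(f"AVG({col}) AS {col}_mean")
    return f"SELECT {', '.join(parts)} FROM {table}"

query = build_profile_query("gold.fact_revenue", ["revenue_amount"])
```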
Statistical Anomaly Detection: Simple But Effective
Vendor platforms love to market their "AI-powered anomaly detection." In practice, most data anomalies are caught by straightforward statistical methods. The platform implements two complementary approaches.
Z-Score Detection
For metrics with roughly normal distributions (row counts, execution times, numeric aggregates), z-score detection works well:
import statistics

def detect_zscore_anomaly(
    metric_history: list[float],
    current_value: float,
    threshold: float = 3.0,
) -> AnomalyResult:
    # Need at least two points for a sample standard deviation.
    if len(metric_history) < 2:
        return AnomalyResult(is_anomaly=False)
    mean = statistics.mean(metric_history)
    stdev = statistics.stdev(metric_history)
    if stdev == 0:
        return AnomalyResult(is_anomaly=False)
    z_score = (current_value - mean) / stdev
    return AnomalyResult(
        is_anomaly=abs(z_score) > threshold,
        severity=classify_severity(abs(z_score)),
        z_score=z_score,
    )
The key design decision is the lookback window. Too short and you get noisy alerts from normal variance. Too long and you miss gradual drift. The platform defaults to 30 days with configurable windows per metric.
Median Absolute Deviation (MAD)
For metrics with outliers or skewed distributions (common in real data), MAD is more robust than z-score:
def detect_mad_anomaly(
    metric_history: list[float],
    current_value: float,
    threshold: float = 3.5,
) -> AnomalyResult:
    median = statistics.median(metric_history)
    mad = statistics.median(
        [abs(x - median) for x in metric_history]
    )
    if mad == 0:
        return AnomalyResult(is_anomaly=False)
    # 0.6745 scales MAD to be comparable with a standard deviation
    # under a normal distribution.
    modified_z = 0.6745 * (current_value - median) / mad
    return AnomalyResult(
        is_anomaly=abs(modified_z) > threshold,
        severity=classify_severity(abs(modified_z)),
        modified_z_score=modified_z,
    )
MAD handles the common scenario where a table occasionally receives late-arriving bulk loads that would skew the mean but not the median.
Combining Detectors
The platform runs both detectors and uses a consensus approach: if both flag an anomaly, it is high severity. If only one flags it, it is medium. This reduces false positives significantly compared to using either method alone.
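A sketch of the consensus rule, with both detectors reduced to booleans for brevity (thresholds follow the defaults above; the severity labels are illustrative):

```python
import statistics

def zscore_flags(history: list[float], value: float, threshold: float = 3.0) -> bool:
    mean, stdev = statistics.mean(history), statistics.stdev(history)
    return stdev > 0 and abs(value - mean) / stdev > threshold

def mad_flags(history: list[float], value: float, threshold: float = 3.5) -> bool:
    median = statistics.median(history)
    mad = statistics.median([abs(x - median) for x in history])
    return mad > 0 and abs(0.6745 * (value - median) / mad) > threshold

def consensus_severity(history: list[float], value: float) -> str:
    # Both detectors agree -> high; one fires -> medium; neither -> none.
    flags = zscore_flags(history, value) + mad_flags(history, value)
    return {2: "high", 1: "medium", 0: "none"}[flags]

history = [100.0, 102.0, 98.0, 101.0, 99.0, 100.0]
```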
Lineage Graph With Impact Analysis
Data lineage is the feature that separates a monitoring tool from a true observability platform. When something breaks, you need to answer two questions instantly: "What caused this?" and "What else is affected?"
The platform builds a directed acyclic graph (DAG) from three sources:
- dbt refs and sources - Model-to-model and model-to-source relationships
- SQL parsing - For non-dbt transformations, basic SQL parsing extracts table references
- Airflow task dependencies - Connects orchestration-level lineage to data-level lineage
The lineage graph enables two critical operations:
Upstream trace: Given a table with anomalous data, walk upstream to find which source or transformation introduced the problem.
Downstream impact: Given a source table that failed to load, immediately identify every downstream model, dashboard, and consumer affected.
import networkx as nx

def get_downstream_impact(
    graph: nx.DiGraph,
    node_id: str,
) -> ImpactReport:
    # Every node reachable from node_id, i.e. every downstream consumer.
    descendants = nx.descendants(graph, node_id)
    impacted_models = [
        graph.nodes[n] for n in descendants
        if graph.nodes[n]["type"] == "model"
    ]
    impacted_dashboards = [
        graph.nodes[n] for n in descendants
        if graph.nodes[n]["type"] == "dashboard"
    ]
    return ImpactReport(
        total_impacted=len(descendants),
        models=impacted_models,
        dashboards=impacted_dashboards,
        critical_path=find_critical_consumers(descendants),
    )
The graph is stored in PostgreSQL using an adjacency list and rebuilt incrementally with each collector run. For visualization, the Streamlit dashboard renders the lineage using graphviz with color-coded nodes indicating health status.
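The upstream trace is the mirror image of the downstream walk. A dependency-free sketch over a plain adjacency dict (the node names are invented; the repo runs the equivalent traversal with networkx over the same edge data):

```python
from collections import deque

# parent -> children edges, as loaded from the adjacency-list table.
edges = {
    "source.raw_orders": ["model.stg_orders"],
    "model.stg_orders": ["model.fact_revenue"],
    "model.fact_revenue": ["dashboard.revenue"],
}

# Invert once so each node knows its direct parents.
parents: dict[str, list[str]] = {}
for parent, children in edges.items():
    for child in children:
        parents.setdefault(child, []).append(parent)

def upstream_trace(node: str) -> set[str]:
    # BFS toward sources: every ancestor is a candidate root cause.
    seen: set[str] = set()
    queue = deque([node])
    while queue:
        for parent in parents.get(queue.popleft(), []):
            if parent not in seen:
                seen.add(parent)
                queue.append(parent)
    return seen
```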
SLA Monitoring
SLA monitoring ties everything together by defining contracts for data freshness, completeness, and quality.
Each SLA is defined declaratively in YAML:
slas:
  - name: daily_revenue_report
    table: gold.fact_revenue
    checks:
      - type: freshness
        max_age_hours: 6
      - type: row_count
        min_rows: 1000
      - type: null_percentage
        column: revenue_amount
        max_null_pct: 0.01
    alert_channels:
      - slack:data-alerts
      - pagerduty:data-team
    schedule: "0 */2 * * *"
The SLA engine evaluates these checks on schedule and tracks breach history. Combined with the lineage graph, it can warn that an upstream failure is likely to cause a downstream SLA breach before the breach actually occurs.
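A sketch of how the engine might evaluate one SLA's checks against the latest metadata snapshot. The check types mirror the YAML above; the snapshot dict and helper function are illustrative, not the repo's actual API:

```python
from datetime import datetime, timedelta, timezone

def evaluate_checks(checks: list[dict], snapshot: dict) -> list[str]:
    """Return the names of failing checks for one table's latest snapshot."""
    now = datetime.now(timezone.utc)
    failures = []
    for check in checks:
        if check["type"] == "freshness":
            age = now - snapshot["last_loaded_at"]
            if age > timedelta(hours=check["max_age_hours"]):
                failures.append("freshness")
        elif check["type"] == "row_count":
            if snapshot["row_count"] < check["min_rows"]:
                failures.append("row_count")
        elif check["type"] == "null_percentage":
            if snapshot["null_pct"][check["column"]] > check["max_null_pct"]:
                failures.append("null_percentage")
    return failures

checks = [
    {"type": "freshness", "max_age_hours": 6},
    {"type": "row_count", "min_rows": 1000},
    {"type": "null_percentage", "column": "revenue_amount", "max_null_pct": 0.01},
]
snapshot = {
    "last_loaded_at": datetime.now(timezone.utc) - timedelta(hours=8),  # stale
    "row_count": 5000,
    "null_pct": {"revenue_amount": 0.002},
}
```

Here only the freshness check fails: the table is 8 hours old against a 6-hour budget, while row count and NULL percentage are within bounds.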
Streamlit Dashboard
The dashboard provides a single-pane view into data health across the entire platform:
- Health overview: Table-level health scores based on recent anomaly count, test failures, and SLA status
- Anomaly timeline: Interactive chart showing detected anomalies over time with drill-down
- Lineage explorer: Visual graph of data flow with anomaly propagation highlighting
- SLA tracker: Current status and historical compliance rates for all defined SLAs
- Profiling snapshots: Column-level statistics over time for any monitored table
Streamlit was chosen deliberately over building a custom React frontend. For an internal observability tool, the speed of development and the data-native component library outweigh the limitations in customization.
What I Would Change in Production
The showcase repo is a working implementation, but production deployments would need adjustments:
Time-series storage: PostgreSQL works for moderate scale, but a dedicated time-series database (TimescaleDB or ClickHouse) would handle the metadata volume at warehouse scale better.
Collector reliability: The current collectors run as standalone scripts. In production, these should be Airflow tasks themselves, giving you retry logic, alerting on collector failures, and scheduling integration for free.
Alert fatigue management: The current alerting is basic threshold-based. Production needs alert grouping, suppression during known maintenance windows, and escalation policies.
Multi-warehouse support: The database collector currently targets PostgreSQL. Real deployments would need connectors for Snowflake, BigQuery, Databricks, and Redshift.
The Practical Takeaway
Data observability is not a product category that requires a vendor. It is a set of engineering practices: collect metadata systematically, detect anomalies statistically, track lineage automatically, and monitor SLAs contractually.
The data-observability-platform repository demonstrates that a capable observability platform can be built with PostgreSQL, Python, and Streamlit. The total infrastructure cost is a single database and a compute instance.
Start by instrumenting what you already have. If you run dbt, you already have test results and schema metadata. If you run Airflow, you already have execution metrics. The observability platform just connects these signals and watches them over time.
Monitor your data like SREs monitor applications. Your stakeholders deserve the same reliability guarantees for their dashboards that your users expect from your APIs.