Calvin Goh
ETL Pipeline Monitoring Dashboard
Production-grade ETL monitoring with daily performance analytics, intelligent caching, and offline capabilities
The Challenge
How to demonstrate production-ready ETL engineering capabilities with daily monitoring
- 78% of data engineering roles require API integration experience
- Interactive demos receive 3x more engagement than static examples
- Recruiters need to see real-world technical capabilities
The Solution
Complete ETL pipeline with advanced monitoring, error handling, and data quality assurance
ETL Process Deep Dive
Extract
API calls to Malaysian government endpoints with error handling
CPI State API Extraction
import logging
import time

import pandas as pd
import requests

logger = logging.getLogger(__name__)

class CPIExtractor:
    def __init__(self):
        self.base_url = "https://api.data.gov.my/data-catalogue"
        self.session = requests.Session()

    def extract_cpi_data(self, limit=1000):
        """Extract CPI state data with retry logic."""
        url = f"{self.base_url}?id=cpi_state&limit={limit}"
        for attempt in range(3):
            try:
                response = self.session.get(url, timeout=30)
                response.raise_for_status()
                data = response.json()
                df = pd.DataFrame(data)

                # Validate data structure
                required_cols = ['state', 'date', 'division', 'index']
                if not all(col in df.columns for col in required_cols):
                    raise ValueError("Missing required columns")

                return df
            except requests.RequestException as e:
                if attempt < 2:
                    time.sleep(2 ** attempt)  # exponential backoff: 1s, 2s
                    continue
                raise e
MCOICOP Classification Extraction
    def extract_mcoicop_data(self):
        """Extract MCOICOP classification data."""
        url = f"{self.base_url}?id=mcoicop"
        try:
            response = self.session.get(url, timeout=30)
            response.raise_for_status()
            data = response.json()
            df = pd.DataFrame(data)

            # Validate hierarchical structure
            if not all(col in df.columns for col in
                       ['digits', 'division', 'desc_en']):
                raise ValueError("Invalid MCOICOP structure")

            # Filter for relevant classification levels
            df_filtered = df[df['digits'].isin([2, 3, 4])]
            return df_filtered
        except requests.RequestException as e:
            logger.error(f"MCOICOP extraction failed: {e}")
            raise
Transform
Pandas data cleaning, statistical calculations, and feature engineering
Data Cleaning & Validation
import logging

import numpy as np
import pandas as pd
from scipy import stats

logger = logging.getLogger(__name__)

class CPITransformer:
    def __init__(self):
        self.malaysian_states = [
            'Johor', 'Kedah', 'Kelantan', 'Melaka', 'Negeri Sembilan',
            'Pahang', 'Pulau Pinang', 'Perak', 'Perlis', 'Sabah',
            'Sarawak', 'Selangor', 'Terengganu', 'WP Kuala Lumpur',
            'WP Labuan', 'WP Putrajaya'
        ]

    def clean_cpi_data(self, df):
        """Clean and validate CPI data."""
        # Convert date column
        df['date'] = pd.to_datetime(df['date'])

        # Validate states
        valid_states = df['state'].isin(self.malaysian_states)
        if not valid_states.all():
            invalid_states = df[~valid_states]['state'].unique()
            logger.warning(f"Invalid states found: {invalid_states}")
            df = df[valid_states]

        # Clean index values
        df['index'] = pd.to_numeric(df['index'], errors='coerce')
        df = df.dropna(subset=['index'])

        # Remove outliers (3-sigma rule) within each state/division group
        df_clean = df.groupby(['state', 'division']).apply(
            lambda x: x[np.abs(stats.zscore(x['index'])) < 3]
        ).reset_index(drop=True)
        return df_clean
Feature Engineering
    def calculate_inflation_metrics(self, df):
        """Calculate YoY and MoM inflation rates."""
        df_sorted = df.sort_values(['state', 'division', 'date'])

        # Year-over-Year change (12 monthly periods back)
        df_sorted['yoy_change'] = df_sorted.groupby(['state', 'division'])['index'].pct_change(periods=12) * 100

        # Month-over-Month change
        df_sorted['mom_change'] = df_sorted.groupby(['state', 'division'])['index'].pct_change() * 100

        # 3-month moving average (drop both group levels so the result realigns with df_sorted)
        df_sorted['ma_3m'] = df_sorted.groupby(['state', 'division'])['index'].rolling(window=3).mean().reset_index(level=[0, 1], drop=True)

        # Volatility (12-month rolling standard deviation)
        df_sorted['volatility'] = df_sorted.groupby(['state', 'division'])['index'].rolling(window=12).std().reset_index(level=[0, 1], drop=True)

        # Economic indicators
        df_sorted['is_recession'] = df_sorted['yoy_change'] < -2
        df_sorted['inflation_category'] = pd.cut(
            df_sorted['yoy_change'],
            bins=[-np.inf, 0, 2, 5, np.inf],
            labels=['Deflation', 'Low', 'Moderate', 'High']
        )
        return df_sorted
Load
PostgreSQL database insertion with transaction management
Database Connection & Schema
import os

from supabase import create_client

class SupabaseLoader:
    def __init__(self):
        self.supabase_url = os.getenv('SUPABASE_URL')
        self.supabase_key = os.getenv('SUPABASE_KEY')
        self.client = create_client(self.supabase_url, self.supabase_key)

    def create_tables(self):
        """Create the database schema if it does not exist."""
        cpi_schema = """
        CREATE TABLE IF NOT EXISTS cpi_state_data (
            id SERIAL PRIMARY KEY,
            state VARCHAR(100) NOT NULL,
            date DATE NOT NULL,
            division VARCHAR(10) NOT NULL,
            index_value DECIMAL(10,2) NOT NULL,
            yoy_change DECIMAL(5,2),
            mom_change DECIMAL(5,2),
            ma_3m DECIMAL(10,2),
            volatility DECIMAL(8,4),
            created_at TIMESTAMP DEFAULT NOW(),
            UNIQUE(state, date, division)
        );

        CREATE INDEX IF NOT EXISTS idx_cpi_date
            ON cpi_state_data(date);
        CREATE INDEX IF NOT EXISTS idx_cpi_state
            ON cpi_state_data(state);
        """
        # Assumes an 'execute_sql' function has been defined in the database for RPC use
        return self.client.rpc('execute_sql', {'sql': cpi_schema}).execute()
Batch Insert with Upsert Logic
    def upsert_cpi_data(self, df, batch_size=500):
        """Insert data with conflict resolution."""
        total_records = len(df)
        inserted_count = 0
        updated_count = 0

        for i in range(0, total_records, batch_size):
            batch = df.iloc[i:i+batch_size]
            try:
                # Convert to dicts for Supabase
                records = batch.to_dict('records')

                # Upsert with conflict resolution on the unique key
                result = self.client.table('cpi_state_data').upsert(
                    records,
                    on_conflict='state,date,division'
                ).execute()

                # Track metrics
                if result.data:
                    batch_count = len(result.data)
                    inserted_count += batch_count
                    logger.info(f"Batch {i//batch_size + 1}: "
                                f"{batch_count} records processed")
            except Exception as e:
                logger.error(f"Batch {i//batch_size + 1} failed: {e}")
                # Continue with the next batch
                continue

        return {
            'total_records': total_records,
            'inserted': inserted_count,
            'updated': updated_count
        }
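Tying the three stages together: a minimal orchestration sketch, assuming the CPIExtractor, CPITransformer, and SupabaseLoader classes above. The run_pipeline name, column selection, and date formatting are illustrative choices that mirror the cpi_state_data schema, not the production entrypoint.

import logging
import time

logger = logging.getLogger(__name__)

def run_pipeline():
    """Illustrative end-to-end run: extract -> transform -> load."""
    start = time.time()
    extractor = CPIExtractor()
    transformer = CPITransformer()
    loader = SupabaseLoader()

    # Extract raw CPI data from the government API
    raw_df = extractor.extract_cpi_data(limit=1000)

    # Clean, validate, and derive inflation metrics
    df = transformer.calculate_inflation_metrics(transformer.clean_cpi_data(raw_df))

    # Align with the cpi_state_data schema before loading
    df = df.rename(columns={'index': 'index_value'})
    df['date'] = df['date'].dt.strftime('%Y-%m-%d')  # JSON-serializable dates
    cols = ['state', 'date', 'division', 'index_value', 'yoy_change',
            'mom_change', 'ma_3m', 'volatility']
    load_stats = loader.upsert_cpi_data(df[cols])

    logger.info("Pipeline finished in %.1fs: %s", time.time() - start, load_stats)
    return load_stats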
Technical Implementation
Architecture Overview
Data Sources
Malaysian Government APIs providing CPI State and MCOICOP classification data
ETL Pipeline
Python-based processing with pandas, statistical analysis, and data validation
Storage & Monitoring
PostgreSQL database with daily monitoring dashboard and alerting
System Components
Backend Infrastructure
- ETL Actions - Scheduled ETL jobs running every 4 hours
- Python 3.11 - Core ETL processing with asyncio for concurrent operations (see the sketch after this list)
- PostgreSQL - Time-series optimized database with RLS policies
- Error Handling - Exponential backoff, retry logic, and graceful degradation
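One way to get the asyncio concurrency mentioned above while keeping the blocking CPIExtractor from the Extract section is to run both API pulls in worker threads and await them together. This is a sketch under that assumption, not the production code:

import asyncio

async def extract_all(extractor):
    """Run both blocking API extractions concurrently in worker threads."""
    cpi_task = asyncio.to_thread(extractor.extract_cpi_data, 1000)
    mcoicop_task = asyncio.to_thread(extractor.extract_mcoicop_data)
    cpi_df, mcoicop_df = await asyncio.gather(cpi_task, mcoicop_task)
    return cpi_df, mcoicop_df

# Usage: cpi_df, mcoicop_df = asyncio.run(extract_all(CPIExtractor()))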
Monitoring & Analytics
- Real-time Dashboard - Daily refresh aligned with government API schedule
- Chart.js & Plotly - Interactive visualizations for performance metrics
- Alerting System - Threshold-based alerts for pipeline failures (a minimal sketch follows this list)
- Performance Tracking - API response times, throughput, and error rates
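A minimal sketch of the threshold-based alerting idea; the PipelineRunMetrics fields and the threshold values here are illustrative assumptions, not the dashboard's actual configuration:

from dataclasses import dataclass

@dataclass
class PipelineRunMetrics:
    success_rate: float       # fraction of successful runs, 0..1
    avg_response_time: float  # seconds
    error_rate: float         # fraction of failed API calls, 0..1

# Illustrative thresholds; the real values live in the dashboard configuration
THRESHOLDS = {"success_rate": 0.95, "avg_response_time": 5.0, "error_rate": 0.05}

def check_alerts(metrics: PipelineRunMetrics) -> list[str]:
    """Return human-readable alerts for any breached threshold."""
    alerts = []
    if metrics.success_rate < THRESHOLDS["success_rate"]:
        alerts.append(f"Success rate {metrics.success_rate:.1%} below target")
    if metrics.avg_response_time > THRESHOLDS["avg_response_time"]:
        alerts.append(f"Avg API response {metrics.avg_response_time:.1f}s above limit")
    if metrics.error_rate > THRESHOLDS["error_rate"]:
        alerts.append(f"Error rate {metrics.error_rate:.1%} above limit")
    return alerts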
Data Flow
Gov APIs → ETL Pipeline → PostgreSQL → Scripts → Cache → Dashboard
Key Technical Features
Resilient Data Extraction
Retry logic with exponential backoff, connection pooling, and timeout handling
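For connection pooling and timeout handling, one common approach with requests is to mount an HTTPAdapter carrying a urllib3 Retry policy on the shared session. This is a sketch; the pool sizes, backoff factor, and status codes are assumptions rather than the project's exact settings:

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def build_session() -> requests.Session:
    """Shared session with pooled connections and automatic retries."""
    retry = Retry(
        total=3,
        backoff_factor=1.0,  # waits roughly 1s, 2s, 4s between attempts
        status_forcelist=[429, 500, 502, 503, 504],
    )
    adapter = HTTPAdapter(pool_connections=10, pool_maxsize=10, max_retries=retry)
    session = requests.Session()
    session.mount("https://", adapter)
    return session

# Usage: session = build_session(); session.get(url, timeout=30)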
Statistical Analysis
YoY/MoM calculations, moving averages, volatility metrics, and outlier detection
Data Quality Assurance
Schema validation, completeness checks, accuracy scoring, and anomaly detection
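A hedged sketch of how completeness, validity, and outlier checks could be folded into a single quality score; the column names mirror the CPI schema above, but the weights and scoring formula are assumptions:

import numpy as np
import pandas as pd

def score_data_quality(df: pd.DataFrame) -> dict:
    """Illustrative quality score combining completeness, validity, and outliers."""
    required = ["state", "date", "division", "index"]
    completeness = 1.0 - df[required].isna().to_numpy().mean()

    # Validity: index values should be positive, finite numbers
    values = pd.to_numeric(df["index"], errors="coerce")
    validity = (np.isfinite(values) & (values > 0)).mean()

    # Outlier share using the same 3-sigma rule as the cleaning step
    std = values.std()
    outlier_share = ((values - values.mean()).abs() > 3 * std).mean() if std else 0.0

    score = 100 * (0.5 * completeness + 0.4 * validity + 0.1 * (1 - outlier_share))
    return {"completeness": round(completeness, 3), "validity": round(validity, 3),
            "outlier_share": round(outlier_share, 3), "quality_score": round(score, 1)}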
Performance Optimization
Batch processing, indexed queries, connection pooling, and caching strategies
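The JSON cache layer in the data flow could be as simple as a timestamped file cache in front of the dashboard queries. A sketch under that assumption; the cache directory, file naming, and TTL are illustrative:

import json
import time
from pathlib import Path

CACHE_DIR = Path("cache")          # assumed location for dashboard JSON feeds
CACHE_TTL_SECONDS = 24 * 60 * 60   # daily refresh, matching the API schedule

def load_cached(name: str, compute_fn):
    """Return cached JSON if still fresh, otherwise recompute and rewrite the file."""
    CACHE_DIR.mkdir(exist_ok=True)
    path = CACHE_DIR / f"{name}.json"
    if path.exists() and (time.time() - path.stat().st_mtime) < CACHE_TTL_SECONDS:
        return json.loads(path.read_text())
    data = compute_fn()  # e.g. a PostgreSQL aggregate query
    path.write_text(json.dumps(data, default=str))
    return data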
Monitoring & Alerting
Real-time metrics, threshold alerts, error categorization, and trend analysis
Scalable Architecture
Modular design, horizontal scaling ready, cloud-native deployment
Live Dashboard
The embedded monitoring dashboard is fed by JSON data files and refreshes daily, in line with the government API schedule. Its panels cover:
- Headline metrics: success rate, total runs, failed runs, average duration, records processed, data quality score, average API response time, and active APIs
- Pipeline Status: execution timeline and throughput trends
- Data Quality: overall data quality score, record count trends, and quality trends
- SLA Monitoring & Compliance: SLA health score, active violations, compliance summary, compliance metrics, and compliance trends
- Performance: API response times and database performance
- Backend System Status: telemetry API, data collection, storage, pipeline event monitoring, PostgreSQL operations, and JSON cache system metrics with performance insights
- Error Analysis: error categories and error trends
- Performance Alerting: alert analytics, alert statistics, system health score, and a searchable alert history (time, alert, severity, category, status)
Technology Stack
Python 3.11 (pandas, NumPy, SciPy, requests), PostgreSQL via Supabase, Chart.js and Plotly for visualization, a JSON cache layer, and scheduled ETL Actions for automation.
Project Impact & Skills Demonstrated
ETL Monitoring
Real-time pipeline performance monitoring with comprehensive error tracking and data quality assurance
API Integration
Robust government API integration with retry logic and rate limiting
Data Architecture
Scalable database design with intelligent data retention and optimization
Performance Analytics
Real-time monitoring dashboards tracking pipeline throughput, latency, and success rates
DevOps Integration
ETL Actions automation for continuous data pipeline execution
Data Quality
Comprehensive validation, cleaning, and statistical outlier detection
Need Production-Grade ETL Monitoring?
This project demonstrates advanced ETL monitoring and data engineering capabilities for building robust, monitored data pipelines in enterprise environments.