Calvin Goh

Live ETL Demo · Production Ready

ETL Pipeline Monitoring Dashboard

Production-grade ETL monitoring with daily performance analytics, intelligent caching, and offline capabilities

Headline stats (populated by the live dashboard): Malaysian Gov APIs · Pipeline Success Rate · Cached Response Time

The Challenge

How to demonstrate production-ready ETL engineering capabilities with daily monitoring

  • 78% of data engineering roles require API integration experience
  • Interactive demos receive 3x more engagement than static examples
  • Recruiters need to see real-world technical capabilities

The Solution

Complete ETL pipeline with advanced monitoring, error handling, and data quality assurance

Extract data from CPI State & MCOICOP APIs
Transform with pandas & statistical analysis
Load into PostgreSQL with intelligent retention
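
Below is a minimal sketch of how the three stages could be wired together in one run. The class and method names match the snippets in the deep dive that follows; the run_pipeline wrapper itself and its logging are illustrative assumptions, not the production entry point.

import logging

logger = logging.getLogger("etl.pipeline")

def run_pipeline():
    """Hypothetical end-to-end run: extract -> transform -> load."""
    extractor = CPIExtractor()        # defined in the Extract section
    transformer = CPITransformer()    # defined in the Transform section
    loader = SupabaseLoader()         # defined in the Load section

    raw_df = extractor.extract_cpi_data(limit=1000)
    clean_df = transformer.clean_cpi_data(raw_df)
    enriched_df = transformer.calculate_inflation_metrics(clean_df)

    loader.create_tables()
    summary = loader.upsert_cpi_data(enriched_df, batch_size=500)
    logger.info("Pipeline finished: %s", summary)
    return summary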

ETL Process Deep Dive

Extract

API calls to Malaysian government endpoints with error handling

CPI State API Extraction

import logging
import time

import pandas as pd
import requests

logger = logging.getLogger(__name__)

class CPIExtractor:
    def __init__(self):
        self.base_url = "https://api.data.gov.my/data-catalogue"
        self.session = requests.Session()
    
    def extract_cpi_data(self, limit=1000):
        """Extract CPI state data with retry logic"""
        url = f"{self.base_url}?id=cpi_state&limit={limit}"
        
        for attempt in range(3):
            try:
                response = self.session.get(url, timeout=30)
                response.raise_for_status()
                
                data = response.json()
                df = pd.DataFrame(data)
                
                # Validate data structure
                required_cols = ['state', 'date', 'division', 'index']
                if not all(col in df.columns for col in required_cols):
                    raise ValueError("Missing required columns")
                
                return df
                
            except requests.RequestException as e:
                if attempt < 2:
                    time.sleep(2 ** attempt)
                    continue
                raise e
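
A minimal usage sketch for the extractor above (the printed summary is illustrative):

extractor = CPIExtractor()
cpi_df = extractor.extract_cpi_data(limit=1000)
print(f"Fetched {len(cpi_df)} CPI rows covering {cpi_df['state'].nunique()} states")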

MCOICOP Classification Extraction

# Second CPIExtractor method, shown separately for readability
def extract_mcoicop_data(self):
    """Extract MCOICOP classification data"""
    url = f"{self.base_url}?id=mcoicop"
    
    try:
        response = self.session.get(url, timeout=30)
        response.raise_for_status()
        
        data = response.json()
        df = pd.DataFrame(data)
        
        # Validate hierarchical structure
        if not all(col in df.columns for col in 
                  ['digits', 'division', 'desc_en']):
            raise ValueError("Invalid MCOICOP structure")
        
        # Filter for relevant classification levels
        df_filtered = df[df['digits'].isin([2, 3, 4])]
        
        return df_filtered
        
    except requests.RequestException as e:
        logger.error(f"MCOICOP extraction failed: {e}")
        raise
Extraction metrics tracked: Records Extracted Today · API Success Rate · Avg Response Time · Malaysian States

Transform

Pandas data cleaning, statistical calculations, and feature engineering

Data Cleaning & Validation

import logging

import numpy as np
import pandas as pd
from scipy import stats

logger = logging.getLogger(__name__)

class CPITransformer:
    def __init__(self):
        self.malaysian_states = [
            'Johor', 'Kedah', 'Kelantan', 'Melaka', 'Negeri Sembilan',
            'Pahang', 'Pulau Pinang', 'Perak', 'Perlis', 'Sabah', 
            'Sarawak', 'Selangor', 'Terengganu', 'WP Kuala Lumpur',
            'WP Labuan', 'WP Putrajaya'
        ]
    
    def clean_cpi_data(self, df):
        """Clean and validate CPI data"""
        # Convert date column
        df['date'] = pd.to_datetime(df['date'])
        
        # Validate states
        valid_states = df['state'].isin(self.malaysian_states)
        if not valid_states.all():
            invalid_states = df[~valid_states]['state'].unique()
            logger.warning(f"Invalid states found: {invalid_states}")
            df = df[valid_states]
        
        # Clean index values
        df['index'] = pd.to_numeric(df['index'], errors='coerce')
        df = df.dropna(subset=['index'])
        
        # Remove outliers (3 sigma rule)
        df_clean = df.groupby(['state', 'division']).apply(
            lambda x: x[np.abs(stats.zscore(x['index'])) < 3]
        ).reset_index(drop=True)
        
        return df_clean

Feature Engineering

def calculate_inflation_metrics(self, df):
    """Calculate YoY and MoM inflation rates"""
    df_sorted = df.sort_values(['state', 'division', 'date'])
    
    # Year-over-Year change
    df_sorted['yoy_change'] = df_sorted.groupby(['state', 'division'])['index'].pct_change(periods=12) * 100
    
    # Month-over-Month change
    df_sorted['mom_change'] = df_sorted.groupby(['state', 'division'])['index'].pct_change() * 100
    
    # 3-month moving average (reset both group levels so the result realigns with df_sorted)
    df_sorted['ma_3m'] = df_sorted.groupby(['state', 'division'])['index'].rolling(window=3).mean().reset_index(level=[0, 1], drop=True)
    
    # Volatility (12-month rolling std)
    df_sorted['volatility'] = df_sorted.groupby(['state', 'division'])['index'].rolling(window=12).std().reset_index(level=[0, 1], drop=True)
    
    # Economic indicators
    df_sorted['is_recession'] = df_sorted['yoy_change'] < -2
    df_sorted['inflation_category'] = pd.cut(
        df_sorted['yoy_change'],
        bins=[-np.inf, 0, 2, 5, np.inf],
        labels=['Deflation', 'Low', 'Moderate', 'High']
    )
    
    return df_sorted
Transformation metrics tracked: Records After Cleaning · Data Quality Score · Features Engineered · Transform Time

Load

PostgreSQL database insertion with transaction management

Database Connection & Schema

import os

from supabase import create_client

class SupabaseLoader:
    def __init__(self):
        self.supabase_url = os.getenv('SUPABASE_URL')
        self.supabase_key = os.getenv('SUPABASE_KEY')
        self.client = create_client(self.supabase_url, self.supabase_key)
    
    def create_tables(self):
        """Create database schema if not exists"""
        cpi_schema = """
        CREATE TABLE IF NOT EXISTS cpi_state_data (
            id SERIAL PRIMARY KEY,
            state VARCHAR(100) NOT NULL,
            date DATE NOT NULL,
            division VARCHAR(10) NOT NULL,
            index_value DECIMAL(10,2) NOT NULL,
            yoy_change DECIMAL(5,2),
            mom_change DECIMAL(5,2),
            ma_3m DECIMAL(10,2),
            volatility DECIMAL(8,4),
            created_at TIMESTAMP DEFAULT NOW(),
            UNIQUE(state, date, division)
        );
        
        CREATE INDEX IF NOT EXISTS idx_cpi_date 
            ON cpi_state_data(date);
        CREATE INDEX IF NOT EXISTS idx_cpi_state 
            ON cpi_state_data(state);
        """
        
        # 'execute_sql' is a custom Postgres function exposed through Supabase RPC
        return self.client.rpc('execute_sql', {'sql': cpi_schema}).execute()

Batch Insert with Upsert Logic

def upsert_cpi_data(self, df, batch_size=500):
    """Insert data with conflict resolution"""
    total_records = len(df)
    inserted_count = 0
    updated_count = 0
    
    for i in range(0, total_records, batch_size):
        batch = df.iloc[i:i+batch_size]
        
        try:
            # Convert to dict for Supabase
            records = batch.to_dict('records')
            
            # Upsert with conflict resolution
            result = self.client.table('cpi_state_data').upsert(
                records,
                on_conflict='state,date,division'
            ).execute()
            
            # Track metrics
            if result.data:
                batch_count = len(result.data)
                inserted_count += batch_count
                
                logger.info(f"Batch {i//batch_size + 1}: "
                           f"{batch_count} records processed")
            
        except Exception as e:
            logger.error(f"Batch {i//batch_size + 1} failed: {e}")
            # Continue with next batch
            continue
    
    # Supabase upsert does not distinguish inserts from updates,
    # so all processed rows are counted under 'inserted'
    return {
        'total_records': total_records,
        'inserted': inserted_count,
        'updated': updated_count
    }
Load metrics tracked: Records Loaded · Load Success Rate · Load Duration

Technical Implementation

Architecture Overview

Data Sources

Malaysian Government APIs providing CPI State and MCOICOP classification data

ETL Pipeline

Python-based processing with pandas, statistical analysis, and data validation

Storage & Monitoring

PostgreSQL database with daily monitoring dashboard and alerting

System Components

Backend Infrastructure

  • ETL Actions - Scheduled ETL jobs running every 4 hours
  • Python 3.11 - Core ETL processing with asyncio for concurrent operations
  • PostgreSQL - Time-series optimized database with RLS policies
  • Error Handling - Exponential backoff, retry logic, and graceful degradation
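
The retry behaviour shown inline in the extractor can also be factored into a reusable decorator. This is a sketch under that assumption (with_backoff and fetch_json are illustrative names, not part of the pipeline code):

import functools
import logging
import time

import requests

logger = logging.getLogger("etl.retry")

def with_backoff(max_attempts=3, base_delay=2.0):
    """Retry a callable on request errors with exponential backoff."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except requests.RequestException as exc:
                    if attempt == max_attempts:
                        raise
                    delay = base_delay ** attempt
                    logger.warning("Attempt %d failed (%s); retrying in %.1fs", attempt, exc, delay)
                    time.sleep(delay)
        return wrapper
    return decorator

@with_backoff(max_attempts=3)
def fetch_json(session, url, timeout=30):
    response = session.get(url, timeout=timeout)
    response.raise_for_status()
    return response.json()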

Monitoring & Analytics

  • Real-time Dashboard - Daily refresh aligned with government API schedule
  • Chart.js & Plotly - Interactive visualizations for performance metrics
  • Alerting System - Threshold-based alerts for pipeline failures
  • Performance Tracking - API response times, throughput, and error rates
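
As an illustration of the threshold-based alerting and performance tracking listed above, a rule check over collected metrics might look like the sketch below; the rule values and metric keys are assumptions, not the dashboard's actual configuration.

# Hypothetical thresholds; real values live in the dashboard configuration
ALERT_RULES = {
    "success_rate":     {"op": "lt", "threshold": 95.0, "severity": "critical"},
    "error_rate":       {"op": "gt", "threshold": 5.0,  "severity": "warning"},
    "avg_response_sec": {"op": "gt", "threshold": 2.0,  "severity": "warning"},
}

def evaluate_alerts(metrics: dict) -> list[dict]:
    """Return an alert record for every metric that crosses its threshold."""
    alerts = []
    for name, rule in ALERT_RULES.items():
        value = metrics.get(name)
        if value is None:
            continue
        breached = value < rule["threshold"] if rule["op"] == "lt" else value > rule["threshold"]
        if breached:
            alerts.append({
                "metric": name,
                "value": value,
                "threshold": rule["threshold"],
                "severity": rule["severity"],
            })
    return alerts

# Example: evaluate_alerts({"success_rate": 91.5, "error_rate": 3.2}) flags success_rate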

Data Flow

Gov APIs → ETL Pipeline → PostgreSQL → Scripts → Cache → Dashboard

Key Technical Features

Resilient Data Extraction

Retry logic with exponential backoff, connection pooling, and timeout handling
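
One common way to combine connection pooling, retries, and timeouts in requests is an HTTPAdapter backed by a urllib3 Retry policy. The parameter values below are illustrative defaults, not the pipeline's exact settings:

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def build_session() -> requests.Session:
    """Session with pooled connections and automatic retries on transient errors."""
    retry = Retry(
        total=3,
        backoff_factor=1.0,                         # waits 1s, 2s, 4s between attempts
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["GET"],
    )
    adapter = HTTPAdapter(max_retries=retry, pool_connections=10, pool_maxsize=10)
    session = requests.Session()
    session.mount("https://", adapter)
    session.mount("http://", adapter)
    return session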

Statistical Analysis

YoY/MoM calculations, moving averages, volatility metrics, and outlier detection

Data Quality Assurance

Schema validation, completeness checks, accuracy scoring, and anomaly detection
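
A composite quality score could be derived from those checks roughly as in the sketch below; the weights and the choice of checks are assumptions for illustration only.

import pandas as pd

REQUIRED_COLS = ["state", "date", "division", "index"]

def data_quality_score(df: pd.DataFrame) -> float:
    """Blend schema, completeness, and uniqueness checks into a 0-100 score."""
    schema_ok = all(col in df.columns for col in REQUIRED_COLS)
    completeness = 1.0 - df[REQUIRED_COLS].isna().mean().mean() if schema_ok else 0.0
    uniqueness = 1.0 - df.duplicated(subset=["state", "date", "division"]).mean() if schema_ok else 0.0
    # Weighted blend (weights are illustrative)
    return round(100 * (0.3 * float(schema_ok) + 0.4 * completeness + 0.3 * uniqueness), 2)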

Performance Optimization

Batch processing, indexed queries, connection pooling, and caching strategies
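
The caching strategy referenced here (and in the JSON Cache System panel further down) can be as simple as a timestamped JSON file per dataset. This sketch assumes a local cache directory and a 24-hour TTL, neither of which is specified by the project:

import json
import time
from pathlib import Path

CACHE_DIR = Path("cache")          # assumed location
CACHE_TTL_SECONDS = 24 * 60 * 60   # assumed freshness window

def load_cached(dataset: str):
    """Return cached records if the file exists and is still fresh, else None."""
    path = CACHE_DIR / f"{dataset}.json"
    if not path.exists():
        return None
    payload = json.loads(path.read_text())
    if time.time() - payload["cached_at"] > CACHE_TTL_SECONDS:
        return None
    return payload["records"]

def save_cache(dataset: str, records: list) -> None:
    """Write records with a timestamp so freshness can be checked later."""
    CACHE_DIR.mkdir(exist_ok=True)
    (CACHE_DIR / f"{dataset}.json").write_text(
        json.dumps({"cached_at": time.time(), "records": records})
    )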

Monitoring & Alerting

Real-time metrics, threshold alerts, error categorization, and trend analysis

Scalable Architecture

Modular design, horizontal scaling ready, cloud-native deployment

ETL Pipeline Monitoring Dashboard

Live Dashboard Status

Real-time ETL monitoring with JSON data feeds

✅ LIVE · Updated daily

  • 📊 Pipeline Status: real-time monitoring
  • Performance: API & DB metrics
  • 🔍 Error Analysis: comprehensive tracking
  • 📈 Data Quality: validation metrics

Production ETL Monitoring Dashboard

Headline metric cards (values load from the live data feed): Success Rate · Total Runs · Failed Runs · Avg Duration · Records · Data Quality · Avg Response · Active APIs

Pipeline Status

Charts: Pipeline Execution Timeline · Throughput Trends

Data Quality

Charts: Overall Data Quality Score · Record Count Trends · Data Quality Trends

SLA Monitoring & Compliance

  • 🏥 SLA Health Score (target: 90+)
  • 🚨 Active Violations, broken down by severity: Critical · Warning · Info
  • 📊 Compliance Summary over the reporting window: Meeting SLA · At Risk · Violated

SLA Compliance Metrics

  • Success Rate: target ≥95%
  • Error Rate: target ≤5%
  • Data Quality: target ≥98%
  • Avg Duration: target ≤5min
  • Data Freshness: target ≤24h

Chart: SLA Compliance Trends
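
The targets above translate directly into a small compliance check; the metric keys in this sketch are assumptions that mirror the dashboard labels.

SLA_TARGETS = {
    "success_rate":         ("min", 95.0),  # percent
    "error_rate":           ("max", 5.0),   # percent
    "data_quality":         ("min", 98.0),  # percent
    "avg_duration_min":     ("max", 5.0),   # minutes
    "data_freshness_hours": ("max", 24.0),  # hours
}

def sla_compliance(metrics: dict) -> dict:
    """Classify each SLA metric as meeting its target, violated, or missing."""
    status = {}
    for name, (direction, target) in SLA_TARGETS.items():
        value = metrics.get(name)
        if value is None:
            status[name] = "no data"
        elif direction == "min":
            status[name] = "meeting" if value >= target else "violated"
        else:
            status[name] = "meeting" if value <= target else "violated"
    return status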

Performance

Status: Online · 📦 Cache Ready

Charts: API Response Times · Database Performance

🖥️ Backend System Status

  • Backend Health Score (out of 100)
  • ETL Runs Today (pipeline executions)
  • Active Alerts (system alerts)
  • API Success Rate (last 1 hour)

🔄 Backend Integration Status (Live)

  • 📡 Telemetry API: endpoint localhost:5001
  • 📊 Data Collection: metrics count, collected at 5s intervals
  • 💾 Storage: PostgreSQL database, 24h retention
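
The 24-hour telemetry retention could be enforced with a scheduled cleanup such as the sketch below; the table and column names are hypothetical.

from datetime import datetime, timedelta, timezone

def purge_old_telemetry(client, hours: int = 24) -> None:
    """Delete telemetry rows older than the retention window (table name assumed)."""
    cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
    client.table("telemetry_events").delete().lt("created_at", cutoff).execute()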

📈 Pipeline Event Monitoring & PostgreSQL Operations (Production Active)

Real-time telemetry events & ETL Actions integration

  • Pipeline Events: 48
  • Avg Extraction Time: 4.38s
  • Transformation Time: 0.73s
  • Loading Time: 17.36s

💾 JSON Cache System Performance (Active Cache)

High-performance JSON caching & data quality monitoring, tracking Processing Success Rate, Total Records Processed, Avg Records per Run, and Data Completeness.

Data Quality checks: Data Type Validation · Required Fields Check · Value Range Validation · Duplicate Detection

📊 Cache Metrics: CPI State Pipeline · Data Validation Pipeline · MCOICOP Pipeline · Data Freshness Status

🚀 Performance Insights: Total Pipeline Runs · Successful Runs · Failed Runs · Runs In Progress

Error Analysis

Charts: Error Categories · Error Trends

🚨 Performance Alerting Dashboard (Engine Active)

  • Critical Alerts (active)
  • Warning Alerts (active)
  • Total Alerts (today)
  • Resolution Rate (percentage)
  • Avg Resolution (minutes)

📊 Alert Analytics

  • Alert Trends (last 24h)
  • Alerts by Category: Frontend · Backend · Performance
  • Top Alerting Metrics

📈 Alert Statistics: Alerts Generated · Alerts Resolved · Acknowledged · False Positives · Escalated

🏥 System Health Score: Overall System Health

📚 Alert History & Search: alert log filterable by Time, Alert, Severity, Category, Status, and Actions

Technology Stack

Python
Pandas
Plotly
PostgreSQL
ETL Actions

Project Impact & Skills Demonstrated

ETL Monitoring

Real-time pipeline performance monitoring with comprehensive error tracking and data quality assurance

API Integration

Robust government API integration with retry logic and rate limiting

Data Architecture

Scalable database design with intelligent data retention and optimization

Performance Analytics

Real-time monitoring dashboards tracking pipeline throughput, latency, and success rates

DevOps Integration

ETL Actions automation for continuous data pipeline execution

Data Quality

Comprehensive validation, cleaning, and statistical outlier detection

Need Production-Grade ETL Monitoring?

This project demonstrates the ETL monitoring and data engineering capabilities needed to build robust, observable data pipelines in enterprise environments.