
Complete Pipeline

This pipeline provides production-ready database backups:

  1. Extract multiple tables - Orders, inventory, order_items
  2. Add backup metadata - Timestamp, source, version
  3. Calculate checksums - Row-level integrity verification
  4. Route to storage - Table-specific paths, Parquet compression

Result: Reliable, verifiable backups with 70-90% compression.

Full Configuration

nightly-backup.yaml
# Nightly Database Backup Pipeline
# Simple, reliable replication to cloud storage for DR
#
# Use case: Nightly backup of orders and inventory tables from
# on-premise database to cloud cold storage (GCS Nearline/S3 Glacier)
#
# Key features:
# - Full table extract via SQL
# - Minimal transformation (just add metadata)
# - Compressed Parquet format for cost efficiency
# - Date-partitioned storage for easy recovery
# - Checksum verification

name: nightly-backup-orders
description: Backup orders and inventory to cloud cold storage

input:
  # Sequence through multiple tables in one pipeline
  sequence:
    inputs:
      # Orders table
      - sql_select:
          driver: postgres  # Or: mysql, odbc, mssql
          dsn: "postgres://${DB_USER}:${DB_PASSWORD}@${DB_HOST}:5432/${DB_NAME}?sslmode=require"
          table: orders
          columns: ["*"]
          where: "updated_at >= CURRENT_DATE - INTERVAL '1 day'"
        processors:
          - mapping: |
              root = this
              root._table = "orders"
              root._backup_type = "incremental"

      # Inventory table (full backup - smaller table)
      - sql_select:
          driver: postgres
          dsn: "postgres://${DB_USER}:${DB_PASSWORD}@${DB_HOST}:5432/${DB_NAME}?sslmode=require"
          table: inventory
          columns: ["*"]
        processors:
          - mapping: |
              root = this
              root._table = "inventory"
              root._backup_type = "full"

      # Order line items
      - sql_select:
          driver: postgres
          dsn: "postgres://${DB_USER}:${DB_PASSWORD}@${DB_HOST}:5432/${DB_NAME}?sslmode=require"
          table: order_items
          columns: ["*"]
          where: "created_at >= CURRENT_DATE - INTERVAL '1 day'"
        processors:
          - mapping: |
              root = this
              root._table = "order_items"
              root._backup_type = "incremental"

pipeline:
  processors:
    # Step 1: Add backup metadata
    - mapping: |
        root = this
        root._backup_metadata = {
          "backup_date": now().format("2006-01-02"),
          "backup_timestamp": now(),
          "source_host": env("DB_HOST").or("unknown"),
          "source_database": env("DB_NAME").or("unknown"),
          "pipeline_version": "1.0.0",
          "node_id": env("NODE_ID").or("unknown")
        }

    # Step 2: Calculate row checksum for integrity verification
    - mapping: |
        root = this
        # Create checksum from all original fields (excluding metadata),
        # hex-encoded so it can be compared against hexdigest() on restore
        let data_fields = this.without("_table", "_backup_type", "_backup_metadata")
        root._checksum = $data_fields.format_json().hash("md5").encode("hex")

output:
  # Route to different paths based on table
  switch:
    cases:
      - check: this._table == "orders"
        output:
          gcp_cloud_storage:
            bucket: "${GCS_BACKUP_BUCKET}"
            path: "backups/orders/${!this._backup_metadata.backup_date}/orders-${!timestamp_unix()}.parquet"
            content_type: application/octet-stream
            # Use Nearline for backups (accessed < 1x/month)
            storage_class: NEARLINE
            batching:
              count: 10000
              period: 60s
            # Enable Parquet encoding for compression
            parquet_encoding:
              compression: SNAPPY

      - check: this._table == "inventory"
        output:
          gcp_cloud_storage:
            bucket: "${GCS_BACKUP_BUCKET}"
            path: "backups/inventory/${!this._backup_metadata.backup_date}/inventory-full.parquet"
            content_type: application/octet-stream
            storage_class: NEARLINE
            batching:
              count: 50000  # Inventory is full backup, larger batches
              period: 120s
            parquet_encoding:
              compression: SNAPPY

      - check: this._table == "order_items"
        output:
          gcp_cloud_storage:
            bucket: "${GCS_BACKUP_BUCKET}"
            path: "backups/order_items/${!this._backup_metadata.backup_date}/items-${!timestamp_unix()}.parquet"
            content_type: application/octet-stream
            storage_class: NEARLINE
            batching:
              count: 10000
              period: 60s
            parquet_encoding:
              compression: SNAPPY

      # Fallback for unknown tables
      - output:
          gcp_cloud_storage:
            bucket: "${GCS_BACKUP_BUCKET}"
            path: "backups/unknown/${!this._table}/${!this._backup_metadata.backup_date}/data-${!timestamp_unix()}.json"
            content_type: application/json
            storage_class: NEARLINE
            batching:
              count: 1000
              period: 30s
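
The checksum step above can be sketched in plain Python. This is a minimal sketch: the field names mirror the mapping, but `json.dumps` here merely stands in for Bloblang's `format_json`, whose exact key ordering may differ, so treat the digests as illustrative rather than byte-for-byte identical to the pipeline's.

```python
import hashlib
import json

# Bookkeeping fields added by the pipeline, excluded from the digest
METADATA_FIELDS = {"_table", "_backup_type", "_backup_metadata"}

def row_checksum(row):
    """MD5 (hex) over a row's data fields, excluding backup metadata."""
    data_fields = {k: v for k, v in row.items() if k not in METADATA_FIELDS}
    # sort_keys makes the digest independent of insertion order
    payload = json.dumps(data_fields, sort_keys=True, default=str)
    return hashlib.md5(payload.encode()).hexdigest()

row = {"order_id": 42, "total": "19.99", "_table": "orders", "_backup_type": "incremental"}
print(row_checksum(row))  # 32-character hex digest
```

The same function, run over a restored row with the metadata columns stripped, should reproduce the stored digest.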

Quick Test

# Set environment variables
export DB_HOST=postgres.internal.corp
export DB_NAME=ecommerce
export DB_USER=backup_reader
export DB_PASSWORD=<secret>
export GCS_BACKUP_BUCKET=my-backup-bucket
export NODE_ID=backup-node-1

# Test (writes to stdout instead of GCS)
expanso-edge run --config nightly-backup.yaml \
--set 'output.switch.cases[0].output=stdout' | head -5
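
When eyeballing the stdout test output, each record should carry the table tag, backup metadata, and checksum. A small sketch of that sanity check (the sample record below is hypothetical):

```python
import json

# Bookkeeping fields the pipeline is expected to add to every record
REQUIRED_FIELDS = {"_table", "_backup_type", "_backup_metadata", "_checksum"}

def validate_record(line):
    """Return True if a JSON line carries all backup bookkeeping fields."""
    record = json.loads(line)
    return REQUIRED_FIELDS.issubset(record)

# Hypothetical record shaped like the pipeline's output
sample = json.dumps({
    "order_id": 1, "_table": "orders", "_backup_type": "incremental",
    "_backup_metadata": {"backup_date": "2024-01-15"}, "_checksum": "abc",
})
print(validate_record(sample))  # True
```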

Deploy to Production

Schedule Nightly at 2 AM

expanso-cli job deploy nightly-backup.yaml --cron "0 2 * * *"

Verify Deployment

# Check job status
expanso-cli job describe nightly-backup-orders

# View recent runs
expanso-cli job executions nightly-backup-orders --limit 5

Recovery Procedures

List Available Backups

# List all backup dates
gsutil ls gs://${GCS_BACKUP_BUCKET}/backups/orders/

# List files for specific date
gsutil ls gs://${GCS_BACKUP_BUCKET}/backups/orders/2024-01-15/
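
For a scripted recovery, the most recent date partition can be picked out of the `gsutil ls` output. A sketch, assuming the path layout shown above (the listing values are hypothetical):

```python
import re

def latest_backup_date(listing):
    """Pick the most recent YYYY-MM-DD partition from gsutil ls output."""
    dates = []
    for path in listing:
        m = re.search(r"/backups/\w+/(\d{4}-\d{2}-\d{2})/", path)
        if m:
            dates.append(m.group(1))
    # ISO dates sort lexicographically, so max() finds the newest
    return max(dates, default=None)

listing = [
    "gs://my-backup-bucket/backups/orders/2024-01-14/",
    "gs://my-backup-bucket/backups/orders/2024-01-15/",
]
print(latest_backup_date(listing))  # 2024-01-15
```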

Download Backup

# Download single file
gsutil cp gs://${GCS_BACKUP_BUCKET}/backups/orders/2024-01-15/orders-1705276800.parquet ./

# Download entire date
gsutil -m cp -r gs://${GCS_BACKUP_BUCKET}/backups/orders/2024-01-15/ ./restore/

Restore to Database

import hashlib

import pandas as pd
from sqlalchemy import create_engine

# Read Parquet
df = pd.read_parquet('orders-1705276800.parquet')

# Verify checksums
for _, row in df.iterrows():
    data = row.drop(['_table', '_backup_type', '_backup_metadata', '_checksum'])
    calculated = hashlib.md5(data.to_json().encode()).hexdigest()
    assert calculated == row['_checksum'], f"Checksum mismatch for {row['order_id']}"

# Restore to database (pandas.to_sql needs a SQLAlchemy connectable)
engine = create_engine(...)
df.drop(columns=['_table', '_backup_type', '_backup_metadata', '_checksum']).to_sql(
    'orders_restored', engine, if_exists='replace', index=False
)

Monitoring

Check Backup Size

# Total backup size
gsutil du -sh gs://${GCS_BACKUP_BUCKET}/backups/

# Size by table
gsutil du -sh gs://${GCS_BACKUP_BUCKET}/backups/orders/
gsutil du -sh gs://${GCS_BACKUP_BUCKET}/backups/inventory/

Alert on Failure

# Add to pipeline
output:
  broker:
    outputs:
      - # Normal storage output...
      - processors:
          - mapping: |
              root = if errored() {
                {
                  "alert": "Backup failed",
                  "table": this._table,
                  "error": error()
                }
              } else { deleted() }
        http_client:
          url: "https://alerts.example.com/webhook"
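
The mapping in the broker's second output can be mirrored in Python to make the alert contract concrete. A sketch; the record and error shapes are assumptions, and returning `None` stands in for Bloblang's `deleted()`:

```python
def build_alert(record, error=None):
    """Build a webhook payload for a failed record, or None to drop it."""
    if error is None:
        # Healthy record: nothing to alert on (mirrors deleted())
        return None
    return {
        "alert": "Backup failed",
        "table": record.get("_table", "unknown"),
        "error": str(error),
    }

print(build_alert({"_table": "orders"}, error="connection refused"))
```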

Cost Estimation

Component           Monthly Cost (1TB)
Nearline Storage    $10
Write Operations    $0.50
Network Egress      $0 (same region)
Total               ~$10.50/TB/month
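
The totals follow from Nearline's per-GB rate (roughly $0.010/GB/month at the time of writing; treat the rates below as assumptions, not quoted prices):

```python
NEARLINE_PER_GB = 0.010  # USD/GB/month (assumed Nearline rate)
WRITE_OPS_COST = 0.50    # USD/month for nightly batch writes (assumed)
EGRESS_COST = 0.0        # same-region transfer is free

tb = 1000  # GB per TB, as billed
storage = tb * NEARLINE_PER_GB
total = storage + WRITE_OPS_COST + EGRESS_COST
print(f"storage=${storage:.2f} total=${total:.2f}/TB/month")
# storage=$10.00 total=$10.50/TB/month
```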

Compare to:

  • Standard storage: $20/TB/month
  • Traditional backup software: $50-200/TB/month

Download

Download nightly-backup.yaml
