Complete Pipeline
This pipeline provides production-ready database backups:

- **Extract multiple tables** - orders, inventory, order_items
- **Add backup metadata** - timestamp, source, version
- **Calculate checksums** - row-level integrity verification
- **Route to storage** - table-specific paths, Parquet compression

Result: reliable, verifiable backups with 70-90% compression.
Full Configuration
nightly-backup.yaml
```yaml
# Nightly Database Backup Pipeline
# Simple, reliable replication to cloud storage for DR
#
# Use case: Nightly backup of orders and inventory tables from
# on-premise database to cloud cold storage (GCS Nearline/S3 Glacier)
#
# Key features:
# - Full table extract via SQL
# - Minimal transformation (just add metadata)
# - Compressed Parquet format for cost efficiency
# - Date-partitioned storage for easy recovery
# - Checksum verification

name: nightly-backup-orders
description: Backup orders and inventory to cloud cold storage

input:
  # Sequence through multiple tables in one pipeline
  sequence:
    inputs:
      # Orders table
      - sql_select:
          driver: postgres  # Or: mysql, odbc, mssql
          dsn: "postgres://${DB_USER}:${DB_PASSWORD}@${DB_HOST}:5432/${DB_NAME}?sslmode=require"
          table: orders
          columns: ["*"]
          where: "updated_at >= CURRENT_DATE - INTERVAL '1 day'"
        processors:
          - mapping: |
              root = this
              root._table = "orders"
              root._backup_type = "incremental"

      # Inventory table (full backup - smaller table)
      - sql_select:
          driver: postgres
          dsn: "postgres://${DB_USER}:${DB_PASSWORD}@${DB_HOST}:5432/${DB_NAME}?sslmode=require"
          table: inventory
          columns: ["*"]
        processors:
          - mapping: |
              root = this
              root._table = "inventory"
              root._backup_type = "full"

      # Order line items
      - sql_select:
          driver: postgres
          dsn: "postgres://${DB_USER}:${DB_PASSWORD}@${DB_HOST}:5432/${DB_NAME}?sslmode=require"
          table: order_items
          columns: ["*"]
          where: "created_at >= CURRENT_DATE - INTERVAL '1 day'"
        processors:
          - mapping: |
              root = this
              root._table = "order_items"
              root._backup_type = "incremental"

pipeline:
  processors:
    # Step 1: Add backup metadata
    - mapping: |
        root = this
        root._backup_metadata = {
          "backup_date": now().format("2006-01-02"),
          "backup_timestamp": now(),
          "source_host": env("DB_HOST").or("unknown"),
          "source_database": env("DB_NAME").or("unknown"),
          "pipeline_version": "1.0.0",
          "node_id": env("NODE_ID").or("unknown")
        }

    # Step 2: Calculate row checksum for integrity verification
    - mapping: |
        root = this
        # Create checksum from all original fields (excluding metadata)
        let data_fields = this.without("_table", "_backup_type", "_backup_metadata")
        root._checksum = $data_fields.format_json().hash("md5")

output:
  # Route to different paths based on table
  switch:
    cases:
      - check: this._table == "orders"
        output:
          gcp_cloud_storage:
            bucket: "${GCS_BACKUP_BUCKET}"
            path: "backups/orders/${!this._backup_metadata.backup_date}/orders-${!timestamp_unix()}.parquet"
            content_type: application/octet-stream
            # Use Nearline for backups (accessed < 1x/month)
            storage_class: NEARLINE
            batching:
              count: 10000
              period: 60s
            # Enable Parquet encoding for compression
            parquet_encoding:
              compression: SNAPPY

      - check: this._table == "inventory"
        output:
          gcp_cloud_storage:
            bucket: "${GCS_BACKUP_BUCKET}"
            path: "backups/inventory/${!this._backup_metadata.backup_date}/inventory-full.parquet"
            content_type: application/octet-stream
            storage_class: NEARLINE
            batching:
              count: 50000  # Inventory is a full backup; use larger batches
              period: 120s
            parquet_encoding:
              compression: SNAPPY

      - check: this._table == "order_items"
        output:
          gcp_cloud_storage:
            bucket: "${GCS_BACKUP_BUCKET}"
            path: "backups/order_items/${!this._backup_metadata.backup_date}/items-${!timestamp_unix()}.parquet"
            content_type: application/octet-stream
            storage_class: NEARLINE
            batching:
              count: 10000
              period: 60s
            parquet_encoding:
              compression: SNAPPY

      # Fallback for unknown tables
      - output:
          gcp_cloud_storage:
            bucket: "${GCS_BACKUP_BUCKET}"
            path: "backups/unknown/${!this._table}/${!this._backup_metadata.backup_date}/data-${!timestamp_unix()}.json"
            content_type: application/json
            storage_class: NEARLINE
            batching:
              count: 1000
              period: 30s
```
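The per-row checksum added in Step 2 can be reproduced outside the pipeline for spot checks. A minimal sketch in Python; it assumes the JSON serialization (compact output, same key order) matches what the pipeline's `format_json()` produced, which you should confirm against one real record before relying on it:

```python
import hashlib
import json

def row_checksum(record: dict) -> str:
    """MD5 over the record's data fields, excluding pipeline-added metadata.

    Mirrors the pipeline's without(...) + format_json() + hash("md5") step,
    assuming compact JSON with matching key order (an assumption to verify).
    """
    data = {k: v for k, v in record.items()
            if k not in ("_table", "_backup_type", "_backup_metadata", "_checksum")}
    return hashlib.md5(json.dumps(data, separators=(",", ":")).encode()).hexdigest()
```

Because the hash covers only the original columns, the same row always produces the same checksum regardless of which metadata fields the pipeline attached.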
Quick Test
```bash
# Set environment variables
export DB_HOST=postgres.internal.corp
export DB_NAME=ecommerce
export DB_USER=backup_reader
export DB_PASSWORD=<secret>
export GCS_BACKUP_BUCKET=my-backup-bucket
export NODE_ID=backup-node-1

# Test (writes to stdout instead of GCS)
expanso-edge run --config nightly-backup.yaml \
  --set 'output.switch.cases[0].output=stdout' | head -5
```
Deploy to Production
Schedule Nightly at 2 AM
```bash
expanso-cli job deploy nightly-backup.yaml --cron "0 2 * * *"
```
Verify Deployment
```bash
# Check job status
expanso-cli job describe nightly-backup-orders

# View recent runs
expanso-cli job executions nightly-backup-orders --limit 5
```
Recovery Procedures
List Available Backups
```bash
# List all backup dates
gsutil ls gs://${GCS_BACKUP_BUCKET}/backups/orders/

# List files for a specific date
gsutil ls gs://${GCS_BACKUP_BUCKET}/backups/orders/2024-01-15/
```
Download Backup
```bash
# Download a single file
gsutil cp gs://${GCS_BACKUP_BUCKET}/backups/orders/2024-01-15/orders-1705276800.parquet ./

# Download an entire date
gsutil -m cp -r gs://${GCS_BACKUP_BUCKET}/backups/orders/2024-01-15/ ./restore/
```
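The unix timestamp embedded in each file name records when that batch was written, which helps you pick the right file during recovery. A small helper, assuming the `<table>-<unix_ts>.parquet` naming used above (full-backup files such as `inventory-full.parquet` carry no timestamp and are rejected here):

```python
import re
from datetime import datetime, timezone

def backup_time(filename: str) -> datetime:
    """Recover the write time from a backup file name like orders-1705276800.parquet."""
    m = re.search(r"-(\d{10})\.parquet$", filename)
    if not m:
        raise ValueError(f"no unix timestamp in backup file name: {filename}")
    return datetime.fromtimestamp(int(m.group(1)), tz=timezone.utc)
```

For example, `orders-1705276800.parquet` resolves to midnight UTC on 2024-01-15, matching its date partition.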
Restore to Database
```python
import hashlib

import pandas as pd
from sqlalchemy import create_engine

# Read Parquet
df = pd.read_parquet('orders-1705276800.parquet')

# Verify checksums. The recomputed hash must use the same JSON serialization
# (field order, spacing) the pipeline used; confirm against a known-good record.
for _, row in df.iterrows():
    data = row.drop(['_table', '_backup_type', '_backup_metadata', '_checksum'])
    calculated = hashlib.md5(data.to_json().encode()).hexdigest()
    assert calculated == row['_checksum'], f"Checksum mismatch for {row['order_id']}"

# Restore to database. pandas to_sql requires a SQLAlchemy engine,
# not a raw psycopg2 connection.
engine = create_engine(...)  # e.g. "postgresql+psycopg2://user:pass@host:5432/db"
df.drop(columns=['_table', '_backup_type', '_backup_metadata', '_checksum']).to_sql(
    'orders_restored', engine, if_exists='replace', index=False
)
```
Monitoring
Check Backup Size
```bash
# Total backup size
gsutil du -sh gs://${GCS_BACKUP_BUCKET}/backups/

# Size by table
gsutil du -sh gs://${GCS_BACKUP_BUCKET}/backups/orders/
gsutil du -sh gs://${GCS_BACKUP_BUCKET}/backups/inventory/
```
Alert on Failure
```yaml
# Add to pipeline
output:
  broker:
    outputs:
      - # Normal storage output...
      - processors:
          - mapping: |
              root = if errored() {
                {
                  "alert": "Backup failed",
                  "table": this._table,
                  "error": error()
                }
              } else { deleted() }
        http_client:
          url: "https://alerts.example.com/webhook"
```
Cost Estimation
| Component | Monthly Cost (1TB) |
|---|---|
| Nearline Storage | $10 |
| Write Operations | $0.50 |
| Network Egress | $0 (same region) |
| Total | ~$10.50/TB/month |
Compare to:
- Standard storage: $20/TB/month
- Traditional backup software: $50-200/TB/month
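The total is simple arithmetic over the table's per-terabyte rates; a budgeting sketch, with rates copied from the table above rather than live GCS pricing (check current pricing before relying on these numbers):

```python
# Illustrative monthly cost model using the rates from the table above
# (Nearline-class figures; these are assumptions, not current GCS pricing).
NEARLINE_PER_TB = 10.00   # storage, $/TB/month
WRITE_OPS_PER_TB = 0.50   # write operations for nightly batched uploads
EGRESS_PER_TB = 0.00      # same-region access

def monthly_cost(tb: float) -> float:
    """Estimated monthly backup cost in dollars for `tb` terabytes stored."""
    return tb * (NEARLINE_PER_TB + WRITE_OPS_PER_TB + EGRESS_PER_TB)
```

At 1 TB this reproduces the ~$10.50/month total; scaling is linear, so 10 TB of compressed backups runs about $105/month.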
Download
Download nightly-backup.yaml
What's Next?
- Troubleshooting - Common issues and solutions
- DB2 to BigQuery - Database migration patterns
- Cross-Border GDPR - Compliance for EU data