Step 3: Calculate Checksum
Add a checksum to each row so you can verify data integrity during recovery.
The Goal
- Calculate MD5 hash of the original data fields
- Exclude metadata fields from the hash
- Enable integrity verification during restore
Why Checksums Matter
- **Corruption detection:** Verify that backup files weren't corrupted in storage.
- **Recovery validation:** Confirm that restored data matches the original.
- **Compliance:** Prove data integrity for audits.
Implementation
`step-3-checksum.yaml`

```yaml
pipeline:
  processors:
    # Step 2: Add metadata (from the previous step)
    - mapping: |
        root = this
        root._backup_metadata = {
          "backup_date": now().ts_format("2006-01-02"),
          "backup_timestamp": now(),
          "source_host": env("DB_HOST").or("unknown"),
          "source_database": env("DB_NAME").or("unknown"),
          "pipeline_version": "1.0.0",
          "node_id": env("NODE_ID").or("unknown")
        }

    # Step 3: Calculate row checksum
    - mapping: |
        root = this
        # Hash the original data fields only, serialized as compact
        # JSON so the byte stream is deterministic
        let data_fields = this.without("_table", "_backup_type", "_backup_metadata")
        root._checksum = $data_fields.format_json(no_indent: true).hash("md5").encode("hex")
```
Understanding the Code
| Expression | Purpose |
|---|---|
| `this.without(...)` | Exclude metadata fields from the hash input |
| `.format_json(no_indent: true)` | Serialize to a compact, deterministic JSON byte string |
| `.hash("md5")` | Calculate the raw MD5 digest |
| `.encode("hex")` | Encode the digest as a hex string |
Why Exclude Metadata?
The checksum should represent the original data, not backup artifacts:
```json
// This data should always produce the same checksum:
{"order_id": 12345, "customer_id": "CUST-001", "total": 99.99}

// ...regardless of when or where it was backed up:
{"_backup_metadata": {"backup_timestamp": "..."}}
```
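This invariant is easy to check outside the pipeline. A minimal Python sketch (the `row_checksum` helper and the sample rows are illustrative, not part of the pipeline):

```python
import hashlib
import json

def row_checksum(row):
    """Hash only the data fields, mirroring the pipeline's exclusion rule."""
    data = {k: v for k, v in row.items() if not k.startswith("_")}
    # Compact, key-sorted JSON keeps the byte stream deterministic
    payload = json.dumps(data, sort_keys=True, separators=(",", ":")).encode()
    return hashlib.md5(payload).hexdigest()

# Same data, different backup metadata:
row_a = {"order_id": 12345, "customer_id": "CUST-001", "total": 99.99,
         "_backup_metadata": {"backup_timestamp": "2024-01-01T00:00:00Z"}}
row_b = {"order_id": 12345, "customer_id": "CUST-001", "total": 99.99,
         "_backup_metadata": {"backup_timestamp": "2024-06-30T12:00:00Z"}}

print(row_checksum(row_a) == row_checksum(row_b))  # → True
```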
Expected Output
```json
{
  "order_id": 12345,
  "customer_id": "CUST-001",
  "total": 99.99,
  "_table": "orders",
  "_backup_type": "incremental",
  "_backup_metadata": {...},
  "_checksum": "a1b2c3d4e5f60718293a4b5c6d7e8f90"
}
```
Verification During Recovery
After restoring, verify checksums:
```python
import hashlib
import json

def verify_row(row):
    # Extract data fields (exclude metadata, including _checksum itself)
    data = {k: v for k, v in row.items()
            if not k.startswith('_')}
    # Recalculate the checksum; the serialization must match the
    # pipeline byte-for-byte: sorted keys, compact separators
    calculated = hashlib.md5(
        json.dumps(data, sort_keys=True, separators=(',', ':')).encode()
    ).hexdigest()
    # Compare against the stored value
    return calculated == row['_checksum']
```
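To sweep a whole restore, the same check can be applied per row. `corrupt_rows` below is a hypothetical helper that collects the order IDs of mismatches (shown self-contained):

```python
import hashlib
import json

def verify_row(row):
    # Recompute the checksum over data fields only (keys not starting with "_")
    data = {k: v for k, v in row.items() if not k.startswith("_")}
    calculated = hashlib.md5(
        json.dumps(data, sort_keys=True, separators=(",", ":")).encode()
    ).hexdigest()
    return calculated == row["_checksum"]

def corrupt_rows(rows):
    """Return the order IDs of rows whose checksum no longer matches."""
    return [row.get("order_id") for row in rows if not verify_row(row)]

# Build a valid row, then tamper with a copy:
row = {"order_id": 1, "total": 10.0}
row["_checksum"] = hashlib.md5(
    json.dumps(row, sort_keys=True, separators=(",", ":")).encode()
).hexdigest()
tampered = dict(row, total=99.0)

print(corrupt_rows([row, tampered]))  # → [1]
```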
Production Considerations
SHA-256 for Stronger Integrity
MD5 is fine for detecting accidental corruption, but it is not collision-resistant. For compliance requirements that demand cryptographic strength, swap in SHA-256:
```
root._checksum = $data_fields.format_json(no_indent: true).hash("sha256").encode("hex")
```
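If the pipeline emits SHA-256, the restore-side check changes in lockstep: swap `hashlib.md5` for `hashlib.sha256`. A sketch, assuming the same compact, key-sorted serialization:

```python
import hashlib
import json

def verify_row_sha256(row):
    # Same exclusion rule and serialization as the MD5 check,
    # only the digest algorithm differs
    data = {k: v for k, v in row.items() if not k.startswith("_")}
    digest = hashlib.sha256(
        json.dumps(data, sort_keys=True, separators=(",", ":")).encode()
    ).hexdigest()
    return digest == row["_checksum"]
```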
Include Primary Key in Metadata
For easier debugging:
```
root._checksum_context = {
  "primary_key": this.order_id,
  "checksum": $data_fields.format_json(no_indent: true).hash("md5").encode("hex"),
  "field_count": $data_fields.keys().length()
}
```
Batch-Level Checksum
Also calculate checksum for entire batch:
```yaml
# In the output's batching config
output:
  batching:
    processors:
      # Collapse the batch into a single JSON array message first;
      # a plain mapping would otherwise run per message, not per batch
      - archive:
          format: json_array
      - mapping: |
          root.rows = this
          root._batch_checksum = this.format_json(no_indent: true).hash("sha256").encode("hex")
```
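On the restore side, the batch checksum can be re-derived the same way. A sketch, assuming each batch object carries its rows under a `rows` key plus a `_batch_checksum` field (both names are illustrative):

```python
import hashlib
import json

def verify_batch(batch):
    # Serialization must match the pipeline byte-for-byte:
    # sorted keys, compact separators
    payload = json.dumps(batch["rows"], sort_keys=True,
                         separators=(",", ":")).encode()
    return hashlib.sha256(payload).hexdigest() == batch["_batch_checksum"]
```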