Step 3: Calculate Checksum

Add a checksum to each row so you can verify data integrity during recovery.

The Goal

  • Calculate MD5 hash of the original data fields
  • Exclude metadata fields from the hash
  • Enable integrity verification during restore

Why Checksums Matter

Corruption Detection: Verify backup files weren't corrupted in storage.

Recovery Validation: Confirm restored data matches original.

Compliance: Prove data integrity for audits.
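The corruption-detection property can be sketched with Python's standard hashlib module (the payload below is a made-up example):

```python
import hashlib

# Hash the payload bytes at backup time and store the digest alongside them
original = b'{"order_id": 12345, "total": 99.99}'
stored_checksum = hashlib.md5(original).hexdigest()

# A single changed byte in storage yields a completely different digest
corrupted = b'{"order_id": 12345, "total": 99.98}'
assert hashlib.md5(corrupted).hexdigest() != stored_checksum
```

Any bit flip between backup and restore is detected this way, though MD5 only guards against accidental corruption, not deliberate tampering.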

Implementation

step-3-checksum.yaml

pipeline:
  processors:
    # Step 2: Add metadata (from previous)
    - mapping: |
        root = this
        root._backup_metadata = {
          "backup_date": now().ts_format("2006-01-02"),
          "backup_timestamp": now(),
          "source_host": env("DB_HOST").or("unknown"),
          "source_database": env("DB_NAME").or("unknown"),
          "pipeline_version": "1.0.0",
          "node_id": env("NODE_ID").or("unknown")
        }

    # Step 3: Calculate row checksum
    - mapping: |
        root = this
        # Create checksum from original data fields only
        let data_fields = this.without("_table", "_backup_type", "_backup_metadata")
        root._checksum = $data_fields.format_json().hash("md5")

Understanding the Code

Expression           Purpose
this.without(...)    Exclude metadata fields
.format_json()       Serialize to a consistent JSON string
.hash("md5")         Calculate the MD5 hash

Why Exclude Metadata?

The checksum should represent the original data, not backup artifacts:

// This data should always produce the same checksum:
{"order_id": 12345, "customer_id": "CUST-001", "total": 99.99}

// Regardless of when/where it was backed up:
{"_backup_metadata": {"backup_timestamp": "..."}}
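A short Python sketch (field names taken from the example above) confirms that dropping the metadata keys before hashing keeps the checksum stable across backup runs:

```python
import hashlib
import json

def checksum(row):
    # Drop backup artifacts before hashing, mirroring this.without(...)
    data = {k: v for k, v in row.items() if not k.startswith('_')}
    return hashlib.md5(json.dumps(data, sort_keys=True).encode()).hexdigest()

run1 = {"order_id": 12345, "total": 99.99,
        "_backup_metadata": {"backup_timestamp": "2024-01-01T00:00:00Z"}}
run2 = {"order_id": 12345, "total": 99.99,
        "_backup_metadata": {"backup_timestamp": "2024-06-15T12:30:00Z"}}

assert checksum(run1) == checksum(run2)  # metadata doesn't affect the hash
```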

Expected Output

{
  "order_id": 12345,
  "customer_id": "CUST-001",
  "total": 99.99,
  "_table": "orders",
  "_backup_type": "incremental",
  "_backup_metadata": {...},
  "_checksum": "a1b2c3d4e5f67890a1b2c3d4e5f67890"
}

Verification During Recovery

After restoring, verify each row's checksum. Note that the digest only matches if the Python serialization (key order and number formatting) agrees with Bloblang's format_json() output:

import hashlib
import json

def verify_row(row):
    # Extract data fields (exclude metadata)
    data = {k: v for k, v in row.items()
            if not k.startswith('_')}

    # Calculate checksum
    calculated = hashlib.md5(
        json.dumps(data, sort_keys=True).encode()
    ).hexdigest()

    # Compare
    return calculated == row['_checksum']
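A minimal round trip exercises the verification logic (the function is repeated here so the snippet runs standalone):

```python
import hashlib
import json

def verify_row(row):
    data = {k: v for k, v in row.items() if not k.startswith('_')}
    calculated = hashlib.md5(json.dumps(data, sort_keys=True).encode()).hexdigest()
    return calculated == row['_checksum']

# Build a row whose checksum was computed the same way at backup time
row = {"order_id": 12345, "customer_id": "CUST-001", "total": 99.99, "_table": "orders"}
row["_checksum"] = hashlib.md5(
    json.dumps({k: v for k, v in row.items() if not k.startswith('_')},
               sort_keys=True).encode()).hexdigest()

assert verify_row(row)       # intact row passes
row["total"] = 0.01          # simulate corruption in storage
assert not verify_row(row)   # tampered row fails
```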

Production Considerations

SHA-256 for Stronger Integrity

For compliance requirements that call for a cryptographically stronger hash, switch to SHA-256:

root._checksum = $data_fields.format_json().hash("sha256")
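On the verification side only the hash constructor changes; a quick sketch with hashlib:

```python
import hashlib
import json

data = {"order_id": 12345, "total": 99.99}
digest = hashlib.sha256(json.dumps(data, sort_keys=True).encode()).hexdigest()
assert len(digest) == 64  # SHA-256 digests are 64 hex characters, vs. 32 for MD5
```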

Include Primary Key in Metadata

For easier debugging:

root._checksum_context = {
  "primary_key": this.order_id,
  "checksum": $data_fields.format_json().hash("md5"),
  "field_count": $data_fields.keys().length()
}

Batch-Level Checksum

Also calculate checksum for entire batch:

# In output batching config
output:
  processors:
    - mapping: |
        # This runs on batched data
        root._batch_checksum = this.format_json().hash("sha256")
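Batch checksums only match if the stored bytes are read back exactly as written. This sketch (assuming the batch was serialized as a JSON array, which depends on your output config) shows that even a whitespace change breaks the match:

```python
import hashlib

# Bytes as written by the pipeline (hypothetical serialization)
raw = b'[{"order_id": 1}, {"order_id": 2}]'
batch_checksum = hashlib.sha256(raw).hexdigest()

# The same data reformatted (whitespace removed) hashes differently
reformatted = b'[{"order_id":1},{"order_id":2}]'
assert hashlib.sha256(reformatted).hexdigest() != batch_checksum
```

For this reason, verify the batch checksum against the raw stored bytes before parsing, not against re-serialized JSON.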

Next Step