Step 2: Add Lineage Metadata
A fundamental step in log processing is enriching logs with lineage metadata. This is a block of information that tells you where, when, and how a log message was processed. It's critical for debugging, auditing, and tracking data flow.
The Goal
You will add a lineage object to each log event that records which pipeline processed it and at what time.
Implementation
- Start with the Foundation: Copy enrichment-foundation.yaml from the examples/log-processing directory to a new file named add-lineage.yaml.

    cp examples/log-processing/enrichment-foundation.yaml add-lineage.yaml

- Add the Lineage Processor: Open add-lineage.yaml and add a mapping processor to the pipeline section. This processor creates the new lineage object.

  Add this to the 'processors' array in add-lineage.yaml:

    # This goes after the existing 'mapping' processor
    - mapping: |
        root = this
        root.lineage = {
          "pipeline_name": "enrich-export-tutorial",
          "pipeline_version": "1.0.0",
          "processed_at": now(),
          "processing_node_id": env("NODE_ID").or("unknown-node")
        }

  This uses the env() function to read the NODE_ID environment variable; if the variable is not set, .or("unknown-node") supplies a fallback value. This is a common pattern for identifying which machine or container processed the data.

- Deploy and Test:

    # Set a sample NODE_ID for testing
    export NODE_ID="local-dev-machine"

- Verify: Watch the logs from your pipeline. Each log message now carries a lineage object containing the pipeline name, pipeline version, processing timestamp, and the node ID you specified.

  Example Output Snippet:

    {
      "message": "Log message from auth-service",
      // ... other fields
      "lineage": {
        "pipeline_name": "enrich-export-tutorial",
        "pipeline_version": "1.0.0",
        "processed_at": "2025-11-22T23:30:00Z",
        "processing_node_id": "local-dev-machine"
      }
    }
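To make the Verify step less dependent on eyeballing output, the check can be sketched in a few lines of Python. This is an illustrative helper, not part of the tutorial's tooling: the function names build_lineage and lineage_problems are hypothetical, and build_lineage is only a rough Python analogue of the mapping above (same four fields, same NODE_ID fallback), not how the pipeline itself produces the object.

```python
import os
from datetime import datetime, timezone

# The four fields the mapping processor writes into "lineage".
REQUIRED_KEYS = {
    "pipeline_name",
    "pipeline_version",
    "processed_at",
    "processing_node_id",
}


def build_lineage() -> dict:
    """Rough Python analogue of the mapping (hypothetical helper)."""
    return {
        "pipeline_name": "enrich-export-tutorial",
        "pipeline_version": "1.0.0",
        # Current UTC time as an ISO 8601 string, standing in for now().
        "processed_at": datetime.now(timezone.utc).isoformat(),
        # Same fallback idea as env("NODE_ID").or("unknown-node").
        "processing_node_id": os.environ.get("NODE_ID", "unknown-node"),
    }


def lineage_problems(event: dict) -> list:
    """Return the problems found in an event's lineage object (empty if none)."""
    lineage = event.get("lineage")
    if not isinstance(lineage, dict):
        return ["missing lineage object"]
    problems = [
        f"missing field: {key}" for key in sorted(REQUIRED_KEYS - set(lineage))
    ]
    if lineage.get("processing_node_id") == "unknown-node":
        problems.append("NODE_ID was not set when the event was processed")
    return problems
```

Calling lineage_problems on each parsed output event and expecting an empty list is one way to confirm the processor ran. The "unknown-node" check is a deliberate choice here: it flags events processed without NODE_ID exported, which is easy to miss during local testing.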
You have successfully added basic but essential audit information to your logs. This metadata is invaluable for troubleshooting in a complex system.