
Step 2: Add Lineage Metadata

A fundamental step in log processing is enriching logs with lineage metadata. This is a block of information that tells you where, when, and how a log message was processed. It's critical for debugging, auditing, and tracking data flow.

The Goal

You will add a lineage object to each log event that records which pipeline and version processed it, on which node, and at what time.

Implementation

  1. Start with the Foundation: Copy the enrichment-foundation.yaml from the examples/log-processing directory to a new file named add-lineage.yaml.

    cp examples/log-processing/enrichment-foundation.yaml add-lineage.yaml
  2. Add the Lineage Processor: Open add-lineage.yaml and add a mapping processor to the pipeline section. This processor will create the new lineage object.

    # Add this to the 'processors' array in add-lineage.yaml
    # This goes after the existing 'mapping' processor
    - mapping: |
        root = this
        root.lineage = {
          "pipeline_name": "enrich-export-tutorial",
          "pipeline_version": "1.0.0",
          "processed_at": now(),
          "processing_node_id": env("NODE_ID").or("unknown-node")
        }

    The mapping uses the env() function to read the NODE_ID environment variable and falls back to "unknown-node" when the variable is not set. This is a common pattern for identifying which machine or container processed the data.

  3. Deploy and Test:

    # Set a sample NODE_ID for testing
    export NODE_ID="local-dev-machine"
  4. Verify: Watch the logs from your pipeline. Each log event now carries a lineage object containing the pipeline name, the pipeline version, the processing timestamp, and the node ID you set.

    Example Output Snippet:

    {
      "message": "Log message from auth-service",
      // ... other fields
      "lineage": {
        "pipeline_name": "enrich-export-tutorial",
        "pipeline_version": "1.0.0",
        "processed_at": "2025-11-22T23:30:00Z",
        "processing_node_id": "local-dev-machine"
      }
    }
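
To make the mapping's logic and the verification step concrete, here is a minimal Python sketch. It is a stand-in for illustration only, not how the pipeline itself runs: add_lineage mirrors the Bloblang mapping from step 2, and missing_lineage_fields checks one JSON log line for the four lineage fields. Both function names are hypothetical, and the UTC timestamp format is an assumption that may differ from what now() produces in your environment.

```python
import json
import os
from datetime import datetime, timezone

# Field names taken from the mapping in step 2.
EXPECTED_FIELDS = (
    "pipeline_name",
    "pipeline_version",
    "processed_at",
    "processing_node_id",
)


def add_lineage(event: dict) -> dict:
    """Hypothetical Python stand-in for the Bloblang mapping in step 2."""
    enriched = dict(event)  # like `root = this`: keep every existing field
    enriched["lineage"] = {
        "pipeline_name": "enrich-export-tutorial",
        "pipeline_version": "1.0.0",
        # Assumption: an ISO 8601 UTC timestamp standing in for now()
        "processed_at": datetime.now(timezone.utc).isoformat(),
        # Fall back when NODE_ID is unset, like env("NODE_ID").or("unknown-node")
        "processing_node_id": os.environ.get("NODE_ID", "unknown-node"),
    }
    return enriched


def missing_lineage_fields(line: str) -> list:
    """Return the expected lineage fields that are absent from one JSON log line."""
    lineage = json.loads(line).get("lineage")
    if not isinstance(lineage, dict):
        # No lineage object at all: every field is missing.
        return list(EXPECTED_FIELDS)
    return [name for name in EXPECTED_FIELDS if name not in lineage]
```

Running missing_lineage_fields over each line of captured pipeline output and reporting any non-empty result is one way to spot events that bypassed the lineage processor.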

You have successfully added basic but essential audit information to your logs. With this metadata you can trace any event back to the pipeline, version, and node that processed it, which is invaluable when troubleshooting a complex system.