Enrich with Metadata
Raw logs often lack context. To make them useful for debugging and auditing, we need to know where they were processed, which pipeline handled them, and when.
Goal
Add the following metadata to every log event:
- Processing Node: ID and region of the edge node
- Pipeline Info: Version and environment
- Audit: Ingestion timestamp and trace ID
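Concretely, after this step every event should carry a block along these lines (field values here are made up for illustration):

```json
"metadata": {
  "ingest_timestamp": "2024-05-01T12:34:56.789Z",
  "trace_id": "4f0a7c2e-9d1b-4e6a-8c3f-2b5d7e9a1c0d",
  "node_id": "node-01",
  "region": "eu-west-1",
  "environment": "production"
}
```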
Configuration
We use a mapping processor to inject values taken from environment variables alongside generated values such as the ingestion timestamp and a trace ID.
1. Define Metadata
We'll nest all enrichment fields under a single `metadata` object so they stay clearly separated from the original log fields.
```yaml
- mapping: |
    root = this
    root.metadata = {
      "ingest_timestamp": now(),
      "trace_id": uuid_v4(),
      "node_id": env("NODE_ID").or("unknown_node"),
      "region": env("AWS_REGION").or("unknown_region"),
      "environment": env("ENVIRONMENT").or("dev")
    }
```
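Before wiring this into the full pipeline, it can help to eyeball the enrichment in isolation. The sketch below assumes a Benthos/Redpanda Connect-style runtime that provides a `generate` input; the file name and the test payload are placeholders, and if your runtime differs, substitute any input that emits a fixed test event:

```yaml
# throwaway-enrich-test.yaml -- minimal sketch, assuming a generate-style input
input:
  generate:
    count: 1
    interval: ""
    mapping: 'root = {"level": "ERROR", "message": "test event"}'

pipeline:
  processors:
    - mapping: |
        root = this
        root.metadata = {
          "ingest_timestamp": now(),
          "trace_id": uuid_v4(),
          "node_id": env("NODE_ID").or("unknown_node"),
          "region": env("AWS_REGION").or("unknown_region"),
          "environment": env("ENVIRONMENT").or("dev")
        }

output:
  stdout: {}
```

Run it with whichever of the environment variables you want to exercise exported; any that are missing fall back to their `.or()` defaults, which also makes the fallback behaviour easy to verify.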
Complete Step 3 Configuration
`production-pipeline-step-3.yaml`

```yaml
input:
  http_server:
    address: "0.0.0.0:8080"
    path: /logs/ingest
    rate_limit: "1000/1s"
    auth:
      type: header
      header: "X-API-Key"
      required_value: "${LOG_API_KEY}"

pipeline:
  processors:
    # 1. Parse & validate (from Step 2)
    - mapping: |
        root = this.parse_json().catch({"message": content()})
        if !root.exists("timestamp") { root.timestamp = now() }
        if !root.exists("level") { root.level = "INFO" }

    # 2. Enrich with metadata
    - mapping: |
        root = this
        root.metadata = {
          "ingest_timestamp": now(),
          "trace_id": uuid_v4(),
          "node_id": env("NODE_ID").or("unknown_node"),
          "region": env("AWS_REGION").or("unknown_region"),
          "environment": env("ENVIRONMENT").or("dev"),
          "pipeline_version": "1.0.0"
        }
        # Example: add an 'urgent' flag for high-severity events
        if root.level == "ERROR" || root.level == "FATAL" {
          root.metadata.urgent = true
        }
output:
  stdout: {}
```
Deployment & Verification
- Set env vars:

  ```bash
  export NODE_ID="node-01"
  export LOG_API_KEY="changeme"   # clients must send this same key in X-API-Key
  ```

- Test:

  ```bash
  curl -X POST http://localhost:8080/logs/ingest \
    -H "X-API-Key: $LOG_API_KEY" \
    -d '{"level": "ERROR", "message": "DB fail"}'
  ```

  The event printed to stdout should include a `metadata` object with `node_id: "node-01"` and `urgent: true`.
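With only `NODE_ID` exported, the enriched event should look roughly like the following; the timestamp and trace ID will differ on every run, and `region`/`environment` fall back to their defaults because `AWS_REGION` and `ENVIRONMENT` were never set:

```json
{
  "level": "ERROR",
  "message": "DB fail",
  "timestamp": "2024-05-01T12:34:56.789Z",
  "metadata": {
    "ingest_timestamp": "2024-05-01T12:34:56.901Z",
    "trace_id": "a1b2c3d4-e5f6-4a7b-8c9d-0e1f2a3b4c5d",
    "node_id": "node-01",
    "region": "unknown_region",
    "environment": "dev",
    "pipeline_version": "1.0.0",
    "urgent": true
  }
}
```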
Next Steps
With enriched data, we can now make intelligent decisions about what to keep and how to prioritize it.