Complete Pipeline
This pipeline combines all log enrichment techniques from this tutorial:
- Data lineage - Add source tracking and audit trail metadata
- Restructured output - Clean event/metadata separation for analytics
- Intelligent batching - Time and size-based batching for S3
- Compressed storage - Gzip compression for cost-effective archival
Full Configuration
enrich-export.yaml
# enrich-export.yaml — enrichment + S3 export pipeline (lineage, restructure,
# batching, gzip compression). Indentation reconstructed; see inline notes.
name: enrich-export-complete
type: pipeline
description: Production-ready pipeline for enriching logs with metadata and exporting to Amazon S3.
namespace: production
labels:
  category: log-processing
  pattern: enrichment-export
config:
  input:
    generate:
      # Synthetic log generator; count 0 means generate indefinitely.
      interval: ${GENERATE_INTERVAL:-1s}
      count: ${GENERATE_COUNT:-0}
      mapping: |
        root.id = uuid_v4()
        root.timestamp = now()
        # Weighted severity: 70% INFO, 20% WARN, 10% ERROR.
        let level_random = random_int() % 100
        root.level = if $level_random < 70 {
          "INFO"
        } else if $level_random < 90 {
          "WARN"
        } else {
          "ERROR"
        }
        let services = [
          {"name": "api-gateway", "weight": 30},
          {"name": "auth-service", "weight": 20},
          {"name": "user-service", "weight": 15},
          {"name": "payment-service", "weight": 15},
          {"name": "notification-service", "weight": 10},
          {"name": "analytics-service", "weight": 10}
        ]
        root.service = $services.index(random_int() % 6).name
        # Bloblang cannot concatenate string + number; cast with .string().
        root.user_id = "user_" + (random_int() % 10000).string()
        root.session_id = "session_" + (random_int() % 1000).string()
        root.request_id = uuid_v4()
        root.trace_id = uuid_v4()
        root.duration_ms = random_int() % 5000 + 50
        # A generate mapping has no input document, so `this` queries resolve
        # to null; read previously assigned fields from `root` instead.
        root.status_code = match {
          root.level == "INFO" => [200, 201, 202, 204].index(random_int() % 4)
          root.level == "WARN" => [400, 401, 403, 404, 429].index(random_int() % 5)
          root.level == "ERROR" => [500, 502, 503, 504].index(random_int() % 4)
          _ => 200
        }
        root.message = match {
          root.service == "api-gateway" => "Request processed: " + root.status_code.string() + " in " + root.duration_ms.string() + "ms"
          root.service == "auth-service" => if root.level == "ERROR" { "Authentication failed" } else { "User authenticated successfully" }
          root.service == "payment-service" => if root.level == "ERROR" { "Payment processing failed" } else { "Payment processed" }
          _ => "Service operation completed with status " + root.status_code.string()
        }
  pipeline:
    processors:
      # Step 1: Add data-lineage metadata (audit trail of where/when/what
      # processed this event).
      - mapping: |
          root = this
          let processing_start = now()
          root.lineage = {
            "node_id": env("NODE_ID").or("edge-node-" + uuid_v4().slice(0, 8)),
            "node_region": env("AWS_REGION").or("us-east-1"),
            "pipeline_name": "log-enrichment-s3-production",
            "pipeline_version": env("PIPELINE_VERSION").or("5.0.0"),
            "processed_at": $processing_start,
            "environment": env("ENVIRONMENT").or("production")
          }
      # Step 2: Restructure into a clean event/metadata envelope for
      # analytics. Only `event` and `metadata` are assigned to root, so any
      # other input fields are dropped from the output document.
      - mapping: |
          root.event = {
            "id": this.id.or(uuid_v4()),
            "timestamp": this.timestamp.or(now()),
            "type": "application_log",
            "application": {
              "service": this.service.or("unknown"),
              "level": this.level.or("INFO"),
              "message": this.message.or(""),
              "duration_ms": this.duration_ms,
              "status_code": this.status_code
            },
            "trace": {
              "request_id": this.request_id.or(uuid_v4()),
              "trace_id": this.trace_id.or(uuid_v4())
            }
          }
          root.metadata = {
            "lineage": this.lineage,
            "quality": {
              # Fraction of four key fields present (0.0 - 1.0).
              "completeness_score": (
                (if this.user_id != null { 1 } else { 0 }) +
                (if this.request_id != null { 1 } else { 0 }) +
                (if this.message != null { 1 } else { 0 }) +
                (if this.service != null { 1 } else { 0 })
              ) / 4.0,
              "is_error": this.level == "ERROR"
            }
          }
  output:
    broker:
      # fan_out delivers every message to each listed output.
      pattern: fan_out
      outputs:
        # Primary S3 export: Hive-style partitioned path, gzip-compressed
        # NDJSON, batched by count, time, and byte size (5 MiB default).
        - aws_s3:
            bucket: ${S3_BUCKET_NAME}
            path: logs/year=${!timestamp("2006")}/month=${!timestamp("01")}/day=${!timestamp("02")}/logs_${!timestamp_unix()}.jsonl.gz
            batching:
              count: ${BATCH_COUNT:-200}
              period: ${BATCH_PERIOD:-2m}
              byte_size: ${BATCH_SIZE:-5242880}
              processors:
                - compress:
                    algorithm: gzip
                    level: 6
            content_type: application/x-ndjson
            content_encoding: gzip
            storage_class: ${S3_STORAGE_CLASS:-STANDARD_IA}
            # `region` is a component-level field; `credentials` holds the
            # profile (or key id/secret) only.
            region: ${AWS_REGION}
            credentials:
              profile: ${AWS_PROFILE}
        # Error stream: ERROR-level events only, uncompressed NDJSON for
        # quick inspection.
        - aws_s3:
            # NOTE(review): nested env-var defaults like this are not
            # universally supported — verify, or set S3_ERROR_BUCKET_NAME
            # explicitly.
            bucket: ${S3_ERROR_BUCKET_NAME:-${S3_BUCKET_NAME}}
            path: errors/year=${!timestamp("2006")}/month=${!timestamp("01")}/day=${!timestamp("02")}/error_${!timestamp_unix()}.jsonl
            processors:
              # Drop every message that is not an ERROR event before batching.
              - mapping: |
                  root = if this.event.application.level == "ERROR" {
                    this
                  } else {
                    deleted()
                  }
            batching:
              count: 50
              period: 1m
            content_type: application/x-ndjson
            storage_class: STANDARD
            region: ${AWS_REGION}
            credentials:
              profile: ${AWS_PROFILE}
  logger:
    level: ${LOG_LEVEL:-info}
    format: json
  metrics:
    prometheus:
      prefix: log_enrichment_prod
Quick Test
# This pipeline uses a `generate` input, so test events are produced
# automatically — no manual POST is required. If you replace the input
# with `http_server` (listening on port 8080), you can send events like:
curl -X POST http://localhost:8080/logs \
  -H "Content-Type: application/json" \
  -d '{
    "level": "INFO",
    "message": "User logged in",
    "user_id": "123",
    "service": "auth"
  }'
# Events enriched with lineage metadata and batched to S3
# Check s3://your-bucket/logs/year=YYYY/month=MM/day=DD/ for output
Deploy
# Deploy to Expanso orchestrator
expanso-cli job deploy enrich-export.yaml
# Or run locally with expanso-edge
expanso-edge run --config enrich-export.yaml
Download
Download enrich-export.yaml
What's Next?
- Troubleshooting - Common issues and solutions
- Filter Severity - Route logs by severity level