Complete Pipeline

This pipeline combines all log filtering techniques from this tutorial:

  • JSON parsing with fallback - Handle mixed JSON and plain-text logs
  • Severity normalization - Collapse variant level names (FATAL, SEVERE, CRITICAL, WARNING) into a canonical ERROR/WARN set, inferring a level from message content when none is present
  • Conditional routing - Send ERROR events to dated files and WARN events to stdout
  • Drop low-severity logs - Discard everything below WARN (INFO, DEBUG, TRACE) to reduce storage costs (a worked example follows)
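
For example, a structured plain-text line like the one below is parsed in Stage 1, normalized in Stage 3, and kept by Stage 4 because FATAL maps to ERROR. The timestamp and message values are illustrative, and the "..." fields depend on processing time:

Input line:

2025-01-15 14:30:00 [FATAL] Database connection failed

Resulting event (abridged):

{
  "timestamp": "2025-01-15 14:30:00",
  "level": "ERROR",
  "original_level": "FATAL",
  "message": "Database connection failed",
  "original_format": "structured_text",
  "parsing_success": true,
  "processing_metadata": {
    "processed_at": "...",
    "node_id": "unknown",
    "pipeline_version": "1.0"
  },
  "filter_metadata": {
    "passed_filter": true,
    "filter_reason": "severity_match",
    "filtering_timestamp": "..."
  }
}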

Full Configuration

filter-severity.yaml
name: filter-severity-complete
type: pipeline
description: Complete Log Severity Filtering Pipeline with JSON parsing, intelligent severity detection, and conditional routing.
namespace: production
labels:
  category: log-processing
  pattern: severity-filtering

config:
  input:
    file:
      paths:
        - /var/log/app/*.log
      codec: lines

  pipeline:
    processors:
      # Stage 1: JSON Parsing with Fallback
      - mapping: |
          # Read the raw line; the file input with the lines codec delivers unstructured text
          let original = content().string()
          let parsed = $original.parse_json().catch(null)

          if $parsed != null {
            root = $parsed
            root.original_format = "json"
            root.parsing_success = true
          } else {
            # Fall back to "YYYY-MM-DD HH:MM:SS [LEVEL] message" structured text
            let log_match = $original.re_find_all_submatch("(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[([A-Z]+)\] (.+)")

            if $log_match.length() > 0 {
              let m = $log_match.index(0)
              root.timestamp = $m.index(1)
              root.level = $m.index(2)
              root.message = $m.index(3)
              root.original_format = "structured_text"
              root.parsing_success = true
            } else {
              root.original_message = $original
              root.level = "UNKNOWN"
              root.original_format = "unstructured_text"
              root.parsing_success = false
            }
          }

      # Stage 2: Processing Metadata
      - mapping: |
          # Keep the parsed event and attach processing metadata
          root = this
          root.processing_metadata = {
            "processed_at": now(),
            "node_id": env("NODE_ID").or("unknown"),
            "pipeline_version": env("PIPELINE_VERSION").or("1.0")
          }

      # Stage 3: Severity Detection and Normalization
      - mapping: |
          root = this
          let parsed_level = this.level.or("UNKNOWN").string().uppercase()

          # When no level was parsed, infer one from keywords in the message content
          let content_lower = (this.message.or("").string() + " " + this.original_message.or("").string()).lowercase()
          let error_keywords = ["error", "fail", "exception", "crash", "critical", "fatal"]
          let warn_keywords = ["warn", "warning", "alert", "caution", "timeout"]

          let detected_level = if $parsed_level != "UNKNOWN" {
            $parsed_level
          } else if $error_keywords.any(keyword -> $content_lower.contains(keyword)) {
            "ERROR"
          } else if $warn_keywords.any(keyword -> $content_lower.contains(keyword)) {
            "WARN"
          } else {
            "UNKNOWN"
          }

          # Collapse vendor-specific level names into a canonical set
          let level_mappings = {
            "FATAL": "ERROR",
            "SEVERE": "ERROR",
            "CRITICAL": "ERROR",
            "WARNING": "WARN"
          }

          root.level = $level_mappings.get($detected_level).or($detected_level)
          root.original_level = $parsed_level

      # Stage 4: Severity-Based Filtering
      - mapping: |
          let level = this.level.string().uppercase()

          # Only ERROR and WARN events pass; everything else is dropped here
          root = if $level == "ERROR" || $level == "WARN" {
            this.merge({
              "filter_metadata": {
                "passed_filter": true,
                "filter_reason": "severity_match",
                "filtering_timestamp": now()
              }
            })
          } else {
            deleted()
          }

  output:
    broker:
      pattern: fan_out
      outputs:
        - switch:
            cases:
              - check: this.level == "ERROR"
                output:
                  file:
                    path: /var/log/expanso/errors/${!timestamp_unix_date()}.json
                    codec: lines

              - check: this.level == "WARN"
                output:
                  stdout:
                    codec: lines

              # Default case: anything that slips past the filters
              - output:
                  file:
                    path: /var/log/expanso/unrouted-logs.json
                    codec: lines

  logger:
    level: INFO
    format: json

  metrics:
    prometheus:
      path: /metrics
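
To iterate on the severity logic without writing to /var/log/app, a stripped-down variant can be run against stdin. This is a minimal sketch rather than part of the tutorial: the filter-severity-dev.yaml name is made up, and it assumes the runtime supports a Benthos-style stdin input alongside the file and stdout components used above. Feed it JSON lines:

filter-severity-dev.yaml
name: filter-severity-dev
type: pipeline
description: Minimal harness for testing severity normalization and filtering.

config:
  input:
    stdin:            # assumption: a stdin input is available in this runtime
      codec: lines

  pipeline:
    processors:
      # Same normalization and filtering as Stages 3-4 above, minus the metadata stages
      - mapping: |
          let level_mappings = {
            "FATAL": "ERROR",
            "SEVERE": "ERROR",
            "CRITICAL": "ERROR",
            "WARNING": "WARN"
          }
          let level = this.level.or("UNKNOWN").string().uppercase()
          root = this
          root.level = $level_mappings.get($level).or($level)
          root.original_level = $level
      - mapping: |
          root = if this.level == "ERROR" || this.level == "WARN" { this } else { deleted() }

  output:
    stdout:
      codec: lines

Example run (FATAL is normalized to ERROR and printed; an INFO line would produce no output):

echo '{"level": "fatal", "message": "disk full"}' | expanso-edge run --config filter-severity-dev.yaml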

Quick Test

The pipeline's input tails /var/log/app/*.log, so test by appending lines to any matching file (app.log below is just an example name):

# Send an error log (routed to the dated error file)
echo '{"level": "ERROR", "message": "Database connection failed"}' >> /var/log/app/app.log

# Send an info log (dropped by the severity filter)
echo '{"level": "INFO", "message": "User logged in"}' >> /var/log/app/app.log

# Send a debug log (also dropped)
echo '{"level": "DEBUG", "message": "Processing request"}' >> /var/log/app/app.log

Deploy

# Deploy to Expanso orchestrator
expanso-cli job deploy filter-severity.yaml

# Or run locally with expanso-edge
expanso-edge run --config filter-severity.yaml

Download

Download filter-severity.yaml

What's Next?