
Complete Pipeline

This pipeline combines all log parsing techniques from this tutorial:

  • Format detection - Auto-detect JSON, CSV, syslog, access log formats
  • JSON parsing - Parse structured JSON logs
  • CSV parsing - Parse delimited logs with header extraction
  • Access log parsing - Parse Apache/Nginx combined log format
  • Syslog parsing - Parse RFC 5424 syslog messages

Full Configuration

parse-logs.yaml
name: parse-logs-complete
type: pipeline
description: Complete Multi-Format Log Parser - Detects and parses JSON, CSV, access logs, and syslog formats.
namespace: production
labels:
  category: data-transformation
  pattern: log-parsing

config:
  input:
    http_server:
      address: "0.0.0.0:8080"
      path: /logs
      allowed_verbs: ["POST"]

  pipeline:
    processors:
      # Step 1: Detect log format
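      # Checks run in order: JSON first, then a syslog priority tag, then an IPv4 address
      # followed by "HTTP/" (access log), and finally comma-delimited CSV; the CSV check
      # skips anything containing "{" so JSON bodies with commas are not misrouted.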
      - mapping: |
          let content = content().string()

          let format = if $content.has_prefix("{") || $content.has_prefix("[") {
            "json"
          } else if $content.re_match("^<\\d+>") {
            "syslog"
          } else if $content.re_match("\\d+\\.\\d+\\.\\d+\\.\\d+.*HTTP/") {
            "access_log"
          } else if $content.split(",").length() >= 3 && !$content.contains("{") {
            "csv"
          } else {
            "unknown"
          }

          root = $content.parse_json().catch({})
          root.detected_format = $format
          root.original = $content
          meta log_format = $format

      # Step 2: Parse based on detected format
      - switch:
          # JSON logs
          - check: 'meta("log_format") == "json"'
            processors:
              - try:
                  - json_documents:
                      parts: []
                  - mapping: |
                      root = this
                      root.timestamp = this.timestamp.or(this."@timestamp").or(now())
                      root.level = this.level.or(this.severity).or("INFO").uppercase()
                      root.message = this.message.or(this.msg).or("")
              - catch:
                  - mapping: |
                      root.parse_error = error()
                      root.format = "json"
                      meta log_format = "parse_failed"
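              # Messages that fail JSON parsing keep log_format = "parse_failed" and are
              # routed to the dead-letter output by the output switch below.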

          # Access logs (Apache/Nginx)
          - check: 'meta("log_format") == "access_log"'
            processors:
              - grok:
                  expressions:
                    - '%{IPORHOST:client_ip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:method} %{NOTSPACE:path}(?: HTTP/%{NUMBER:http_version})?" %{NUMBER:status} (?:%{NUMBER:bytes}|-)'
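              # For the sample line under "Quick Test", this extracts fields such as
              # method=GET, path=/api/users, status=200 and bytes=1234; status and bytes
              # arrive as strings and are cast to numbers in the mapping below.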
              - mapping: |
                  root = this
                  root.status = this.status.number()
                  root.bytes = this.bytes.or("0").number()
                  # Hash IP for privacy
                  root.client_ip_hash = this.client_ip.hash("sha256").encode("hex").slice(0, 12)
                  root.client_ip = deleted()

          # Syslog
          - check: 'meta("log_format") == "syslog"'
            processors:
              - grok:
                  expressions:
                    - '<%{POSINT:priority}>%{SYSLOGTIMESTAMP:timestamp} %{SYSLOGHOST:hostname} %{DATA:program}(?:\[%{POSINT:pid}\])?: %{GREEDYDATA:message}'
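                    # The expression above targets the RFC 3164/BSD form. The RFC 5424 example
                    # under "Quick Test" ("<134>1 2024-01-15T10:30:00Z ...") needs a separate
                    # expression; the one below is a sketch that assumes the standard grok
                    # pattern set (TIMESTAMP_ISO8601, SYSLOGHOST, etc.) is available.
                    - '<%{POSINT:priority}>%{POSINT:syslog_version} %{TIMESTAMP_ISO8601:timestamp} %{SYSLOGHOST:hostname} %{DATA:program} %{DATA:pid} %{DATA:msgid} %{DATA:structured_data} %{GREEDYDATA:message}'
              # The priority encodes both fields: facility = priority / 8, severity = priority % 8.
              # For example <134> decodes to facility 16 (local0) and severity 6 (informational).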
              - mapping: |
                  root = this
                  let pri = this.priority.number()
                  let severity = $pri % 8
                  root.facility = ($pri / 8).floor()
                  root.severity = $severity
                  root.level = if $severity <= 3 { "ERROR" }
                    else if $severity <= 4 { "WARN" }
                    else { "INFO" }

          # CSV logs
          - check: 'meta("log_format") == "csv"'
            processors:
              - csv:
                  columns: ["timestamp", "level", "service", "message"]
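                  # Column names are fixed here; see the CSV parsing section of this tutorial
                  # for deriving them from a header row instead.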
              - mapping: |
                  root = this
                  root.level = this.level.uppercase()

          # Unknown format - keep original
          - processors:
              - mapping: |
                  root.raw_content = this.original
                  root.format = "unknown"
                  root.requires_investigation = true

      # Step 3: Add processing metadata
      - mapping: |
          root = this
          root.processed_at = now()
          root.format = meta("log_format")
          root.original = deleted()

  output:
    switch:
      cases:
        # Parsed logs → analytics
        - check: 'meta("log_format") != "parse_failed" && meta("log_format") != "unknown"'
          output:
            http_client:
              url: "${ANALYTICS_ENDPOINT:http://analytics:8080}/logs"
              verb: POST
              headers:
                Content-Type: application/json
              batching:
                count: 100
                period: 5s
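                # A batch is flushed after 100 messages or 5 seconds, whichever comes first.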

        # Failed/unknown → DLQ
        - output:
            file:
              path: "/var/log/expanso/dlq/${!timestamp_unix_date('2006-01-02')}/unparsed.jsonl"
              codec: lines

  logger:
    level: INFO
    format: json

  metrics:
    prometheus:
      path: /metrics

Quick Test

# Send JSON log
curl -X POST http://localhost:8080/logs \
  -H "Content-Type: application/json" \
  -d '{"level": "INFO", "message": "User logged in", "user_id": "123"}'

# Send access log
curl -X POST http://localhost:8080/logs \
  -H "Content-Type: text/plain" \
  -d '192.168.1.1 - - [15/Jan/2024:10:30:00 +0000] "GET /api/users HTTP/1.1" 200 1234'

# Send syslog
curl -X POST http://localhost:8080/logs \
  -H "Content-Type: text/plain" \
  -d '<134>1 2024-01-15T10:30:00Z myhost myapp 1234 - - User logged in'
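
# Send CSV log (sample row matching the columns configured in the csv step:
# timestamp, level, service, message)
curl -X POST http://localhost:8080/logs \
  -H "Content-Type: text/plain" \
  -d '2024-01-15T10:30:00Z,ERROR,payments,Payment declined'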

# All parsed to structured JSON output

Deploy

# Deploy to Expanso orchestrator
expanso-cli job deploy parse-logs.yaml

# Or run locally with expanso-edge
expanso-edge run --config parse-logs.yaml

Download

Download parse-logs.yaml

What's Next?