Step 4: Multi-Format Detection
In a real-world system, you often receive logs in various formats through a single input. This final step teaches you how to build a unified parser that can intelligently detect the format of each message and route it to the correct parsing logic.
The Goal
You will build a single pipeline that can correctly parse JSON, CSV, and Access Log strings by combining the techniques from the previous three steps.
The "Detect -> Route" Pattern
- Detect: A `mapping` processor inspects the raw log string and uses simple heuristics (like checking for `{` or `<`) to guess the format, setting a metadata field like `detected_format`.
- Route: A `switch` processor then reads this metadata field and sends the message to the appropriate block of processors (`parse_json`, `csv`, or `grok`), as sketched below.
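
Concretely, the pattern is just two processors chained back to back. The fragment below is a condensed preview of the pipeline you will build in the Implementation section, reduced to a single format check plus the catch-all case; it reuses the same fields and helpers as the full configuration, so nothing here is new.

```yaml
pipeline:
  processors:
    # Detect: tag each message with a best-guess format
    - mapping: |
        root = this
        meta detected_format = if this.raw_log.string().starts_with("{") {
          "json"
        } else {
          "unknown"
        }
    # Route: send the message to the parser for that format
    - switch:
        - check: meta("detected_format") == "json"
          processors:
            - mapping: root = this.raw_log.parse_json()
        # Catch-all for anything the detector could not classify
        - processors:
            - mapping: root.parsed_by = "unknown"
```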
Implementation
- Create the Unified Parser: Copy the following configuration into a file named `unified-parser.yaml`. This pipeline combines the logic from the previous steps.

  ```yaml
  # unified-parser.yaml
  name: unified-log-parser
  description: A pipeline that detects and parses multiple log formats.
  config:
    input:
      http_server:
        address: 0.0.0.0:8080
        path: /ingest
    pipeline:
      processors:
        # 1. DETECT: Guess the format based on the content
        - mapping: |
            root = this
            let content = this.raw_log.string()
            meta detected_format = if content.starts_with("{") {
              "json"
            } else if content.starts_with("<") {
              "syslog"
            } else if content.count(",") >= 2 {
              "csv"
            } else if content.re_match("\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}") {
              "access_log"
            } else {
              "unknown"
            }
        # 2. ROUTE: Use a switch to send to the correct parser
        - switch:
            - check: meta("detected_format") == "json"
              processors:
                - mapping: root = this.raw_log.parse_json()
                - mapping: root.parsed_by = "json_parser"
            - check: meta("detected_format") == "csv"
              processors:
                - csv:
                    target_field: root.raw_log
                    columns: [ "f1", "f2", "f3", "f4", "f5" ]
                - mapping: root.parsed_by = "csv_parser"
            - check: meta("detected_format") == "access_log"
              processors:
                - grok:
                    target_field: root.raw_log
                    expressions: [ '%{COMMONAPACHELOG}' ]
                - mapping: root.parsed_by = "access_log_parser"
            # Fallback for unknown formats
            - processors:
                - mapping: root.parsed_by = "unknown"
    output:
      stdout:
        codec: lines
  ```
- Deploy and Test: With the pipeline deployed and running, send it logs in each of the three formats:

  ```bash
  # --- Send logs in different formats ---
  curl -X POST http://localhost:8080/ingest -d '{"raw_log": "{\"level\":\"info\"}"}'
  curl -X POST http://localhost:8080/ingest -d '{"raw_log": "data1,data2,data3"}'
  curl -X POST http://localhost:8080/ingest -d '{"raw_log": "127.0.0.1 - - [01/Jan/2025:12:00:00 +0000] \"GET / HTTP/1.1\" 200 123"}'
  ```
- Verify: Check your logs. You will see three structured messages, each with a `parsed_by` field indicating that it was correctly routed to and processed by the right set of processors. (A quick way to exercise the fallback branch is shown just below this list.)
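
To exercise the fallback branch as well, send a payload that matches none of the heuristics; with the configuration above it should fall through to the final `switch` case and come out with `parsed_by` set to `"unknown"`. The payload string here is just an arbitrary example.

```bash
# No leading "{" or "<", fewer than two commas, and no IP address
curl -X POST http://localhost:8080/ingest -d '{"raw_log": "plain unstructured text"}'
```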
You have now built a flexible and robust universal log parser.