Step 4: Multi-Format Detection

In a real-world system, you often receive logs in various formats through a single input. This final step teaches you how to build a unified parser that can intelligently detect the format of each message and route it to the correct parsing logic.

The Goal

You will build a single pipeline that can correctly parse JSON, CSV, and Access Log strings by combining the techniques from the previous three steps.

The "Detect -> Route" Pattern

  1. Detect: A mapping processor inspects the raw log string and uses simple heuristics (like checking for a leading { or <) to guess the format, setting a metadata field like detected_format. A few examples follow this list.
  2. Route: A switch processor then reads this metadata field and sends the message to the appropriate block of processors (parse_json, csv, or grok).
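
For example, the heuristics in the pipeline below classify raw strings like these (the syslog and plain-text lines are hypothetical inputs, shown only to illustrate branches the test commands in this step do not exercise):

    {"level":"info"}                              -> json        (starts with "{")
    <34>Jan  1 12:00:00 host app: hello           -> syslog      (starts with "<")
    data1,data2,data3,data4,data5                 -> csv         (two or more commas)
    127.0.0.1 - - [...] "GET / HTTP/1.1" 200 123  -> access_log  (contains an IPv4 address)
    something else entirely                       -> unknown     (no heuristic matched)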

Implementation

  1. Create the Unified Parser: Copy the following configuration into a file named unified-parser.yaml. This pipeline combines the logic from the previous steps.

    unified-parser.yaml
    name: unified-log-parser
    description: A pipeline that detects and parses multiple log formats.

    config:
      input:
        http_server:
          address: 0.0.0.0:8080
          path: /ingest

      pipeline:
        processors:
          # 1. DETECT: Guess the format based on the content.
          # The checks run in order, so the first matching heuristic wins.
          - mapping: |
              root = this
              let content = this.raw_log.string()
              meta detected_format = if content.starts_with("{") {
                "json"
              } else if content.starts_with("<") {
                "syslog"
              } else if content.count(",") >= 2 {
                "csv"
              } else if content.re_match("\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}") {
                "access_log"
              } else {
                "unknown"
              }

          # 2. ROUTE: Use a switch to send to the correct parser
          - switch:
              - check: meta("detected_format") == "json"
                processors:
                  - mapping: root = this.raw_log.parse_json()
                  - mapping: root.parsed_by = "json_parser"

              - check: meta("detected_format") == "csv"
                processors:
                  - csv:
                      target_field: root.raw_log
                      columns: [ "f1", "f2", "f3", "f4", "f5" ]
                  - mapping: root.parsed_by = "csv_parser"

              - check: meta("detected_format") == "access_log"
                processors:
                  - grok:
                      target_field: root.raw_log
                      expressions: [ '%{COMMONAPACHELOG}' ]
                  - mapping: root.parsed_by = "access_log_parser"

              # Fallback: a case with no check catches everything else
              # (including "syslog", which has no dedicated parser yet)
              - processors:
                  - mapping: root.parsed_by = "unknown"

      output:
        stdout:
          codec: lines
  2. Deploy and Test:

    # --- Send logs in different formats ---
    curl -X POST http://localhost:8080/ingest -d '{"raw_log": "{\"level\":\"info\"}"}'
    curl -X POST http://localhost:8080/ingest -d '{"raw_log": "data1,data2,data3"}'
    curl -X POST http://localhost:8080/ingest -d '{"raw_log": "127.0.0.1 - - [01/Jan/2025:12:00:00 +0000] \"GET / HTTP/1.1\" 200 123"}'
  3. Verify: Check the pipeline's stdout. You should see three structured messages, each with a parsed_by field showing which branch of the switch handled it; a rough sketch of the expected output follows this list.
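
As an illustration of the verification step, the three output lines should be shaped roughly like the block below. The nested field names are assumptions: f1 through f5 come from the columns list, while clientip, verb, and response are standard %{COMMONAPACHELOG} capture names (the access log entry will also carry the pattern's other captures, such as timestamp and bytes). Your runtime's exact field order and output shape may differ.

    {"level":"info","parsed_by":"json_parser"}
    {"raw_log":{"f1":"data1","f2":"data2","f3":"data3","f4":"data4","f5":"data5"},"parsed_by":"csv_parser"}
    {"raw_log":{"clientip":"127.0.0.1","verb":"GET","response":"200"},"parsed_by":"access_log_parser"}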

You have now built a flexible and robust universal log parser.
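
If you need syslog support, note that the detect stage can already tag a message as syslog, but the switch has no matching case, so those messages currently fall through to the fallback. A minimal sketch of a dedicated branch, assuming your grok processor ships the stock SYSLOGLINE pattern (RFC 5424 messages would need a different pattern):

    - check: meta("detected_format") == "syslog"
      processors:
        - grok:
            target_field: root.raw_log
            expressions: [ '%{SYSLOGLINE}' ]
        - mapping: root.parsed_by = "syslog_parser"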