Step 1: Severity-Based Priority Routing

In this step, you will implement the most common priority queue pattern: routing log messages to different Kafka topics based on their severity. This ensures that critical alerts are processed immediately while routine logs are handled efficiently.

The Goal

You will modify a basic logging pipeline to route messages to logs-critical, logs-high, logs-normal, or logs-low topics based on the value of the severity field in the message.

The "Classify -> Switch" Pattern

  1. Classify: First, you'll add a mapping processor to create a new field called priority based on the existing severity field. This standardizes different severity inputs (e.g., "WARN" and "WARNING") into a consistent priority level.
  2. Switch: Then, you'll replace the single kafka output with a switch output that directs the message to the correct Kafka topic based on the new priority field.
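Before touching the config, it helps to see the classification logic in isolation. The sketch below mirrors it in plain Python (purely illustrative; the pipeline itself uses Bloblang and YAML, and the function name here is invented for the example):

```python
def classify(severity: str) -> str:
    """Map a raw severity string to a priority level.

    Severities are normalized to uppercase first, so "warn", "WARN",
    and "Warning" all land on the same priority.
    """
    sev = severity.upper()
    if sev in ("CRITICAL", "FATAL"):
        return "critical"
    if sev == "ERROR":
        return "high"
    if sev in ("WARN", "WARNING"):
        return "normal"
    return "low"  # default everything else to low priority
```

Note that `classify("warning")` and `classify("WARN")` both return `"normal"` — the normalization step is what makes inconsistent upstream severities safe to route on.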

Implementation

  1. Start with the Foundation: Copy the priority-queues-foundation.yaml to a new file named severity-router.yaml.

    cp examples/data-routing/priority-queues-foundation.yaml severity-router.yaml
  2. Add the Classification Processor: Open severity-router.yaml and add the "Classify" mapping processor to the pipeline section.

    # Add this to the 'processors' array in severity-router.yaml
    # This goes after the existing 'mapping' processor
    - mapping: |
        root = this

        # Normalize severity to uppercase for consistent matching
        let sev = this.severity.string().uppercase()
        root.severity = $sev

        # Map severity to a standard priority level
        root.priority = match $sev {
          this == "CRITICAL" || this == "FATAL" => "critical"
          this == "ERROR" => "high"
          this == "WARN" || this == "WARNING" => "normal"
          _ => "low" # Default everything else to low priority
        }
  3. Replace the Output with a Switch: Now, replace the entire output section with the switch block below.

    # Replace the 'output' section in severity-router.yaml
    output:
      switch:
        cases:
          - check: this.priority == "critical"
            output:
              kafka:
                addresses: ["${KAFKA_BROKERS}"]
                topic: logs-critical
                # For critical logs, we send immediately (no batching delay)
                batching:
                  count: 1

          - check: this.priority == "high"
            output:
              kafka:
                addresses: ["${KAFKA_BROKERS}"]
                topic: logs-high
                batching:
                  count: 10 # Small batch for high-priority
                  period: 1s

          - check: this.priority == "normal"
            output:
              kafka:
                addresses: ["${KAFKA_BROKERS}"]
                topic: logs-normal
                batching:
                  count: 100 # Standard batch size
                  period: 5s

          # Default case: no 'check' means it matches everything,
          # catching "low" priority and anything left unmatched above
          - output:
              kafka:
                addresses: ["${KAFKA_BROKERS}"]
                topic: logs-low
                batching:
                  count: 1000 # Large batch for low-priority
                  period: 60s
  4. Deploy and Test:

    # Send a critical event
    curl -X POST http://localhost:8080/logs -H "Content-Type: application/json" \
    -d '{"severity": "CRITICAL", "message": "Database connection lost"}'

    # Send an info event
    curl -X POST http://localhost:8080/logs -H "Content-Type: application/json" \
    -d '{"severity": "INFO", "message": "User logged in"}'
  5. Verify: Use a console consumer to check the different Kafka topics. The critical message should appear in logs-critical almost immediately (its batch size is 1), while the info message is batched and may take up to 60 seconds — the low-priority batch period — before it shows up in logs-low.
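The difference in arrival time comes down to the batching policies: a batch is flushed when it reaches its message count or when its period elapses, whichever comes first. A rough sketch of that flush rule (illustrative only, not the actual Kafka output implementation):

```python
import time

class Batcher:
    """Minimal count-or-period batcher: flush when `count` messages have
    accumulated, or when `period_s` seconds have passed since the batch
    was opened."""

    def __init__(self, count: int, period_s: float):
        self.count = count
        self.period_s = period_s
        self.buffer = []
        self.opened_at = 0.0

    def add(self, message):
        """Buffer a message; return the batch if the count threshold flushes it."""
        if not self.buffer:
            self.opened_at = time.monotonic()
        self.buffer.append(message)
        if len(self.buffer) >= self.count:
            return self.flush()
        return None

    def tick(self):
        """Called periodically; flush if the batch period has elapsed."""
        if self.buffer and time.monotonic() - self.opened_at >= self.period_s:
            return self.flush()
        return None

    def flush(self):
        batch, self.buffer = self.buffer, []
        return batch
```

With `count=1` (the critical case), every `add()` flushes immediately; with `count=1000, period_s=60` (the low-priority case), a lone message sits in the buffer until the 60-second period expires.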

You have now implemented a basic but powerful priority queue system based on log severity.
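One property of the switch output worth keeping in mind: cases are evaluated top to bottom, and each message goes to the first case whose check passes — which is why the catch-all case must come last. In Python terms (an illustrative sketch, not the actual implementation):

```python
# Ordered (check, topic) pairs mirroring the switch cases; None stands in
# for a case with no 'check' field, i.e. a catch-all that matches anything
# reaching it.
CASES = [
    (lambda msg: msg.get("priority") == "critical", "logs-critical"),
    (lambda msg: msg.get("priority") == "high", "logs-high"),
    (lambda msg: msg.get("priority") == "normal", "logs-normal"),
    (None, "logs-low"),
]

def route(msg: dict) -> str:
    """Return the topic of the first matching case (first match wins)."""
    for check, topic in CASES:
        if check is None or check(msg):
            return topic
    raise RuntimeError("unreachable: the last case matches everything")
```

Because the catch-all is last, a message with an unexpected or missing priority still lands somewhere (logs-low) instead of being dropped.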