Step 2: Add Kafka for Real-Time Streaming

In the previous step, you created a fan-out pipeline that sends messages to two simple destinations. Now, you will replace one of those with a real-world streaming destination: Kafka.

The Goal

You will modify your fan-out.yaml file to send messages to both a local file and a Kafka topic concurrently.

Implementation

  1. Start with the Previous Pipeline: Open the fan-out.yaml file you created in Step 1. It should look like this:

    fan-out.yaml
    output:
      broker:
        pattern: fan_out
        outputs:
          - file:
              path: /tmp/events.jsonl
              codec: lines
          - stdout:
              codec: lines
  2. Replace stdout with Kafka: To add Kafka, simply replace the stdout output configuration with a minimal kafka output block.

    Modify the output in fan-out.yaml
    output:
      broker:
        pattern: fan_out
        outputs:
          # This output remains the same
          - file:
              path: /tmp/events.jsonl
              codec: lines

          # Replace stdout with this kafka block
          - kafka:
              addresses: [ "${KAFKA_BROKERS}" ] # From your local-development setup
              topic: "sensor-events"
              key: ${!this.sensor_id} # Use the sensor_id for partitioning

    That's it. The broker will now send a copy of every message to the file and another copy to the sensor-events Kafka topic.

  3. Deploy and Test (a sketch of setting KAFKA_BROKERS and starting the pipeline follows this list):

    # Ensure Kafka is running
    docker compose -f services/kafka.yml up -d

    # Send a test event with a sensor_id
    curl -X POST http://localhost:8080/events \
      -H "Content-Type: application/json" \
      -d '{"sensor_id": "sensor-123", "message": "This goes to a file and Kafka"}'
  4. Verify:

    • Check the file: The message should appear in /tmp/events.jsonl.
    • Check Kafka: Use a console consumer to read from the sensor-events topic (an example command follows this list). You should see the same message there.
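
To run the updated pipeline against your local broker, here is a minimal sketch for step 3. It assumes the local-development setup exposes Kafka at localhost:9092 and that the pipeline is started with the Benthos CLI; both are assumptions, so adjust them to your environment.

    # Assumption: the broker address exposed by your local-development setup
    export KAFKA_BROKERS=localhost:9092

    # Assumption: the pipeline is run with the Benthos CLI
    benthos -c fan-out.yaml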
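
To verify the Kafka side in step 4, one option is the standard console consumer. The sketch below assumes the Compose service in services/kafka.yml is named kafka and that the broker listens on localhost:9092 inside the container; substitute your own service name, port, and consumer script if they differ.

    # Assumption: a Compose service named "kafka"; the consumer script name varies by image
    docker compose -f services/kafka.yml exec kafka \
      kafka-console-consumer.sh \
        --bootstrap-server localhost:9092 \
        --topic sensor-events \
        --from-beginning \
        --property print.key=true

    # Confirm the same event also landed in the file destination
    tail -n 5 /tmp/events.jsonl

Printing the key lets you confirm that events with the same sensor_id carry the same key, which is what the kafka output uses to keep them on the same partition.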

You have now successfully added a real-time streaming destination to your fan-out pipeline. The next steps will follow this same pattern to add S3 for archival and Elasticsearch for search.