Step 2: Add Kafka for Real-Time Streaming
In the previous step, you created a fan-out pipeline that sends messages to two simple destinations. Now, you will replace one of those with a real-world streaming destination: Kafka.
The Goal
You will modify your `fan-out.yaml` file to send messages to both a local file and a Kafka topic concurrently.
Implementation
- Start with the Previous Pipeline: Open the `fan-out.yaml` file you created in Step 1. It should look like this:

  fan-out.yaml

  ```yaml
  output:
    broker:
      pattern: fan_out
      outputs:
        - file:
            path: /tmp/events.jsonl
            codec: lines
        - stdout:
            codec: lines
  ```
- Replace `stdout` with Kafka: To add Kafka, replace the `stdout` output configuration with a minimal `kafka` output block.

  Modify the output in `fan-out.yaml`:

  ```yaml
  output:
    broker:
      pattern: fan_out
      outputs:
        # This output remains the same
        - file:
            path: /tmp/events.jsonl
            codec: lines
        # Replace stdout with this kafka block
        - kafka:
            addresses: [ "${KAFKA_BROKERS}" ] # From your local-development setup
            topic: "sensor-events"
            key: ${!this.sensor_id} # Use the sensor_id for partitioning
  ```

  That's it. The broker will now send a copy of every message to the file and another copy to the `sensor-events` Kafka topic. (A note on setting `KAFKA_BROKERS` follows this list.)
- Deploy and Test: Start Kafka and send a test event (the `curl` command assumes the pipeline's HTTP input from Step 1 is listening on port 8080; see the sketch after this list).

  ```bash
  # Ensure Kafka is running
  docker compose -f services/kafka.yml up -d

  # Send a test event with a sensor_id
  curl -X POST http://localhost:8080/events \
    -H "Content-Type: application/json" \
    -d '{"sensor_id": "sensor-123", "message": "This goes to a file and Kafka"}'
  ```
- Verify:
  - Check the file: The message should appear in `/tmp/events.jsonl`.
  - Check Kafka: Use a console consumer to read from the `sensor-events` topic. You should see the same message there. (Example commands follow this list.)
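The `kafka` output above reads its broker list from the `KAFKA_BROKERS` environment variable from your local-development setup. As a minimal sketch, assuming the broker defined in `services/kafka.yml` is exposed on `localhost:9092` (check your compose file for the actual host and port), you would export it before running the pipeline:

```bash
# Assumption: the Kafka broker from services/kafka.yml listens on localhost:9092.
# Adjust the host and port to match your own setup.
export KAFKA_BROKERS="localhost:9092"
```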
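The `curl` test in the Deploy and Test step assumes the pipeline exposes an HTTP input at `/events` on port 8080, as set up in Step 1. If your Step 1 config differs, adjust the URL instead. A minimal sketch of such an input block (the address and path here are assumptions, not part of this step's changes):

```yaml
# Sketch only: an HTTP server input matching the curl command above.
# Your existing Step 1 configuration may already define this differently.
input:
  http_server:
    address: 0.0.0.0:8080
    path: /events
```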
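For the Verify step, the commands below are one way to check both destinations. The file check works as written; the Kafka check assumes the container defined in `services/kafka.yml` is named `kafka`, ships the standard console scripts, and has a listener on `localhost:9092` inside the container. Adjust the service name, script path, and port to match your setup.

```bash
# Check the file destination
tail -n 5 /tmp/events.jsonl

# Check the Kafka destination.
# Assumption: a container named "kafka" with the standard console scripts
# and a listener on localhost:9092 inside the container.
docker compose -f services/kafka.yml exec kafka \
  kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic sensor-events --from-beginning
```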
You have now successfully added a real-time streaming destination to your fan-out pipeline. The next steps will follow this same pattern to add S3 for archival and Elasticsearch for search.