Step 4: Add Elasticsearch for Search and Analytics
The final step in building the core fan-out pipeline is to add Elasticsearch, which will provide search and analytics capabilities for your data.
The Goal
You will modify your fan-out.yaml file to send messages to a local file, a Kafka topic, an S3 bucket, and an Elasticsearch index, all at the same time.
Implementation
- Start with the Previous Pipeline: Open the fan-out.yaml file you have been building in the previous steps.

- Add the Elasticsearch Output: Add an elasticsearch block to the outputs array in your broker configuration. Modify the output in fan-out.yaml:

    output:
      broker:
        pattern: fan_out
        outputs:
          - file:
              path: /tmp/events.jsonl
              codec: lines
          - kafka:
              addresses: [ ${KAFKA_BROKERS} ]
              topic: "sensor-events"
              key: ${!this.sensor_id}
          - aws_s3:
              bucket: ${S3_BUCKET}
              region: ${AWS_REGION}
              path: "events/${!timestamp_unix_date()}/${!uuid_v4()}.json"
              batching:
                count: 10
                period: 10s
          # Add this block for Elasticsearch
          - elasticsearch:
              urls: [ ${ES_ENDPOINT} ] # e.g., http://localhost:9200
              index: "sensor-events-${!timestamp_unix_date('2006-01-02')}"
              id: ${!this.event_id.or(uuid_v4())}
              batching:
                count: 10
                period: 10s

  We use a daily index name (sensor-events-YYYY-MM-DD), which is a common best practice. The id field is set from the event's ID to prevent duplicate documents.
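  If you are running the pipeline on Benthos (now distributed as Redpanda Connect) — the tutorial does not name the engine, but this configuration syntax suggests it — you can lint the file before redeploying. A minimal sketch, assuming the benthos CLI is installed:

    # Sketch: validate fan-out.yaml with the Benthos config linter
    benthos lint fan-out.yaml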
- Configure Elasticsearch: Ensure you have an Elasticsearch instance running and the required environment variables are set:

    export ES_ENDPOINT="http://localhost:9200"
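  If you do not already have Elasticsearch running locally, one way to get a throwaway single-node instance is Docker; the image tag and security flag below are illustrative assumptions, not requirements of the pipeline. The health check at the end confirms the endpoint is reachable before you deploy.

    # One way to run a local, single-node Elasticsearch for testing (version tag is an example)
    docker run -d --name es-local -p 9200:9200 \
      -e "discovery.type=single-node" \
      -e "xpack.security.enabled=false" \
      docker.elastic.co/elasticsearch/elasticsearch:8.13.4

    # Confirm the endpoint responds before deploying the pipeline
    curl -s "${ES_ENDPOINT}/_cluster/health?pretty"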
- Deploy and Test:

    # Send a test event with an event_id
    curl -X POST http://localhost:8080/events \
      -H "Content-Type: application/json" \
      -d '{"event_id": "evt-789", "sensor_id": "sensor-789", "message": "This goes to all four destinations"}'
- Verify: After the 10-second batch period, you can verify that the data has arrived in Elasticsearch.

    curl -X GET "${ES_ENDPOINT}/_search?q=event_id:evt-789"

  You should also see the data in your local file, Kafka topic, and S3 bucket.
You have now built a complete, multi-destination fan-out pipeline that concurrently routes data to four different systems, each serving a distinct purpose: local debugging, real-time streaming, long-term archival, and search/analytics.