
Step 4: Add Elasticsearch for Search and Analytics

The final step in building the core fan-out pipeline is to add Elasticsearch, which gives you search and analytics capabilities over the same event data.

The Goal

You will modify your fan-out.yaml file to send messages to a local file, a Kafka topic, an S3 bucket, and an Elasticsearch index, all at the same time.

Implementation

  1. Start with the Previous Pipeline: Open the fan-out.yaml file you have been building in the previous steps.

  2. Add the Elasticsearch Output: Add an elasticsearch block to the outputs array in your broker configuration.

    Modify the output section in fan-out.yaml:

    output:
      broker:
        pattern: fan_out
        outputs:
          - file:
              path: /tmp/events.jsonl
              codec: lines
          - kafka:
              addresses: [ ${KAFKA_BROKERS} ]
              topic: "sensor-events"
              key: ${!this.sensor_id}
          - aws_s3:
              bucket: ${S3_BUCKET}
              region: ${AWS_REGION}
              path: 'events/${!now().ts_format("2006-01-02")}/${!uuid_v4()}.json'
              batching:
                count: 10
                period: 10s

          # Add this block for Elasticsearch
          - elasticsearch:
              urls: [ ${ES_ENDPOINT} ] # e.g., http://localhost:9200
              index: 'sensor-events-${!now().ts_format("2006-01-02")}'
              id: ${!this.event_id.or(uuid_v4())}
              batching:
                count: 10
                period: 10s

    We use a daily index name (sensor-events-YYYY-MM-DD), a common practice that keeps individual indices small and makes retention easy to manage. The id field is taken from the event's event_id (falling back to a random UUID when it is missing), so a redelivered message overwrites the existing document rather than creating a duplicate.
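    Once data is flowing (see step 5 below), one quick way to confirm the daily naming scheme is to list the matching indices with Elasticsearch's standard _cat API. This is a small sketch and assumes ES_ENDPOINT is set as in step 3:

    # List the daily indices created by this output
    curl -s "${ES_ENDPOINT}/_cat/indices/sensor-events-*?v"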

  3. Configure Elasticsearch: Ensure you have an Elasticsearch instance running and that the ES_ENDPOINT environment variable points to it.

    export ES_ENDPOINT="http://localhost:9200"
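
    If you don't already have a cluster available, one way to get a throwaway local instance for this tutorial is Docker. This is a minimal sketch for local testing only; the image tag and the disabled-security settings are assumptions, so adjust them to your environment:

    # Single-node Elasticsearch with security disabled (local testing only)
    docker run -d --name es-local -p 9200:9200 \
      -e "discovery.type=single-node" \
      -e "xpack.security.enabled=false" \
      docker.elastic.co/elasticsearch/elasticsearch:8.13.4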
  4. Deploy and Test:

    # Send a test event with an event_id
    curl -X POST http://localhost:8080/events \
    -H "Content-Type: application/json" \
    -d '{"event_id": "evt-789", "sensor_id": "sensor-789", "message": "This goes to all four destinations"}'
  5. Verify: After the 10-second batch period, check that the data has arrived in Elasticsearch.

    curl -X GET "${ES_ENDPOINT}/_search?q=event_id:evt-789"

    You should also see the data in your local file, Kafka topic, and S3 bucket.
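
    Because the document id is taken from event_id, redelivering the same event should not create a second document. A quick way to check this, reusing the test endpoint from step 4 and Elasticsearch's standard _count API (a sketch, assuming the earlier evt-789 event was already indexed):

    # Send the same event again, wait for the batch period, then count matching documents
    curl -X POST http://localhost:8080/events \
      -H "Content-Type: application/json" \
      -d '{"event_id": "evt-789", "sensor_id": "sensor-789", "message": "duplicate delivery"}'

    # Expect "count": 1, since the second write overwrites the first document
    curl -s "${ES_ENDPOINT}/sensor-events-*/_count?q=event_id:evt-789"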

You have now built a complete, multi-destination fan-out pipeline that concurrently routes data to four different systems, each serving a distinct purpose: local debugging, real-time streaming, long-term archival, and search/analytics.