Step 3: Add S3 for Long-Term Archive
Now you will add a third destination to your fan-out pipeline: Amazon S3, a common choice for cost-effective, long-term archival of raw event data.
The Goal
You will modify your `fan-out.yaml` file to send messages to a local file, a Kafka topic, and an S3 bucket, all at the same time.
Implementation
- Start with the Previous Pipeline: Open the `fan-out.yaml` file you created in the previous step. It should look like this:

  ```yaml
  # fan-out.yaml
  output:
    broker:
      pattern: fan_out
      outputs:
        - file:
            path: /tmp/events.jsonl
            codec: lines
        - kafka:
            addresses: [ ${KAFKA_BROKERS} ]
            topic: "sensor-events"
            key: ${!this.sensor_id}
  ```
- Add the S3 Output: To add S3, simply add an `aws_s3` block to the `outputs` array. Modify the output in `fan-out.yaml`:

  ```yaml
  output:
    broker:
      pattern: fan_out
      outputs:
        - file:
            path: /tmp/events.jsonl
            codec: lines
        - kafka:
            addresses: [ ${KAFKA_BROKERS} ]
            topic: "sensor-events"
            key: ${!this.sensor_id}
        # Add this block for S3
        - aws_s3:
            bucket: ${S3_BUCKET} # From your setup step
            region: ${AWS_REGION}
            path: "events/${!timestamp_unix_date()}/${!uuid_v4()}.json"
            batching:
              count: 10
              period: 10s
  ```

  Note that we've added a small `batching` configuration. S3 works best with larger files rather than many small ones, so even for a simple example, batching is important.
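  For real workloads you would typically batch far more aggressively, since S3 is billed per request and handles a few large objects better than many tiny ones. The sketch below is illustrative only: it assumes a Benthos-style batching policy, so confirm the `byte_size` field and the `archive`/`compress` processors against the documentation for your connector version.

  ```yaml
  # Illustrative sketch only: a more production-oriented batching policy.
  # Field and processor names assume a Benthos-style connector.
  - aws_s3:
      bucket: ${S3_BUCKET}
      region: ${AWS_REGION}
      path: "events/${!timestamp_unix_date()}/${!uuid_v4()}.jsonl.gz"
      batching:
        count: 1000        # flush after 1000 messages...
        byte_size: 5000000 # ...or roughly 5 MB of data...
        period: 60s        # ...or 60 seconds, whichever comes first
        processors:
          - archive:
              format: lines   # join the batch into one newline-delimited payload
          - compress:
              algorithm: gzip # gzip the combined payload before upload
  ```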
- Configure AWS Credentials: Ensure your environment is configured with AWS credentials. The simplest way for local development is to set the following environment variables:

  ```bash
  export AWS_ACCESS_KEY_ID="YOUR_KEY_ID"
  export AWS_SECRET_ACCESS_KEY="YOUR_SECRET_KEY"
  export AWS_REGION="us-east-1"
  export S3_BUCKET="your-unique-bucket-name"
  ```
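  If you have the AWS CLI installed, you can sanity-check that these credentials resolve before starting the pipeline (the bucket name comes from the `S3_BUCKET` variable you just exported):

  ```bash
  # Show which identity the credentials resolve to
  aws sts get-caller-identity

  # Confirm the target bucket is reachable with these credentials
  aws s3 ls "s3://${S3_BUCKET}"
  ```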
- Deploy and Test: Restart the pipeline so it picks up the new configuration, then send a test event.
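  How you start the pipeline depends on which runner you installed; assuming a Benthos-compatible stream processor, the command looks roughly like this:

  ```bash
  # Start (or restart) the pipeline with the updated config.
  # Adjust the binary name to match your installation, for example:
  benthos -c fan-out.yaml
  # or, with Redpanda Connect:
  # rpk connect run fan-out.yaml
  ```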
  Then post the event:

  ```bash
  # Send a test event
  curl -X POST http://localhost:8080/events \
    -H "Content-Type: application/json" \
    -d '{"sensor_id": "sensor-456", "message": "This goes to three places"}'
  ```
- Verify:
  - Check the local file `/tmp/events.jsonl`.
  - Check the `sensor-events` Kafka topic.
  - Check your S3 bucket. After the batch period (10 seconds), you should see a new object under the `events/` prefix.
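  If you prefer the command line, the checks below cover all three destinations. They assume the standard Kafka console tools and the AWS CLI are installed; adjust for your own tooling:

  ```bash
  # 1. Local file: watch events arrive
  tail -f /tmp/events.jsonl

  # 2. Kafka: consume the topic from the beginning
  kafka-console-consumer.sh \
    --bootstrap-server "${KAFKA_BROKERS}" \
    --topic sensor-events \
    --from-beginning

  # 3. S3: list objects written under the events/ prefix
  aws s3 ls "s3://${S3_BUCKET}/events/" --recursive
  ```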
You have now built a pipeline that concurrently routes data to three different types of systems: a local file, a message queue, and object storage.