# Step 2: Database Circuit Breakers
Database connections are a common point of failure. If a database becomes unavailable, pipelines can crash or hang indefinitely, exhausting connection pools. In this step, you will protect a pipeline against database failures using a try/catch block.
## 1. The Fragile Pipeline
The `database-circuit-breaker-foundation.yaml` file contains a simple pipeline. It accepts a `user_id`, queries a PostgreSQL database to get the user's profile, and adds it to the event.
This pipeline is fragile. If the database is down, the `sql_select` processor will fail, and the entire pipeline will stop.
## 2. See It Fail
1. **Start the Mock Database:** Make sure the PostgreSQL service is running.

   ```bash
   # (From the services/postgres.yml guide)
   docker compose -f services/postgres.yml up -d
   ```

2. **Stop the Database:** Now, simulate an outage.

   ```bash
   docker compose -f services/postgres.yml stop
   ```

3. **Send a Request:**

   ```bash
   curl -X POST http://localhost:8084/user-events \
     -H "Content-Type: application/json" \
     -d '{"user_id": "user_001"}'
   ```

   The request will hang and eventually fail. The pipeline is now stuck trying to connect to a dead database.
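
If you would rather not wait for a hung request, you can cap how long the client waits. The sketch below wraps the same request in a small shell function. The function name `send_user_event`, the `USER_EVENTS_URL` override, and the 10-second default are illustrative assumptions, not part of the example files; it relies only on curl's standard `--max-time` and `-w` options.

```bash
# Sketch: send the tutorial's test event, but give up after a fixed number of seconds.
# The endpoint and payload come from the step above; everything else is an assumption.
send_user_event() {
  local user_id="$1"
  local max_seconds="${2:-10}"   # assumed default cap, in seconds
  local url="${USER_EVENTS_URL:-http://localhost:8084/user-events}"

  # --max-time caps the whole transfer; -w prints the status code and elapsed time.
  curl --silent --show-error --max-time "$max_seconds" \
    -X POST "$url" \
    -H "Content-Type: application/json" \
    -d "{\"user_id\": \"${user_id}\"}" \
    -w '\nhttp_code=%{http_code} time_total=%{time_total}s\n'
}
```

For example, `send_user_event user_001 5` gives up after about five seconds, so a stuck pipeline no longer blocks your terminal.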
## 3. Add the Circuit Breaker
You will now add a try/catch block to handle the database failure gracefully.
1. **Copy the Foundation:**

   ```bash
   cp examples/data-routing/database-circuit-breaker-foundation.yaml database-circuit-breaker.yaml
   ```

2. **Modify the Pipeline:** Open `database-circuit-breaker.yaml` and replace the `pipeline` section with the one below. This wraps the database call in a `try` block and adds a `catch` block for error handling.

   ```yaml
   pipeline:
     processors:
       - try:
           # This processor will attempt to run. If it fails,
           # the 'catch' block is executed.
           - sql_select:
               driver: postgres
               data_source_name: ${DB_CONNECTION_STRING}
               query: |
                 SELECT user_name, user_tier FROM users WHERE user_id = $1
               args_mapping: |
                 root = [ this.user_id ]
               # These settings help fail faster
               timeout: 3s
               max_open_connections: 5

           # This mapping only runs if the SQL query succeeds.
           - mapping: |
               root = this
               root.user_profile.name = this.user_name
               root.user_profile.tier = this.user_tier
               root.db_enriched = true
               root.db_status = "success"

       - catch:
           # This block runs ONLY if a processor in the 'try' block
           # (here, 'sql_select') fails.
           - mapping: |
               root = this
               root.db_enriched = false
               root.db_status = "failed"
               root.fallback_reason = "database_circuit_breaker_open"
   ```
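
The configuration reads its connection string from the `DB_CONNECTION_STRING` environment variable. If that variable is unset, a failure may come from the missing setting rather than from the simulated outage. The guard below is a minimal sketch you could run in the shell that starts the pipeline; the function name `require_db_connection_string` is hypothetical, and it assumes the pipeline inherits its environment from that shell.

```bash
# Sketch: fail early if the connection string the pipeline expects is missing.
# Assumption: the pipeline is started from this shell and inherits its environment.
require_db_connection_string() {
  if [ -z "${DB_CONNECTION_STRING:-}" ]; then
    echo "DB_CONNECTION_STRING is not set" >&2
    return 1
  fi
}
```

Call `require_db_connection_string` before starting the pipeline; it returns a non-zero status and prints a message to stderr when the variable is empty or unset.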
## 4. Deploy and Test Again
1. **Restart the Database (optional):** You can test with the database running first to see the success path.

   ```bash
   docker compose -f services/postgres.yml start
   ```

2. **Stop the Database Again:**

   ```bash
   docker compose -f services/postgres.yml stop
   ```

3. **Send the Same Request:**

   ```bash
   curl -X POST http://localhost:8084/user-events \
     -H "Content-Type: application/json" \
     -d '{"user_id": "user_001"}'
   ```

   This time, you will get an immediate response. The output will be a JSON object with `db_enriched: false` and `db_status: "failed"`. The pipeline handled the error gracefully instead of crashing.
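
To check the response without reading the JSON by eye, you can pipe it through a small filter. This sketch assumes the response body contains the two fields named above; the function name `is_fallback_response` is made up for illustration. It uses `grep` instead of a JSON parser, so treat it as a quick smoke test only.

```bash
# Sketch: succeed only if the JSON on stdin carries both fallback markers.
# Pattern matching with grep is an approximation, not real JSON parsing.
is_fallback_response() {
  local body
  body="$(cat)"
  printf '%s' "$body" | grep -Eq '"db_enriched"[[:space:]]*:[[:space:]]*false' &&
    printf '%s' "$body" | grep -Eq '"db_status"[[:space:]]*:[[:space:]]*"failed"'
}
```

For example, append `| is_fallback_response && echo "fallback path taken"` to the curl command above (adding `-s` to silence curl's progress output).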
You have now protected your pipeline from database failures. Before moving on, restart the PostgreSQL container so it is available for the next steps.