Skip to main content

Advanced Circuit Breaker Patterns

Once you have mastered the basic try/catch and fallback mechanisms, you can implement more sophisticated resilience patterns. This section covers production-grade techniques for building highly available pipelines.

Pattern 1: Circuit Breaker with Caching

For read-heavy operations, caching is the most effective way to reduce load on downstream systems and improve resilience. If the primary data source is unavailable, the pipeline can fall back to serving slightly stale data from the cache.

Use Case: Enriching events with user metadata that changes infrequently.

Configuration

This pipeline tries to fetch data from a cache first. Only on a cache miss does it attempt to call the database, and it caches the result on a successful call.

cached-db-breaker.yaml
config:
# Define a cache resource
cache_resources:
- label: user_profile_cache
memory:
cap: 5000
default_ttl: 5m # Cache entries for 5 minutes

pipeline:
processors:
# 1. Check the cache first
- cache:
resource: user_profile_cache
operator: get
key: user_${!this.user_id}

# 2. Act based on cache hit or miss
- switch:
cases:
# CACHE HIT: Use the cached data
- check: this.cached_data.exists()
processors:
- mapping: |
root = this
root.user_profile = this.cached_data.parse_json()
root.data_source = "cache"

# CACHE MISS: Query the database
- processors:
- try:
- sql_select:
driver: postgres
data_source_name: ${DB_CONNECTION_STRING}
query: "SELECT user_name, user_tier FROM users WHERE user_id = $1"
args_mapping: `root = [ this.user_id ]`
timeout: 3s

- mapping: |
root = this
root.user_profile = { "name": this.user_name, "tier": this.user_tier }
root.data_source = "database"

# On success, write the result to the cache
- cache:
resource: user_profile_cache
operator: set
key: user_${!this.user_id}
value: ${!this.user_profile.format_json()}
catch:
# If the DB fails, mark it and continue without enrichment
- mapping: |
root = this
root.data_source = "none"
root.db_error = true

Pattern 2: Read-Write Separation

Write operations (e.g., INSERT, UPDATE) are often more critical and have different failure modes than read operations (SELECT). It's a good practice to use separate, stricter circuit breaker policies for writes.

Use Case: A pipeline that both reads user preferences and writes new user events to a database.

Configuration

This example uses a switch to route events based on an operation field. Writes get a very conservative circuit breaker, while reads are more tolerant.

read-write-breakers.yaml
pipeline:
processors:
- switch:
cases:
# READ operations have a tolerant circuit breaker
- check: this.operation == "read"
processors:
- try:
- sql_select:
driver: postgres
data_source_name: ${DB_CONNECTION_STRING}
timeout: 10s
max_open_connections: 20 # More connections for reads
query: ${!this.sql_query}
catch:
- mapping: `root.read_failed = true`

# WRITE operations have a strict circuit breaker
- check: this.operation == "write"
processors:
- try:
- sql_insert:
driver: postgres
data_source_name: ${DB_CONNECTION_STRING}
timeout: 3s # Fail fast on writes
max_open_connections: 5 # Fewer connections for writes
query: ${!this.sql_query}
catch:
- mapping: `root.write_failed = true`

output:
switch:
# Failed writes are sent to a retry queue
- check: this.write_failed == true
output:
kafka:
addresses: [${KAFKA_BROKERS}]
topic: "database_writes_retry"

# All other events go to a standard output
- output:
stdout: {}

Pattern 3: Adaptive Circuit Breaker

An adaptive circuit breaker can dynamically change its behavior based on the health of a downstream system. This is an advanced technique that uses a lightweight health check to decide which circuit breaker policy to apply.

Use Case: Before calling a critical, expensive API, first ping a lightweight /health endpoint on that same API.

Configuration

adaptive-breaker.yaml
pipeline:
processors:
# 1. Perform a quick health check
- try:
- http:
url: ${MOCK_API_URL}/health
verb: GET
timeout: 1s
- mapping: `root.api_is_healthy = true`
catch:
- mapping: `root.api_is_healthy = false`

# 2. Choose a breaker policy based on health
- switch:
cases:
# If healthy, use normal settings
- check: this.api_is_healthy == true
processors:
- http:
url: ${MOCK_API_URL}/metadata/${!this.sensor_id}
timeout: 10s
retries: 5

# If unhealthy, use aggressive settings to fail fast
- processors:
- http:
url: ${MOCK_API_URL}/metadata/${!this.sensor_id}
timeout: 2s
retries: 1

These advanced patterns provide much greater control over the resilience and performance of your pipelines, allowing you to build robust, production-ready systems.