Complete Pipeline
This pipeline combines all content splitting techniques from this tutorial:
- Array splitting - Split JSON arrays into individual messages using unarchive
- Context preservation - Store parent metadata before the split, restore it after
- Conditional routing - Route split messages by value (e.g., temperature thresholds)
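Before diving into the YAML, the split-and-enrich flow can be sketched in plain Python. This is an illustrative model of what the pipeline does, not Expanso code; `split_batch` is a hypothetical helper, and the field names follow the configuration below:

```python
from datetime import datetime, timezone

def split_batch(batch: dict, node_id: str = "unknown") -> list[dict]:
    # Step 1: capture parent context before splitting
    device_id = batch["device_id"]
    batch_timestamp = batch["timestamp"]

    # Step 2: validate structure
    readings = batch.get("readings")
    if not isinstance(readings, list):
        raise ValueError("Missing or invalid readings array")

    # Steps 3-4: split the array and enrich each message with parent context
    messages = []
    for reading in readings:
        msg = dict(reading)
        msg["device_id"] = device_id
        msg["batch_timestamp"] = batch_timestamp
        msg["edge_node"] = node_id
        msg["processed_at"] = datetime.now(timezone.utc).isoformat()
        messages.append(msg)
    return messages
```

Each output message carries its own copy of the parent metadata, so downstream consumers never need the original batch.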
Full Configuration
content-splitting.yaml

```yaml
name: split-sensor-array
description: Split sensor reading arrays into individual messages
type: pipeline
namespace: production

config:
  input:
    http_server:
      address: 0.0.0.0:8080
      path: /sensors/bulk
      timeout: 5s

  pipeline:
    processors:
      # Step 1: Store parent context in metadata BEFORE splitting
      - mapping: |
          meta device_id = this.device_id
          meta batch_timestamp = this.timestamp
          root = this

      # Step 2: Validate structure
      - mapping: |
          root = if !this.exists("readings") || this.readings.type() != "array" {
            throw("Missing or invalid readings array")
          } else {
            this
          }

      # Step 3: Split array into individual messages
      - unarchive:
          format: json_array
          field: readings

      # Step 4: Enrich each split message with parent context
      - mapping: |
          root = this
          root.device_id = meta("device_id")
          root.batch_timestamp = meta("batch_timestamp")
          root.edge_node = env("NODE_ID").or("unknown")
          root.processed_at = now()

  output:
    switch:
      cases:
        # Critical: High temperature alert
        - check: this.value > 80.0 && this.unit == "F"
          output:
            http_client:
              url: ${ALERT_SERVICE_URL}/critical-temp
              verb: POST
              headers:
                Content-Type: application/json
              max_retries: 5

        # Warning: Elevated temperature
        - check: this.value > 70.0 && this.value <= 80.0 && this.unit == "F"
          output:
            kafka:
              addresses: ["${KAFKA_BROKERS}"]
              topic: elevated-temp-warnings
              batching:
                count: 50
                period: 30s

        # Normal: Archive for analytics
        - output:
            file:
              path: /var/expanso/sensor-data/normal-${!timestamp_unix_date("2006-01-02")}.jsonl
              codec: lines
              batching:
                count: 1000
                period: 5m
```
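The configuration above references three environment variables. Before running it you would export values appropriate for your environment; the values below are placeholders only, not real endpoints:

```shell
# Example values only -- substitute your real endpoints
export ALERT_SERVICE_URL="http://alerts.internal:9000"  # target of the critical-temp http_client output
export KAFKA_BROKERS="localhost:9092"                   # broker list for the kafka output
export NODE_ID="edge-node-01"                           # stamped onto each message as edge_node
```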
Quick Test
```bash
# Send a batch of sensor readings
curl -X POST http://localhost:8080/sensors/bulk \
  -H "Content-Type: application/json" \
  -d '{
    "device_id": "sensor-001",
    "timestamp": "2024-01-15T10:30:00Z",
    "readings": [
      {"sensor_type": "temperature", "value": 72.5, "unit": "F"},
      {"sensor_type": "temperature", "value": 85.0, "unit": "F"},
      {"sensor_type": "humidity", "value": 45.2, "unit": "%"}
    ]
  }'

# The 85.0 F reading triggers the critical-temp alert service;
# the 72.5 F reading goes to Kafka as an elevated-temp warning;
# the humidity reading is archived to the local file
```
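The routing of the three test readings follows directly from the switch thresholds. A Python sketch of that threshold logic (the destination labels are descriptive, not output names from the config):

```python
def route(reading: dict) -> str:
    # Mirrors the switch cases: >80 F is critical, 70-80 F is elevated,
    # everything else (including non-Fahrenheit readings) is normal
    value, unit = reading.get("value"), reading.get("unit")
    if unit == "F" and value > 80.0:
        return "critical"   # http_client alert
    if unit == "F" and 70.0 < value <= 80.0:
        return "elevated"   # kafka warning topic
    return "normal"         # local file archive

readings = [
    {"sensor_type": "temperature", "value": 72.5, "unit": "F"},
    {"sensor_type": "temperature", "value": 85.0, "unit": "F"},
    {"sensor_type": "humidity", "value": 45.2, "unit": "%"},
]
```

Note the boundary behavior: a reading of exactly 80.0 F is still "elevated", since the critical check uses a strict `>`.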
Deploy
```bash
# Deploy to Expanso orchestrator
expanso-cli job deploy content-splitting.yaml

# Or run locally with expanso-edge
expanso-edge run --config content-splitting.yaml
```
Download
Download content-splitting.yaml
What's Next?
- Troubleshooting - Common issues and solutions
- Fan-Out Pattern - Send to multiple destinations