Complete Pipeline

This pipeline combines all content splitting techniques from this tutorial:

  • Array splitting - Split a JSON array into individual messages with the unarchive processor
  • Context preservation - Store parent fields in metadata before the split, then re-attach them to each message afterward
  • Conditional routing - Route the split messages by value (e.g., temperature thresholds)
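The split-and-enrich pattern is easiest to see in plain code. A minimal Python sketch of the same semantics (this only illustrates what the pipeline does to each batch; the field names mirror the config below, and the function is hypothetical, not part of any Expanso API):

```python
def split_with_context(batch: dict) -> list[dict]:
    """Split a batch's readings array into individual records,
    copying the parent's identifying fields onto each one."""
    if not isinstance(batch.get("readings"), list):
        raise ValueError("Missing or invalid readings array")
    return [
        {
            **reading,
            "device_id": batch["device_id"],        # parent context preserved
            "batch_timestamp": batch["timestamp"],  # across the split
        }
        for reading in batch["readings"]
    ]

batch = {
    "device_id": "sensor-001",
    "timestamp": "2024-01-15T10:30:00Z",
    "readings": [
        {"sensor_type": "temperature", "value": 72.5, "unit": "F"},
        {"sensor_type": "humidity", "value": 45.2, "unit": "%"},
    ],
}
messages = split_with_context(batch)
```

Each element of `messages` is now a self-contained record that can be routed independently, which is exactly what the switch output below relies on.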

Full Configuration

content-splitting.yaml

```yaml
name: split-sensor-array
description: Split sensor reading arrays into individual messages
type: pipeline
namespace: production

config:
  input:
    http_server:
      address: 0.0.0.0:8080
      path: /sensors/bulk
      timeout: 5s

  pipeline:
    processors:
      # Step 1: Store parent context in metadata BEFORE splitting
      - mapping: |
          meta device_id = this.device_id
          meta batch_timestamp = this.timestamp
          root = this

      # Step 2: Validate structure
      - mapping: |
          if !this.exists("readings") || this.readings.type() != "array" {
            throw("Missing or invalid readings array")
          }
          root = this

      # Step 3: Split array into individual messages
      - unarchive:
          format: json_array
          field: readings

      # Step 4: Enrich each split message with parent context
      - mapping: |
          root = this
          root.device_id = meta("device_id")
          root.batch_timestamp = meta("batch_timestamp")
          root.edge_node = env("NODE_ID").or("unknown")
          root.processed_at = now()

  output:
    switch:
      cases:
        # Critical: High temperature alert
        - check: this.value > 80.0 && this.unit == "F"
          output:
            http_client:
              url: ${ALERT_SERVICE_URL}/critical-temp
              verb: POST
              headers:
                Content-Type: application/json
              max_retries: 5

        # Warning: Elevated temperature
        - check: this.value > 70.0 && this.value <= 80.0 && this.unit == "F"
          output:
            kafka:
              addresses: ["${KAFKA_BROKERS}"]
              topic: elevated-temp-warnings
              batching:
                count: 50
                period: 30s

        # Normal: Archive for analytics (default case, no check)
        - output:
            file:
              path: /var/expanso/sensor-data/normal-${!timestamp_unix_date("2006-01-02")}.jsonl
              codec: lines
              batching:
                count: 1000
                period: 5m
```
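The switch output's three cases are just an ordered predicate chain: the first matching check wins, and the check-less final case catches everything else. A small Python sketch of the same thresholds (illustrative only; the destination labels are made up, and the real routing is done by the switch output above):

```python
def route(reading: dict) -> str:
    """Mirror the switch output's cases: first match wins."""
    is_fahrenheit = reading.get("unit") == "F"
    value = reading.get("value", 0.0)
    if is_fahrenheit and value > 80.0:
        return "alert-service"                     # critical temperature
    if is_fahrenheit and 70.0 < value <= 80.0:
        return "kafka:elevated-temp-warnings"      # warning
    return "file:normal-archive"                   # default case

print(route({"value": 85.0, "unit": "F"}))  # critical branch
```

Note that the order matters: if the warning case came first, a 72.5°F reading would still match it, but an 85°F reading is only "critical" because that case is checked before the catch-all.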

Quick Test

```shell
# Send a batch of sensor readings
curl -X POST http://localhost:8080/sensors/bulk \
  -H "Content-Type: application/json" \
  -d '{
    "device_id": "sensor-001",
    "timestamp": "2024-01-15T10:30:00Z",
    "readings": [
      {"sensor_type": "temperature", "value": 72.5, "unit": "F"},
      {"sensor_type": "temperature", "value": 85.0, "unit": "F"},
      {"sensor_type": "humidity", "value": 45.2, "unit": "%"}
    ]
  }'

# 85.0 F -> alert service (critical)
# 72.5 F -> Kafka (elevated-temp warning)
# 45.2 % -> local file (normal archive)
```

Deploy

```shell
# Deploy to Expanso orchestrator
expanso-cli job deploy content-splitting.yaml

# Or run locally with expanso-edge
expanso-edge run --config content-splitting.yaml
```

Download

Download content-splitting.yaml

What's Next?