Complete Pipeline
This pipeline combines the format transformation techniques from this tutorial into a single HTTP endpoint:
- Auto-detect - Detect whether the input is JSON, XML, or CSV
- Normalize - Parse XML documents and CSV rows into a common JSON structure
- Content negotiation - Encode the result as JSON, XML, or CSV based on the request's Accept header
- Transformation metadata - Record the source format, target format, and transformation time
Full Configuration
transform-formats.yaml
# Complete format transformation pipeline: accepts JSON, XML, or CSV via
# HTTP POST on /transform and returns the data as JSON, XML, or CSV
# depending on the request's Accept header (JSON by default).
name: transform-formats-complete
type: pipeline
description: Complete Format Transformation Pipeline - Transforms between JSON, CSV, and XML formats with auto-detection.
namespace: production
labels:
  category: data-transformation
  pattern: format-transformation
config:
  input:
    http_server:
      address: "0.0.0.0:8080"
      path: /transform
      allowed_verbs: ["POST"]
      # The processed message is returned to the caller by the
      # sync_response output below. Response headers are interpolated from
      # metadata set by the pipeline steps.
      sync_response:
        status: "200"
        headers:
          Content-Type: '${! meta("content_type") }'
          X-Source-Format: '${! meta("source_format") }'
          X-Target-Format: '${! meta("target_format") }'
  pipeline:
    processors:
      # Step 1: Detect the input format from the Content-Type header, falling
      # back to sniffing the (whitespace-trimmed) payload. Only metadata is
      # set here; the raw payload bytes are passed through unchanged so the
      # parsers in Step 2 see the original input.
      - mapping: |
          let text = content().string().trim()
          let ctype = meta("Content-Type").or("").lowercase()
          meta source_format = if $ctype.contains("json") {
            "json"
          } else if $ctype.contains("xml") {
            "xml"
          } else if $ctype.contains("csv") {
            "csv"
          } else if $text.has_prefix("{") || $text.has_prefix("[") {
            "json"
          } else if $text.has_prefix("<") {
            "xml"
          } else if $text.contains(",") {
            "csv"
          } else {
            "text"
          }
          root = content()
      # Step 2: Parse to a normalized JSON structure. JSON input is already
      # in that form and needs no processing.
      - switch:
          - check: 'meta("source_format") == "xml"'
            processors:
              - xml:
                  operator: to_json
          - check: 'meta("source_format") == "csv"'
            processors:
              # parse_csv() treats the first row as the header and yields an
              # array of objects, one per data row.
              - mapping: 'root = content().string().parse_csv()'
          - check: 'meta("source_format") == "text"'
            processors:
              # Plain text has no structure; wrap it so later steps can treat
              # it as a JSON object.
              - mapping: 'root = {"text": content().string()}'
      # Step 3: Choose the target format from the Accept header (default
      # JSON) plus the matching response MIME type. The http_server input
      # stores each request header as metadata under the header's name.
      # The payload itself is passed through unchanged.
      - mapping: |
          let accept = meta("Accept").or("application/json")
          let target = if $accept.contains("xml") {
            "xml"
          } else if $accept.contains("csv") {
            "csv"
          } else {
            "json"
          }
          meta target_format = $target
          meta content_type = if $target == "xml" {
            "application/xml"
          } else if $target == "csv" {
            "text/csv"
          } else {
            "application/json"
          }
          root = content()
      # Step 4: Encode to the target format (JSON needs no encoding).
      - switch:
          - check: 'meta("target_format") == "xml"'
            processors:
              # Wrap arrays (e.g. parsed CSV rows) under a single root element
              # so they can be rendered as an XML document.
              - mapping: |
                  root = match {
                    this.type() == "array" => {"records": {"record": this}}
                    _ => this
                  }
              - xml:
                  operator: from_json
          - check: 'meta("target_format") == "csv"'
            processors:
              # Header row comes from the first record's keys; then one line
              # per record with values in the same column order. Missing
              # fields become empty cells. NOTE: values are not quoted or
              # escaped, so values containing commas/newlines break the CSV.
              - mapping: |
                  let rows = if this.type() == "array" { this } else { [this] }
                  let columns = if $rows.length() > 0 { $rows.index(0).keys() } else { [] }
                  let lines = $rows.map_each(row -> $columns.map_each(col -> row.get(col).or("").string()).join(","))
                  root = [$columns.join(",")].concat($lines).join("\n")
      # Step 5: Add transformation metadata. Only a JSON body can carry it;
      # for XML/CSV responses the same information is exposed through the
      # X-Source-Format / X-Target-Format response headers.
      - switch:
          - check: 'meta("target_format") == "json"'
            processors:
              - mapping: |
                  root.data = this
                  root.transformation = {
                    "source_format": meta("source_format"),
                    "target_format": meta("target_format"),
                    "transformed_at": now()
                  }
  output:
    # Send the processed message back as the HTTP response configured on the
    # http_server input.
    sync_response: {}
  logger:
    level: INFO
    format: json
  metrics:
    prometheus:
      path: /metrics
Quick Test
# Send JSON and get JSON back (the default when no Accept header is given).
# The endpoint path must match the pipeline's http_server path: /transform
curl -X POST http://localhost:8080/transform \
  -H "Content-Type: application/json" \
  -d '{
    "sensor_id": "temp-001",
    "temperature": 72.5,
    "humidity": 45.0,
    "timestamp": "2024-01-15T10:30:00Z"
  }'
# The transformed payload is returned in the HTTP response body.
# Request a different output format with the Accept header, e.g. XML:
curl -X POST http://localhost:8080/transform \
  -H "Content-Type: application/json" \
  -H "Accept: application/xml" \
  -d '{"sensor_id": "temp-001", "temperature": 72.5}'
Deploy
# Deploy the pipeline to the Expanso orchestrator
expanso-cli job deploy transform-formats.yaml
# Or run it locally with expanso-edge (the HTTP endpoint listens on 0.0.0.0:8080 per the config)
expanso-edge run --config transform-formats.yaml
Download
Download transform-formats.yaml
What's Next?
- Troubleshooting - Common issues and solutions
- Aggregate Time Windows - Aggregate data over time