Skip to main content

Complete Pipeline

This pipeline combines all format transformation techniques from this tutorial:

  • JSON to Avro - Convert JSON to schema-validated Avro
  • Avro to Parquet - Convert for columnar analytics storage
  • Auto-detect - Automatically detect input format and convert
  • Schema validation - Enforce schema during transformation

Full Configuration

transform-formats.yaml
# Expanso pipeline job: an HTTP endpoint that auto-detects JSON, XML, CSV or
# plain-text request bodies, parses them into a structured document, and
# replies with the data re-encoded in the format named by the Accept header.
name: transform-formats-complete
type: pipeline
description: Complete Format Transformation Pipeline - Transforms between JSON, CSV, and XML formats with auto-detection.
namespace: production
labels:
  category: data-transformation
  pattern: format-transformation

config:
  input:
    http_server:
      address: "0.0.0.0:8080"
      path: /transform
      allowed_verbs: ["POST"]
      # Reply settings belong to the input; the reply body is whatever message
      # reaches the `sync_response` output at the end of the pipeline.
      sync_response:
        status: "200"
        headers:
          # Full MIME type computed in Step 3 (not the bare format name).
          Content-Type: '${! meta("content_type") }'

  pipeline:
    processors:
      # Step 1: Detect the input format from the payload structure
      - mapping: |
          # Read the raw bytes with content(); `this` would force a JSON parse
          # and fail on XML/CSV bodies. Variables are referenced as $name.
          let body = content().string().trim()

          # Only metadata is assigned here, so the payload passes through
          # untouched for the parsers in Step 2.
          meta source_format = if $body.has_prefix("{") || $body.has_prefix("[") {
            "json"
          } else if $body.has_prefix("<") {
            "xml"
          } else if $body.contains(",") {
            "csv"
          } else {
            "text"
          }

      # Step 2: Parse to a normalized (structured) document
      - switch:
          - check: 'meta("source_format") == "json"'
            processors:
              # Parse eagerly so malformed JSON is flagged here, not later.
              - mapping: 'root = content().parse_json()'

          - check: 'meta("source_format") == "xml"'
            processors:
              - xml:
                  operator: to_json

          - check: 'meta("source_format") == "csv"'
            processors:
              # parse_csv treats the first line as the header row and yields
              # an array of objects, one per record.
              - mapping: 'root = content().string().parse_csv()'

          - check: 'meta("source_format") == "text"'
            processors:
              # Plain text has no structure; carry it in a `raw` field.
              - mapping: 'root.raw = content().string()'

      # Step 3: Choose the target format (based on Accept header, default JSON)
      - mapping: |
          # NOTE(review): the metadata key under which the HTTP input exposes
          # the Accept header should be confirmed; the header name is tried
          # first and the original "http_accept" key is kept as a fallback.
          let accept = meta("Accept").or(meta("http_accept")).or("application/json")

          let target = if $accept.contains("xml") {
            "xml"
          } else if $accept.contains("csv") {
            "csv"
          } else {
            "json"
          }

          # Metadata only: assigning fields on root would fail when the parsed
          # document is an array (for example, CSV rows).
          meta target_format = $target
          meta content_type = if $target == "xml" {
            "application/xml"
          } else if $target == "csv" {
            "text/csv"
          } else {
            "application/json"
          }

      # Step 4: Add transformation metadata. This runs before encoding because
      # `this` is only usable while the payload is still a structured document.
      - mapping: |
          root.data = this
          root.transformation = {
            "source_format": meta("source_format"),
            "target_format": meta("target_format"),
            "transformed_at": now()
          }

      # Step 5: Encode to the target format (JSON needs no extra step)
      - switch:
          - check: 'meta("target_format") == "xml"'
            processors:
              - mapping: |
                  # Wrap in a single element so the output has one XML root.
                  let doc = {"response": this}
                  root = $doc.format_xml()

          - check: 'meta("target_format") == "csv"'
            processors:
              - mapping: |
                  # CSV is flat, so only the data rows are emitted. The header
                  # comes from the first row's keys; every value is quoted and
                  # embedded quotes are doubled (RFC 4180).
                  let rows = if this.data.type() == "array" { this.data } else { [this.data] }
                  let columns = if $rows.length() > 0 { $rows.index(0).keys() } else { [] }
                  let lines = $rows.map_each(row -> $columns.map_each(col -> "\"" + row.get(col).or("").string().replace_all("\"", "\"\"") + "\"").join(","))
                  root = $columns.join(",") + "\n" + $lines.join("\n")

  output:
    # Hands the processed message back to the waiting HTTP request.
    sync_response: {}

  logger:
    level: INFO
    format: json

  metrics:
    prometheus:
      path: /metrics

Quick Test

# Send JSON data to the pipeline's /transform endpoint. The input format is
# auto-detected; the Accept header selects the response format
# (application/json, application/xml or text/csv; JSON is the default).
curl -X POST http://localhost:8080/transform \
  -H "Content-Type: application/json" \
  -H "Accept: application/json" \
  -d '{
  "sensor_id": "temp-001",
  "temperature": 72.5,
  "humidity": 45.0,
  "timestamp": "2024-01-15T10:30:00Z"
}'

# The converted payload is returned in the HTTP response body

Deploy

# Deploy to Expanso orchestrator
# (transform-formats.yaml is the configuration file shown under "Full Configuration")
expanso-cli job deploy transform-formats.yaml

# Or run locally with expanso-edge, passing the same file via --config
expanso-edge run --config transform-formats.yaml

Download

Download transform-formats.yaml

What's Next?