Step 1: Define and Apply a JSON Schema
The foundation of data quality is schema validation. This step teaches you how to define a simple JSON Schema and immediately apply it in a pipeline to accept or reject data based on your rules.
The Goal
You will create a simple schema to validate incoming sensor data and then use the json_schema processor to enforce it.
A Valid Message (Should Pass):
{
  "sensor_id": "sensor-42",
  "timestamp": "2025-10-20T14:30:00Z",
  "reading": 23.5
}
An Invalid Message (Should Fail):
{
  "id": "sensor-42",
  "time": "2025-10-20T14:30:00Z",
  "value": "hot"
}
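To see why the first message should pass and the second should fail, here is a rough Python sketch that hand-checks both messages against the rules this step's schema enforces: three required fields, with sensor_id and timestamp as strings and reading as a number. It uses only the standard library and is an illustration, not a JSON Schema validator and not a description of how the json_schema processor works internally. The name check_reading is hypothetical, and whether a real validator enforces the date-time format is an assumption here.

```python
from datetime import datetime

# Required fields and the Python types that correspond to the schema's JSON types.
REQUIRED_FIELDS = {"sensor_id": str, "timestamp": str, "reading": (int, float)}


def check_reading(message: dict) -> list[str]:
    """Return a list of problems; an empty list means the message looks valid."""
    problems = []
    for field, expected in REQUIRED_FIELDS.items():
        if field not in message:
            problems.append(f"missing required field: {field}")
            continue
        value = message[field]
        # bool is a subclass of int in Python, but a JSON boolean is not a number.
        if isinstance(value, bool) or not isinstance(value, expected):
            problems.append(f"wrong type for {field}")

    # Assumption: the date-time format is enforced. Some validators treat
    # "format" as an annotation only.
    ts = message.get("timestamp")
    if isinstance(ts, str):
        try:
            # Older Python versions do not accept a trailing "Z", so normalize it.
            datetime.fromisoformat(ts.replace("Z", "+00:00"))
        except ValueError:
            problems.append("timestamp is not an ISO 8601 date-time")
    return problems


valid = {"sensor_id": "sensor-42", "timestamp": "2025-10-20T14:30:00Z", "reading": 23.5}
invalid = {"id": "sensor-42", "time": "2025-10-20T14:30:00Z", "value": "hot"}

print(check_reading(valid))    # no problems reported
print(check_reading(invalid))  # one "missing required field" entry per required field
```

The invalid message fails because none of the three required field names are present: id, time, and value are not the names the schema asks for, so its values are never type-checked at all.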
Implementation
- Define Your Schema: First, create a file named sensor-schema.json. This schema requires three fields (sensor_id, timestamp, reading) and enforces their data types.
sensor-schema.json
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "Basic Sensor Reading",
  "type": "object",
  "required": [ "sensor_id", "timestamp", "reading" ],
  "properties": {
    "sensor_id": {
      "type": "string"
    },
    "timestamp": {
      "type": "string",
      "format": "date-time"
    },
    "reading": {
      "type": "number"
    }
  }
}
- Create the Validation Pipeline: Now, create a pipeline that uses this schema. Create a file named schema-validator.yaml.
schema-validator.yaml
name: schema-validator
description: A pipeline that validates incoming data against a JSON schema.
config:
  input:
    http_server:
      address: 0.0.0.0:8080
      path: /sensor/readings
  pipeline:
    processors:
      # This processor is the core of the validation
      - json_schema:
          schema_path: "file://./sensor-schema.json" # Assumes schema is in the same directory
      # This mapping only runs if validation succeeds
      - mapping: 'root.validation_status = "passed"'
  output:
    stdout:
      codec: lines
If the incoming data does not match the schema, the json_schema processor will fail and the message will be rejected automatically.
- Deploy and Test:
# --- Test 1: Send VALID data ---
curl -X POST http://localhost:8080/sensor/readings \
  -H "Content-Type: application/json" \
  -d '{
    "sensor_id": "sensor-42",
    "timestamp": "2025-10-20T14:30:00Z",
    "reading": 23.5
  }'
# --- Test 2: Send INVALID data ---
curl -X POST http://localhost:8080/sensor/readings \
  -H "Content-Type: application/json" \
  -d '{"id": "sensor-42", "reading": "hot"}'
- Verify:
  - For the valid request, you will see the message in your output/logs with validation_status: "passed".
  - For the invalid request, you will get an HTTP 400 Bad Request error, and the message will not proceed through the pipeline. Check the pipeline's logs to see the specific validation error message.
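The two test requests can also be scripted, which is handy when you want to rerun them after each change. The following is a minimal sketch using only Python's standard library. It assumes the pipeline is listening on localhost:8080 at /sensor/readings as configured in this step; build_request, send, and main are hypothetical helper names, not part of any pipeline tooling.

```python
import json
import urllib.error
import urllib.request

# Assumption: matches the address and path in schema-validator.yaml.
URL = "http://localhost:8080/sensor/readings"

VALID = {"sensor_id": "sensor-42", "timestamp": "2025-10-20T14:30:00Z", "reading": 23.5}
INVALID = {"id": "sensor-42", "reading": "hot"}


def build_request(payload: dict) -> urllib.request.Request:
    """Build a JSON POST request equivalent to the curl commands."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        URL,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(payload: dict) -> int:
    """Send the payload and return the HTTP status code."""
    try:
        with urllib.request.urlopen(build_request(payload), timeout=5) as response:
            return response.status
    except urllib.error.HTTPError as err:
        # urlopen raises on 4xx/5xx responses; the status code is still available.
        return err.code


def main() -> None:
    # Requires the pipeline to be running; a connection failure raises URLError.
    print("valid message   ->", send(VALID))
    print("invalid message ->", send(INVALID))
```

Call main() once the pipeline is running. Going by the verification notes above, the invalid message should come back with status 400.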
You have now built a basic but powerful data validation pipeline. In the next steps, you will learn how to handle validation failures more gracefully.