Step 1: Define and Apply a JSON Schema

The foundation of data quality is schema validation. This step teaches you how to define a simple JSON Schema and immediately apply it in a pipeline to accept or reject data based on your rules.

The Goal

You will create a simple schema to validate incoming sensor data and then use the json_schema processor to enforce it.

A Valid Message (Should Pass):

{
  "sensor_id": "sensor-42",
  "timestamp": "2025-10-20T14:30:00Z",
  "reading": 23.5
}

An Invalid Message (Should Fail):

{
  "id": "sensor-42",
  "time": "2025-10-20T14:30:00Z",
  "value": "hot"
}
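
To make the contrast concrete, here is a minimal Python sketch of the rules that separate the two messages: sensor_id, timestamp, and reading must all be present, typed as string, string, and number. It is a hand-written approximation for illustration only, not the json_schema processor and not a full JSON Schema validator (the date-time format check is left out). The names RULES and check_reading are hypothetical.

```python
# Hypothetical stand-in for the three rules described in this step.
# Field name -> Python type(s) accepted for that field.
RULES = {
    "sensor_id": str,
    "timestamp": str,
    "reading": (int, float),
}


def check_reading(message):
    """Return a list of problems; an empty list means the message passes."""
    if not isinstance(message, dict):
        return ["message is not a JSON object"]
    problems = []
    for field, expected in RULES.items():
        if field not in message:
            problems.append(f"missing required field: {field}")
            continue
        value = message[field]
        # bool is a subclass of int in Python, but JSON true/false is not a number.
        if isinstance(value, bool) or not isinstance(value, expected):
            problems.append(f"wrong type for field: {field}")
    return problems


VALID = {"sensor_id": "sensor-42", "timestamp": "2025-10-20T14:30:00Z", "reading": 23.5}
INVALID = {"id": "sensor-42", "time": "2025-10-20T14:30:00Z", "value": "hot"}

print(check_reading(VALID))    # empty list: nothing to report
print(check_reading(INVALID))  # one "missing required field" entry per expected field
```

The invalid message fails before any type check happens: none of its keys match the required field names, so all three fields are reported as missing.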

Implementation

  1. Define Your Schema: First, create a file named sensor-schema.json. This schema requires three fields (sensor_id, timestamp, reading) and enforces their data types.

    sensor-schema.json
    {
      "$schema": "http://json-schema.org/draft-07/schema#",
      "title": "Basic Sensor Reading",
      "type": "object",
      "required": [ "sensor_id", "timestamp", "reading" ],
      "properties": {
        "sensor_id": {
          "type": "string"
        },
        "timestamp": {
          "type": "string",
          "format": "date-time"
        },
        "reading": {
          "type": "number"
        }
      }
    }
  2. Create the Validation Pipeline: Now, create a pipeline that uses this schema. Create a file named schema-validator.yaml.

    schema-validator.yaml
    name: schema-validator
    description: A pipeline that validates incoming data against a JSON schema.

    config:
      input:
        http_server:
          address: 0.0.0.0:8080
          path: /sensor/readings

      pipeline:
        processors:
          # This processor is the core of the validation
          - json_schema:
              schema_path: "file://./sensor-schema.json" # Assumes schema is in the same directory

          # This mapping only runs if validation succeeds
          - mapping: 'root.validation_status = "passed"'

      output:
        stdout:
          codec: lines

    If the incoming data does not match the schema, the json_schema processor will fail and the message will be rejected automatically.

  3. Deploy and Test: With the pipeline running, send one valid and one invalid request.

    # --- Test 1: Send VALID data ---
    curl -X POST http://localhost:8080/sensor/readings \
      -H "Content-Type: application/json" \
      -d '{
        "sensor_id": "sensor-42",
        "timestamp": "2025-10-20T14:30:00Z",
        "reading": 23.5
      }'

    # --- Test 2: Send INVALID data ---
    curl -X POST http://localhost:8080/sensor/readings \
      -H "Content-Type: application/json" \
      -d '{"id": "sensor-42", "reading": "hot"}'
  4. Verify:

    • For the valid request, you will see the message in your output/logs with validation_status: "passed".
    • For the invalid request, you will get an HTTP 400 Bad Request error, and the message will not proceed through the pipeline. Check the pipeline's logs to see the specific validation error message.
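
The two curl tests from step 3 can also be scripted, which is convenient once you want to repeat them after every schema change. The following is a minimal sketch using only the Python standard library; the URL comes from schema-validator.yaml, while the helper names build_request and send are hypothetical and not part of the pipeline tooling.

```python
import json
import urllib.error
import urllib.request

# Address and path taken from the http_server input in schema-validator.yaml.
URL = "http://localhost:8080/sensor/readings"


def build_request(payload):
    """Build a JSON POST request equivalent to the curl commands in step 3."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        URL,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(payload):
    """POST the payload and return the HTTP status code of the response."""
    try:
        with urllib.request.urlopen(build_request(payload), timeout=5) as resp:
            return resp.status
    except urllib.error.HTTPError as err:
        # urllib raises HTTPError for 4xx/5xx responses; the code is what we inspect.
        return err.code
```

With the pipeline running, calling send({"sensor_id": "sensor-42", "timestamp": "2025-10-20T14:30:00Z", "reading": 23.5}) and then send({"id": "sensor-42", "reading": "hot"}) lets you compare the two status codes against the behavior described in the Verify step.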

You have now built a basic but powerful data validation pipeline. In the next steps, you will learn how to handle validation failures more gracefully.