
Step 2: Split CSV Batches

This step teaches you how to process a CSV file, turning each row into a separate JSON message for individual processing.

The Goal

You will transform a single CSV file like this:

Input File: transactions.csv
transaction_id,timestamp,amount,currency
txn-001,2025-10-20T10:00:00Z,150.00,USD
txn-002,2025-10-20T10:01:00Z,12500.00,EUR

Into two separate output messages:

Output Message 1
{
  "transaction_id": "txn-001",
  "timestamp": "2025-10-20T10:00:00Z",
  "amount": 150.0,
  "currency": "USD"
}
Output Message 2
{
  "transaction_id": "txn-002",
  "timestamp": "2025-10-20T10:01:00Z",
  "amount": 12500.0,
  "currency": "EUR"
}

The "File -> Lines -> Structure" Pattern

Unlike splitting a JSON array, processing a CSV file involves reading it line by line and building the structure yourself. The pattern has three steps:

  1. File Input: Use the file input with a lines scanner to read the file and create a new message for each line, as illustrated after this list.
  2. Skip Header: The first message will be the header row (transaction_id,timestamp,...). We need to identify and delete this message.
  3. Parse Line: For all other messages (the data rows), split the string by the comma delimiter and map the values to a new JSON structure.
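
Here is how the sample file flows through that pattern. The lines scanner turns transactions.csv into three raw string messages, one per line:

    transaction_id,timestamp,amount,currency
    txn-001,2025-10-20T10:00:00Z,150.00,USD
    txn-002,2025-10-20T10:01:00Z,12500.00,EUR

The first processor deletes the header message, and the remaining two lines are parsed into the JSON messages shown in The Goal above.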

Implementation

  1. Create Sample CSV: Create a file named transactions.csv with the following content.

    transactions.csv
    transaction_id,timestamp,amount,currency
    txn-001,2025-10-20T10:00:00Z,150.00,USD
    txn-002,2025-10-20T10:01:00Z,12500.00,EUR
  2. Create the Pipeline: Create a file named csv-splitter.yaml. This pipeline reads the CSV, skips the header, and parses each line.

    csv-splitter.yaml
    name: csv-splitter
    description: Process a CSV file line-by-line.

    config:
      input:
        file:
          paths:
            - ./transactions.csv # Assumes the CSV is in the same directory
          scanner:
            lines: {} # This tells the input to create a message for each line

      pipeline:
        processors:
          # Processor 1: Skip the header row
          - mapping: |
              # content() gives the raw line, since it is not JSON yet.
              # If the line starts with "transaction_id", delete the message.
              if content().string().has_prefix("transaction_id") {
                root = deleted()
              } else {
                # Otherwise pass the raw line through unchanged
                root = content().string()
              }

          # Processor 2: Parse the CSV line into a JSON object
          - mapping: |
              # Split the line string into an array of fields
              let fields = content().string().split(",")

              # Create the new JSON structure
              root.transaction_id = $fields.index(0)
              root.timestamp = $fields.index(1)
              root.amount = $fields.index(2).number() # Convert amount to a number
              root.currency = $fields.index(3)

      output:
        stdout:
          codec: lines
  3. Deploy and Test: Deploy the pipeline. It starts immediately, processes transactions.csv once, and then terminates after the file has been fully read.

  4. Verify: Check the logs or output of the pipeline. You should see two distinct JSON messages, one for each data row in the CSV file, similar to the sample output shown after these steps.
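
With the stdout output and the lines codec, each surviving message is printed as one line of JSON. The expected output is roughly the following; treat it as a sketch, since field order and number formatting can vary with the engine version:

    {"transaction_id":"txn-001","timestamp":"2025-10-20T10:00:00Z","amount":150,"currency":"USD"}
    {"transaction_id":"txn-002","timestamp":"2025-10-20T10:01:00Z","amount":12500,"currency":"EUR"}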

You have now learned the fundamental pattern of processing batch files. This same "File -> Lines -> Structure" pattern can be adapted for any line-delimited format, like plain text logs.
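
As a sketch of that adaptation, the fragment below swaps Processor 2 of csv-splitter.yaml for a mapping that parses a hypothetical space-delimited log line such as 2025-10-20T10:00:00Z INFO payment-service worker pool started. The log format and field names are illustrative assumptions rather than part of this tutorial's dataset, and the fragment assumes the mapping language also provides the slice and join array methods alongside split and index:

    # Hypothetical processor for lines shaped like:
    #   <timestamp> <level> <service> <free-form message...>
    - mapping: |
        let parts = content().string().split(" ")

        root.timestamp = $parts.index(0)
        root.level = $parts.index(1)
        root.service = $parts.index(2)
        # Re-join everything after the third field as the message text
        root.message = $parts.slice(3).join(" ")

The file input with the lines scanner stays exactly the same; only the parsing step changes with the shape of each line.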