Step 2: Split CSV Batches
This step teaches you how to process a CSV file, turning each row into a separate JSON message for individual processing.
The Goal
You will transform a single CSV file like this:
transaction_id,timestamp,amount,currency
txn-001,2025-10-20T10:00:00Z,150.00,USD
txn-002,2025-10-20T10:01:00Z,12500.00,EUR
Into two separate output messages:
{
  "transaction_id": "txn-001",
  "timestamp": "2025-10-20T10:00:00Z",
  "amount": 150.0,
  "currency": "USD"
}
{
  "transaction_id": "txn-002",
  "timestamp": "2025-10-20T10:01:00Z",
  "amount": 12500.0,
  "currency": "EUR"
}
The "File -> Lines -> Structure" Pattern
Unlike splitting a JSON array, processing a CSV file involves reading it line by line.
- File Input: Use the file input with a lines scanner to read the file and create a new message for each line.
- Skip Header: The first message will be the header row (transaction_id,timestamp,...). We need to identify and delete this message.
- Parse Line: For all other messages (the data rows), split the string by the comma delimiter and map the values to a new JSON structure.
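The three stages above can be sketched outside the pipeline tool in a few lines of Python. This is only an illustration of the logic, not the pipeline itself: the function name `split_csv_lines` and the inline sample string are assumptions made for the example, and skipping blank lines is an added convenience that the steps above do not mention.

```python
import json


def split_csv_lines(text):
    """Yield one dict per data row, mirroring File -> Lines -> Structure."""
    for line in text.splitlines():            # Lines: one message per line
        if not line.strip():
            continue                          # assumption: ignore blank lines
        if line.startswith("transaction_id"):
            continue                          # Skip Header: drop the header row
        fields = line.split(",")              # Parse Line: split on the delimiter
        yield {
            "transaction_id": fields[0],
            "timestamp": fields[1],
            "amount": float(fields[2]),       # convert the amount to a number
            "currency": fields[3],
        }


# Inline copy of the sample file from The Goal.
sample = (
    "transaction_id,timestamp,amount,currency\n"
    "txn-001,2025-10-20T10:00:00Z,150.00,USD\n"
    "txn-002,2025-10-20T10:01:00Z,12500.00,EUR\n"
)

messages = list(split_csv_lines(sample))
for message in messages:
    print(json.dumps(message))
```

Like the steps above, this splits on a bare comma, so it assumes no field contains a quoted comma.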
Implementation
- Create Sample CSV: Create a file named transactions.csv with the following content.

    transaction_id,timestamp,amount,currency
    txn-001,2025-10-20T10:00:00Z,150.00,USD
    txn-002,2025-10-20T10:01:00Z,12500.00,EUR

- Create the Pipeline: Create a file named csv-splitter.yaml. This pipeline reads the CSV, skips the header, and parses each line.

    name: csv-splitter
    description: Process a CSV file line-by-line.
    config:
      input:
        file:
          paths:
            - ./transactions.csv # Assumes the CSV is in the same directory
          scanner:
            lines: {} # This tells the input to create a message for each line
      pipeline:
        processors:
          # Processor 1: Skip the header row
          - mapping: |
              # Each message is a raw text line, not JSON, so read it with content() instead of this.
              # If the line starts with "transaction_id", delete the message.
              # Lines that do not match are passed through unchanged.
              root = if content().string().has_prefix("transaction_id") { deleted() }
          # Processor 2: Parse the CSV line into a JSON object
          - mapping: |
              # Split the line string into an array of fields
              let fields = content().string().split(",")
              # Create the new JSON structure
              root.transaction_id = $fields.index(0)
              root.timestamp = $fields.index(1)
              root.amount = $fields.index(2).number() # Convert amount to a number
              root.currency = $fields.index(3)
      output:
        stdout:
          codec: lines

- Deploy and Test: Deploy the pipeline. It will run immediately, process the transactions.csv file, and then terminate.
- Verify: Check the logs or output of the pipeline. You will see the two distinct JSON messages, one for each data row in the CSV file.
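If you would rather check the output programmatically than by eye, a small script can do it. The sketch below is an assumption about how you might capture the output: it takes the lines as a Python list, and `check_output` and `EXPECTED_KEYS` are hypothetical names. The two sample lines are the expected messages from The Goal written on one line each; the real output may differ in key order or number formatting.

```python
import json

EXPECTED_KEYS = {"transaction_id", "timestamp", "amount", "currency"}


def check_output(lines):
    """Parse each output line as JSON and return the decoded messages."""
    messages = []
    for line in lines:
        message = json.loads(line)  # every line must be a valid JSON object
        if set(message) != EXPECTED_KEYS:
            raise ValueError(f"unexpected keys: {sorted(message)}")
        amount = message["amount"]
        # The amount must have been converted from a string to a number.
        if isinstance(amount, bool) or not isinstance(amount, (int, float)):
            raise ValueError(f"amount is not a number: {amount!r}")
        messages.append(message)
    return messages


# Expected messages from The Goal, one JSON document per line.
captured = [
    '{"transaction_id": "txn-001", "timestamp": "2025-10-20T10:00:00Z", "amount": 150.0, "currency": "USD"}',
    '{"transaction_id": "txn-002", "timestamp": "2025-10-20T10:01:00Z", "amount": 12500.0, "currency": "EUR"}',
]

verified = check_output(captured)
print(f"{len(verified)} messages verified")
```

A header row that slipped through, or an amount left as a string, raises an error here instead of going unnoticed.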
You have now learned the fundamental pattern of processing batch files. This same "File -> Lines -> Structure" pattern can be adapted for any line-delimited format, like plain text logs.
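As a rough illustration of adapting the pattern to plain text logs, the Python sketch below parses a hypothetical log format of `TIMESTAMP LEVEL MESSAGE` separated by spaces. The format, the sample lines, and the name `parse_log_line` are all assumptions made for the example. There is no header row in this case, so the Skip Header stage drops out and only the Lines and Structure stages remain.

```python
def parse_log_line(line):
    """Split a 'TIMESTAMP LEVEL MESSAGE' line into a dict (hypothetical format)."""
    # Split on the first two spaces only, so the message text keeps its own spaces.
    timestamp, level, text = line.split(" ", 2)
    return {"timestamp": timestamp, "level": level, "message": text}


# Hypothetical sample lines, invented for illustration.
log_lines = [
    "2025-10-20T10:00:00Z INFO payment accepted",
    "2025-10-20T10:01:00Z WARN amount exceeds review threshold",
]

# Lines -> Structure: one record per non-empty line.
records = [parse_log_line(line) for line in log_lines if line.strip()]
for record in records:
    print(record)
```

Only the parsing step changes with the format. Reading the file line by line stays the same.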