Complete Pipeline: MotherDuck Retail Analytics
The full production-ready pipeline — from POS transactions to query-ready Parquet on S3.
📋
Ready to deploy?
Copy the complete pipeline YAML and paste it into Expanso Cloud.
Full Pipeline YAML
# MotherDuck Retail Analytics Pipeline
# Expanso Edge: POS → Enrich → Validate → Batch → Parquet → S3 → DuckLake
#
# Environment variables:
#   S3_BUCKET  - Your S3 bucket name
#   AWS_REGION - AWS region (default: us-east-1); credentials and region are
#                read from the standard AWS SDK environment variables.
name: motherduck-retail-pos
config:
  input:
    # Synthetic POS feed: one transaction every 100ms (~10 TPS) with a random
    # store_id in 1-50, simulating the whole fleet from a single generator.
    generate:
      interval: 100ms
      mapping: |
        root.txn_id = uuid_v4()
        root.store_id = random_int(min: 1, max: 50)
        root.terminal_id = random_int(min: 1, max: 10)
        root.timestamp = now()
        # Weighted transaction type: ~85% sale, ~10% return, ~5% exchange.
        let roll = random_int(min: 1, max: 100)
        root.type = match {
          $roll <= 85 => "sale",
          $roll <= 95 => "return",
          _ => "exchange"
        }
        # "card" appears three times to weight it at half of all payments.
        root.payment_method = ["card", "card", "card", "cash", "mobile", "gift_card"]
          .index(random_int(min: 0, max: 5))
        root.employee_id = "EMP-" + random_int(min: 1000, max: 9999).string()
        let categories = ["grocery", "produce", "dairy", "bakery", "meat",
          "frozen", "beverage", "household", "personal_care", "electronics"]
        # 1-6 line items per basket; prices are cents rendered as dollars.
        let item_count = random_int(min: 1, max: 6)
        root.items = range(0, $item_count).map_each(
          {
            "sku": "SKU-" + random_int(min: 10000, max: 99999).string(),
            "name": "Item " + random_int(min: 1, max: 500).string(),
            "category": $categories.index(random_int(min: 0, max: 9)),
            "qty": random_int(min: 1, max: 4),
            "unit_price": (random_int(min: 99, max: 4999).number() / 100)
          }
        )
        root.subtotal = root.items.map_each(this.qty * this.unit_price).sum().round(2)
        root.tax_rate = 0.0875
        root.tax_amount = (root.subtotal * root.tax_rate).round(2)
        root.total_amount = (root.subtotal + root.tax_amount).round(2)
        # Returns carry negative amounts. An else-less `if` assignment is
        # skipped when the condition is false, leaving root unchanged.
        root = if root.type == "return" {
          root.assign({
            "subtotal": -root.subtotal,
            "tax_amount": -root.tax_amount,
            "total_amount": -root.total_amount
          })
        }
  pipeline:
    processors:
      # 1. Enrich with store metadata: static lookup for the 12 known stores,
      #    deterministic synthetic fallback for store ids 13-50.
      - mapping: |
          let stores = {
            "1": {"region": "NW", "format": "flagship", "city": "Seattle", "state": "WA", "sqft": 45000},
            "2": {"region": "NW", "format": "standard", "city": "Portland", "state": "OR", "sqft": 28000},
            "3": {"region": "NW", "format": "express", "city": "Boise", "state": "ID", "sqft": 12000},
            "4": {"region": "SW", "format": "flagship", "city": "Los Angeles", "state": "CA", "sqft": 52000},
            "5": {"region": "SW", "format": "standard", "city": "Phoenix", "state": "AZ", "sqft": 30000},
            "6": {"region": "SW", "format": "outlet", "city": "Las Vegas", "state": "NV", "sqft": 18000},
            "7": {"region": "NE", "format": "flagship", "city": "New York", "state": "NY", "sqft": 48000},
            "8": {"region": "NE", "format": "standard", "city": "Boston", "state": "MA", "sqft": 26000},
            "9": {"region": "SE", "format": "standard", "city": "Atlanta", "state": "GA", "sqft": 32000},
            "10": {"region": "SE", "format": "express", "city": "Miami", "state": "FL", "sqft": 14000},
            "11": {"region": "MW", "format": "flagship", "city": "Chicago", "state": "IL", "sqft": 42000},
            "12": {"region": "MW", "format": "standard", "city": "Minneapolis", "state": "MN", "sqft": 25000}
          }
          let store_key = this.store_id.string()
          # Fallback derives stable pseudo-metadata from the store id so every
          # store gets region/format/city values even without a catalog entry.
          let meta = $stores.get($store_key).or({
            "region": ["NW","SW","NE","SE","MW"].index(this.store_id % 5),
            "format": ["standard","standard","express","standard","outlet"].index(this.store_id % 5),
            "city": "Store-" + this.store_id.string(),
            "state": "US",
            "sqft": 20000 + (this.store_id * 500)
          })
          root = this
          root.store_region = $meta.region
          root.store_format = $meta.format
          root.store_city = $meta.city
          root.store_state = $meta.state
          root.store_sqft = $meta.sqft
          # "15" / "Monday" are Go reference-time layouts (24h hour, weekday
          # name), matching the "2006-01-02" layout used in the output path.
          root.hour_of_day = this.timestamp.ts_format("15").number()
          root.day_of_week = this.timestamp.ts_format("Monday")
          root.is_weekend = root.day_of_week == "Saturday" || root.day_of_week == "Sunday"
          root.basket_size = this.items.length()
          # Division-by-zero guard; with an empty basket the else-less
          # assignment is skipped and the field is simply absent.
          root.avg_item_price = if root.basket_size > 0 {
            (this.subtotal.abs() / root.basket_size).round(2)
          }
      # 2. Validate and flag anomalies. Each else-less `let flags = if ...`
      #    re-binding is a no-op when its condition is false, so the flag list
      #    only grows when a rule fires.
      - mapping: |
          root = this
          let flags = []
          let flags = if this.items.length() == 0 { $flags.append("EMPTY_BASKET") }
          let has_electronics = this.items.map_each(this.category).flatten().any(c -> c == "electronics")
          let flags = if this.total_amount.abs() > 500 && !$has_electronics { $flags.append("HIGH_VALUE_NON_ELECTRONICS") }
          let flags = if this.type == "sale" && this.total_amount < 0 { $flags.append("NEGATIVE_SALE") }
          let flags = if this.type == "return" && this.total_amount > 0 { $flags.append("POSITIVE_RETURN") }
          let flags = if this.items.filter(i -> i.qty > 10).length() > 0 { $flags.append("BULK_QUANTITY") }
          let flags = if this.employee_id == "" || this.employee_id == null { $flags.append("MISSING_EMPLOYEE") }
          root.anomaly_flags = $flags
          root.is_anomaly = $flags.length() > 0
          root.quality_score = match {
            $flags.length() == 0 => "clean",
            $flags.length() == 1 => "warning",
            _ => "review"
          }
      # 3. Flatten nested fields for the flat Parquet schema: the items array
      #    and flag list become JSON string columns; the raw array is dropped.
      - mapping: |
          root = this
          root.item_count = this.items.length()
          root.items_json = this.items.format_json()
          root.anomaly_flags = this.anomaly_flags.format_json()
          root = root.without("items")
      # 4. Batch for optimal Parquet file size: flush at 1000 rows or after
      #    10s, whichever comes first.
      - batch:
          count: 1000
          period: 10s
      # 5. Encode each batch into a single zstd-compressed Parquet file.
      - parquet_encode:
          schema:
            - name: txn_id
              type: UTF8
            - name: store_id
              type: INT32
            - name: terminal_id
              type: INT32
            - name: timestamp
              type: UTF8
            - name: type
              type: UTF8
            - name: payment_method
              type: UTF8
            - name: employee_id
              type: UTF8
            - name: items_json
              type: UTF8
            - name: item_count
              type: INT32
            - name: subtotal
              type: DOUBLE
            - name: tax_rate
              type: DOUBLE
            - name: tax_amount
              type: DOUBLE
            - name: total_amount
              type: DOUBLE
            - name: store_region
              type: UTF8
            - name: store_format
              type: UTF8
            - name: store_city
              type: UTF8
            - name: store_state
              type: UTF8
            - name: store_sqft
              type: INT32
            - name: hour_of_day
              type: INT32
            - name: day_of_week
              type: UTF8
            - name: is_weekend
              type: BOOLEAN
            - name: basket_size
              type: INT32
            - name: avg_item_price
              type: DOUBLE
            - name: anomaly_flags
              type: UTF8
            - name: is_anomaly
              type: BOOLEAN
            - name: quality_score
              type: UTF8
          default_compression: zstd
  output:
    aws_s3:
      bucket: "${S3_BUCKET}"
      # NOTE(review): by this point each message is a binary Parquet file, so
      # ${! json("store_region") } cannot read the payload — and each batch
      # mixes rows from many random store_ids anyway, so a single region per
      # file is approximate at best. If strict Hive partitioning by region is
      # required, capture store_region as metadata before parquet_encode and
      # batch per region; confirm against Expanso Edge interpolation semantics.
      path: >-
        transactions/region=${! json("store_region") }/date=${! now().ts_format("2006-01-02") }/batch_${! timestamp_unix() }_${! count("s3_files") }.parquet
      content_type: application/octet-stream
      max_in_flight: 4
      # Each message is already one complete Parquet file; write 1:1.
      batching:
        count: 1
        processors: []
Deploy
# Set environment variables.
# S3_BUCKET feeds the ${S3_BUCKET} placeholder in the pipeline YAML; the
# AWS_* values are the standard SDK credential/region variables (replace the
# placeholders with real values — do not commit real credentials).
export S3_BUCKET=my-retail-data
export AWS_ACCESS_KEY_ID=your-key
export AWS_SECRET_ACCESS_KEY=your-secret
export AWS_REGION=us-east-1
# Run the pipeline (motherduck-retail-pos.yaml is the config copied above).
expanso-edge run --config motherduck-retail-pos.yaml
Connect MotherDuck
Once data is landing in S3, set up DuckLake:
-- Install and load the DuckLake extension in DuckDB.
INSTALL ducklake;
LOAD ducklake;
-- Create catalog: table data and catalog metadata live in the same bucket
-- under separate prefixes.
ATTACH 'ducklake:retail_analytics'
(DATA_PATH 's3://my-retail-data/ducklake',
METADATA_PATH 's3://my-retail-data/ducklake_metadata');
-- Import existing Parquet data. hive_partitioning=true recovers the
-- region=/date= path segments as columns.
-- NOTE(review): CTAS is a one-time snapshot — Parquet files that land in S3
-- after this statement runs are not picked up automatically.
USE retail_analytics;
CREATE TABLE transactions AS
SELECT * FROM read_parquet('s3://my-retail-data/transactions/**/*.parquet',
hive_partitioning=true);
-- Query across all stores
SELECT store_region, COUNT(*) as txns, SUM(total_amount) as revenue
FROM transactions
WHERE type = 'sale'
GROUP BY store_region
ORDER BY revenue DESC;
Pipeline Stats
| Metric | Value |
|---|---|
| Input rate | 10 TPS per edge node (this demo config simulates all 50 stores in a single 10 TPS generator; a real 50-store fleet totals ~500 TPS) |
| Daily volume | ~43M transactions |
| Raw JSON size | ~90 GB/day |
| Parquet output | ~3.6 GB/day (25x compression) |
| Batch size | 1000 rows or 10s window |
| File size | ~85 KB per batch |
| Data freshness | ~10 seconds from POS to Parquet on S3 (queryable once DuckLake ingests the new files) |
| Partitioning | Hive-style by region and date |