Step 4: Route to SCADA Historian and Alerting

The Goal

You have classified fault events. Now send them where they need to go, and only where they need to go:

  • Critical fault events → SCADA historian (permanent record) + PagerDuty (immediate alert to on-call operator)
  • KPI summaries → Grafana Cloud (real-time dashboard visible to grid operations center)
  • Full raw archive → local disk (NERC CIP §R1.4 compliance: complete audit trail stays at the substation)

And critically: strip sensitive topology fields before anything crosses the substation boundary.
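The boundary rule can be illustrated in plain Python (not the pipeline language; the sample event and its values are hypothetical):

```python
# Sketch of the stripping rule: drop CIP-sensitive keys from an event dict
# before it is serialized for egress. Field names match the pipeline config;
# the sample event below is made up for the example.
CIP_SENSITIVE = {"bus_topology", "relay_config", "esp_network_map", "protection_zone"}

def strip_cip_fields(event: dict) -> dict:
    """Return a copy of the event with CIP-sensitive fields removed."""
    safe = {k: v for k, v in event.items() if k not in CIP_SENSITIVE}
    safe["cip_fields_stripped"] = True  # audit marker, mirrors the pipeline
    return safe

event = {"device_id": "RTU-07A", "voltage_kv": 104.5, "relay_config": "..."}
print(strip_cip_fields(event))
# {'device_id': 'RTU-07A', 'voltage_kv': 104.5, 'cip_fields_stripped': True}
```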

NERC CIP Compliance by Design

NERC CIP §R1.4 requires that access to BES Cyber System Information, including network topology, protection relay configurations, and electronic security perimeter maps, be controlled and restricted.

The naive approach sends everything and hopes the historian's access controls are sufficient. Expanso enforces this at the pipeline level: topology fields are deleted before the output stage runs. They never travel on the wire. They cannot be intercepted or accidentally exposed.

# Strip CIP-sensitive fields before any data leaves the substation
- mapping: |
    # These fields contain topology data; delete before egress
    root = this.without("bus_topology", "relay_config", "esp_network_map", "protection_zone")

    # Log that stripping occurred (for audit trail)
    root.cip_fields_stripped = true
    root.cip_strip_timestamp = now()

Switch Output: Conditional Routing

Expanso's switch output evaluates conditions and sends each message to the matching destination. For SCADA, this creates two streams from one pipeline:

output:
  switch:
    cases:
      # Critical faults: historian + immediate alert
      - check: this.alert_required == true
        output:
          broker:
            outputs:
              - http_client:
                  url: "${SCADA_HISTORIAN_URL}"
                  verb: POST
              - http_client:
                  url: "${PAGERDUTY_WEBHOOK_URL}"
                  verb: POST

      # KPI summaries: cloud dashboard
      - output:
          http_client:
            url: "${GRAFANA_CLOUD_URL}"
            verb: POST
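The routing decision itself is first-match logic: a critical fault fans out to two sinks via the broker, and everything else falls through to the dashboard case. A minimal Python sketch of that behavior (destination names are illustrative):

```python
def route(event: dict) -> list[str]:
    """First matching case wins; the broker fans one message out to several sinks."""
    if event.get("alert_required") is True:
        # broker: both outputs receive the same message
        return ["scada_historian", "pagerduty"]
    # the second case has no check, so everything else lands here
    return ["grafana_cloud"]

print(route({"alert_required": True}))   # ['scada_historian', 'pagerduty']
print(route({"alert_required": False}))  # ['grafana_cloud']
```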

Implementation

Full Routing Pipeline

cat > ~/scada-step-4-route.yaml << 'EOF'
# scada-step-4-route.yaml
# Stage 4: Route fault events to historian + alerting; KPIs to cloud

input:
  socket:
    network: tcp
    address: 0.0.0.0:502
    codec: lines

pipeline:
  processors:
    # Stage 1: Parse registers
    - mapping: |
        let fields = content().string().split(";").fold({}, (acc, item) -> {
          let parts = item.split("=")
          acc | { parts[0]: parts[1] }
        })
        let reg = fields.REG.number()
        let val = fields.VAL.number()

        root.voltage_kv = if reg == 40001 { val / 100.0 } else { deleted() }
        root.current_a = if reg == 40003 { val / 10.0 } else { deleted() }
        root.frequency_hz = if reg == 40005 { val / 100.0 } else { deleted() }
        root.temp_c = if reg == 40007 { val / 10.0 } else { deleted() }
        root.power_mw = if reg == 40009 { val / 10.0 } else { deleted() }

        root.device_id = fields.DEVICE
        root.register = reg
        root.raw_value = val
        root.status = fields.STATUS.number()
        root.substation_id = env("SUBSTATION_ID").or("SUB-CENTRAL-01")
        root.region = env("GRID_REGION").or("WECC-SOUTHWEST")
        root."@timestamp" = fields.TS.number()

    # Stage 2: Filter nominal
    - mapping: |
        let voltage_ok = !this.voltage_kv.exists() || (this.voltage_kv >= 110.0 && this.voltage_kv <= 145.0)
        let frequency_ok = !this.frequency_hz.exists() || (this.frequency_hz >= 59.95 && this.frequency_hz <= 60.05)
        let temp_ok = !this.temp_c.exists() || this.temp_c <= 75.0

        if voltage_ok && frequency_ok && temp_ok {
          root = deleted()
        }

    # Stage 3: Classify faults
    - mapping: |
        root.fault_type = match {
          this.voltage_kv.exists() && (this.voltage_kv < 110.0 || this.voltage_kv > 145.0) => "VOLTAGE_DEVIATION"
          this.frequency_hz.exists() && (this.frequency_hz < 59.95 || this.frequency_hz > 60.05) => "FREQUENCY_DRIFT"
          this.temp_c.exists() && this.temp_c > 75.0 => "THERMAL_OVERLOAD"
          _ => "NOMINAL"
        }
        root.severity = if this.fault_type == "NOMINAL" { "info" } else { "critical" }
        root.alert_required = this.fault_type != "NOMINAL"
        root.processed_at = now()

    # Stage 4: Strip NERC CIP-sensitive topology fields before egress
    - mapping: |
        # Remove fields that contain BES Cyber System Information
        # These fields must not cross the electronic security perimeter
        root = this.without("bus_topology", "relay_config", "esp_network_map",
          "protection_zone", "rtu_ip_address", "dnp3_address")
        root.cip_fields_stripped = true

# Route based on alert_required flag
output:
  switch:
    cases:
      # Critical fault: historian + PagerDuty (simultaneous via broker)
      - check: this.alert_required == true
        output:
          broker:
            outputs:
              # SCADA historian: permanent event record
              - http_client:
                  url: "${SCADA_HISTORIAN_URL}"
                  verb: POST
                  headers:
                    Content-Type: "application/json"
                    Authorization: "Bearer ${HISTORIAN_API_KEY}"
                  timeout: 10s
                  retry_policy:
                    max_retries: 3
                    initial_interval: 1s

              # PagerDuty: immediate operator alert
              - http_client:
                  url: "${PAGERDUTY_WEBHOOK_URL}"
                  verb: POST
                  headers:
                    Content-Type: "application/json"
                  timeout: 5s
                  # PagerDuty expects a specific payload shape
                  processors:
                    - mapping: |
                        root = {
                          "routing_key": env("PAGERDUTY_ROUTING_KEY"),
                          "event_action": "trigger",
                          "payload": {
                            "summary": this.fault_type + " on " + this.device_id + " at " + this.substation_id,
                            "severity": this.severity,
                            "source": this.substation_id,
                            "timestamp": this.processed_at,
                            "custom_details": {
                              "voltage_kv": this.voltage_kv,
                              "frequency_hz": this.frequency_hz,
                              "temp_c": this.temp_c,
                              "fault_type": this.fault_type,
                              "device_id": this.device_id,
                              "region": this.region
                            }
                          }
                        }

      # KPI summary: Grafana Cloud (CIP-safe aggregated data)
      - output:
          http_client:
            url: "${GRAFANA_CLOUD_URL}"
            verb: POST
            headers:
              Content-Type: "application/json"
              Authorization: "Bearer ${GRAFANA_API_KEY}"
            timeout: 15s

      # Fallback: local audit archive (all events, regardless of routing above)
      # This satisfies the NERC CIP audit trail requirement
      - output:
          file:
            path: "${LOCAL_AUDIT_PATH}/scada-audit-${!timestamp_unix()}.jsonl"
            codec: lines
EOF
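For reference, the PagerDuty processor in the pipeline reshapes each fault event into a PagerDuty Events API v2 "trigger" payload. A standalone Python sketch of that mapping (the sample event is hypothetical; field names mirror the mapping block above):

```python
import json
import os

def pagerduty_payload(event: dict) -> dict:
    """Plain-Python mirror of the pipeline's PagerDuty mapping block."""
    return {
        "routing_key": os.environ.get("PAGERDUTY_ROUTING_KEY", ""),
        "event_action": "trigger",
        "payload": {
            "summary": f'{event["fault_type"]} on {event["device_id"]} at {event["substation_id"]}',
            "severity": event["severity"],
            "source": event["substation_id"],
            "timestamp": event.get("processed_at"),
            "custom_details": {k: event.get(k) for k in (
                "voltage_kv", "frequency_hz", "temp_c",
                "fault_type", "device_id", "region")},
        },
    }

evt = {"fault_type": "VOLTAGE_DEVIATION", "device_id": "RTU-07A",
       "substation_id": "SUB-CENTRAL-01", "severity": "critical",
       "voltage_kv": 104.5, "region": "WECC-SOUTHWEST"}
print(json.dumps(pagerduty_payload(evt), indent=2))
```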

Test the Routing

# Deploy the routing pipeline
expanso pipeline deploy ~/scada-step-4-route.yaml

# Send a critical fault โ€” should go to historian + PagerDuty
echo "REG=40001;VAL=10450;UNIT=V_x100;TS=$(date +%s);DEVICE=RTU-07A;STATUS=2" | nc localhost 502

# Send a nominal reading that's been pre-classified (for Grafana routing test)
# In production this would be a KPI summary, not a raw reading
echo "REG=40009;VAL=2847;UNIT=MW_x10;TS=$(date +%s);DEVICE=RTU-07A;STATUS=0" | nc localhost 502
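To see why the first test line pages the on-call operator, here is a plain-Python re-implementation of the parse and classify stages applied to both test lines (scale factors and thresholds are taken from the pipeline config):

```python
# Re-implementation of the parse + classify stages for the two test lines.
# REG=40001 uses the V_x100 scaling: 10450 -> 104.50 kV, below the 110.0 kV
# floor, hence a critical VOLTAGE_DEVIATION. REG=40009 is power, 284.7 MW.
def parse(line: str) -> dict:
    fields = dict(p.split("=", 1) for p in line.strip().split(";"))
    reg, val = int(fields["REG"]), float(fields["VAL"])
    scale = {40001: ("voltage_kv", 100.0), 40003: ("current_a", 10.0),
             40005: ("frequency_hz", 100.0), 40007: ("temp_c", 10.0),
             40009: ("power_mw", 10.0)}
    name, divisor = scale[reg]
    return {name: val / divisor, "device_id": fields["DEVICE"]}

def classify(e: dict) -> str:
    v = e.get("voltage_kv")
    if v is not None and (v < 110.0 or v > 145.0):
        return "VOLTAGE_DEVIATION"
    f = e.get("frequency_hz")
    if f is not None and (f < 59.95 or f > 60.05):
        return "FREQUENCY_DRIFT"
    t = e.get("temp_c")
    if t is not None and t > 75.0:
        return "THERMAL_OVERLOAD"
    return "NOMINAL"

fault = parse("REG=40001;VAL=10450;UNIT=V_x100;TS=0;DEVICE=RTU-07A;STATUS=2")
print(classify(fault))    # VOLTAGE_DEVIATION (104.5 kV < 110.0 kV floor)
nominal = parse("REG=40009;VAL=2847;UNIT=MW_x10;TS=0;DEVICE=RTU-07A;STATUS=0")
print(classify(nominal))  # NOMINAL (284.7 MW, no threshold attached)
```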

Verify Destinations Received Data

# Check historian received the fault event
curl -sf "${SCADA_HISTORIAN_URL}/events?device=RTU-07A&limit=1" \
-H "Authorization: Bearer ${HISTORIAN_API_KEY}" | jq .

# Check local audit archive
ls -la "${LOCAL_AUDIT_PATH}/"
cat "${LOCAL_AUDIT_PATH}/scada-audit-*.jsonl" | jq '{ fault_type, severity, device_id, cip_fields_stripped }' | head -20

Audit Trail Compliance

The local file output captures every event, including ones filtered before routing, creating a complete audit trail. This satisfies NERC CIP requirements for event logging without shipping sensitive data to cloud systems.

# Separate audit pipeline running in parallel
# Captures ALL readings before filtering
output:
  broker:
    outputs:
      - file:
          path: "/var/lib/expanso/scada-audit/raw-${!timestamp().format('2006-01-02')}.jsonl"
          codec: lines
      # ... normal routing continues ...

Summary: Complete Data Flow

RTU/PLC → [Modbus TCP]
  → Expanso Edge (at substation)
      → Parse registers    # V, A, Hz, °C, MW
      → Filter nominal     # 99%+ dropped here
      → Classify faults    # VOLTAGE_DEVIATION, FREQUENCY_DRIFT, etc.
      → Strip CIP fields   # topology stays local
      → Route:
           alert_required=true  → SCADA Historian + PagerDuty
           alert_required=false → Grafana Cloud
           all events           → Local Audit Archive (NERC CIP)

Result: 50,000 reads/minute becomes 12 KPIs/minute + immediate fault alerts.

What's Next?

You've built the complete 4-stage pipeline. See the production-ready version with all stages combined:

→ Next Step: Complete SCADA Edge Integration Pipeline