Complete O-RAN Pipeline Configuration
The complete, production-ready Expanso Edge pipeline configuration that integrates all components: data collection, transformation, and multi-destination routing.
📋
Ready to deploy?
Copy the complete pipeline YAML and paste it into Expanso Cloud.
Full Pipeline YAML
# Complete O-RAN Edge Telemetry Pipeline
# Production-ready configuration for OpenShift SNO deployment
name: oran-edge-telemetry-complete
version: "1.2.0"

# Global configuration
# NOTE(review): top-level `name`/`version` and the `config.environment` wrapper
# are not stock Benthos fields — presumably Expanso-specific; confirm against
# the Expanso pipeline schema.
config:
  # Environment-specific settings (env vars with defaults)
  environment:
    node_id: "${NODE_ID:edge-node-001}"
    region: "${REGION:us-west-2}"
    market: "${MARKET:seattle}"
    site_type: "${SITE_TYPE:urban-macro}"

  # Logging configuration
  logger:
    level: INFO
    format: json
    add_timestamp: true

  # Metrics collection
  # NOTE(review): in stock Benthos the Prometheus exporter is served from the
  # main HTTP server; per-exporter `path`/`port` may be Expanso-specific — confirm.
  metrics:
    prometheus:
      enabled: true
      path: "/metrics"
      port: 9090

  # HTTP server for health checks
  http:
    enabled: true
    address: "0.0.0.0:8080"
    root_path: "/health"
    debug_endpoints: false
# Input configuration - hybrid collection approach
input:
  broker:
    inputs:
      # Real-time API collection from DU nodes
      - label: "du_api_realtime"
        http_client:
          url: "${DU_ENDPOINT:http://du-001.oran.local:8080}/api/v1/metrics/realtime"
          verb: GET
          headers:
            Authorization: "Bearer ${DU_API_KEY}"
            Accept: "application/json"
            User-Agent: "expanso-edge-oran-collector/1.2.0"
          # NOTE(review): in Benthos `rate_limit` names a rate_limit *resource*,
          # not a literal rate ("1/s"). Define one under `rate_limit_resources`
          # and reference its label here — TODO confirm against the target schema.
          rate_limit: "1/s"
          timeout: "30s"
          # Retry configuration. The Benthos http_client input uses
          # retries/retry_period/max_retry_backoff (the original's
          # retry_until_success/max_retries/backoff are not recognized fields).
          retries: 3
          retry_period: "1s"
          max_retry_backoff: "30s"
        processors:
          - mapping: |
              # Parse API response and add collector metadata.
              root = this
              root.collection_method = "api_realtime"
              root.collector_timestamp = now()
              # Prefer env-provided identity, then the payload, then a sentinel.
              root.du_id = env("DU_ID") | this.du_id | "du-unknown"
              root.cell_id = env("CELL_ID") | this.cell_id | "cell-unknown"
              root.gnb_id = env("GNB_ID") | this.gnb_id | "gnb-unknown"

      # File-based collection for batch/historical data
      - label: "du_files_batch"
        file:
          paths:
            - "/mnt/du-telemetry/ptp4l/*.log"
            - "/mnt/du-telemetry/scheduler/*.log"
            - "/mnt/du-telemetry/system/*.log"
            - "/mnt/du-telemetry/rf/*.log"
          scanner:
            lines: {}
        processors:
          - mapping: |
              # Each message is one raw (non-JSON) log line, so read it via
              # content() rather than `this`, and take the file path from the
              # input's metadata instead of a document field.
              let line = content().string()
              let path = meta("path")
              root.collection_method = "file_batch"
              root.collector_timestamp = now()
              root.file_path = $path
              root.du_id = env("DU_ID") | "du-unknown"
              root.cell_id = env("CELL_ID") | "cell-unknown"
              root.gnb_id = env("GNB_ID") | "gnb-unknown"

              # Classify the record by which log file it came from.
              let log_type = if $path.contains("ptp4l") {
                "ptp_timing"
              } else if $path.contains("scheduler") {
                "prb_utilization"
              } else if $path.contains("system") {
                "system_performance"
              } else if $path.contains("rf") {
                "rf_measurements"
              } else {
                "unknown"
              }
              root.metric_type = $log_type
              root.raw_line = $line

              # Extract known patterns per log type. Capture groups require
              # re_find_all_submatch (re_find_all returns full matches only);
              # result shape is [[full, group1], ...]. The `| null` catch keeps
              # the original null-on-no-match behavior; fields are omitted
              # entirely for other log types.
              root.ptp4l_offset_ns = if $log_type == "ptp_timing" {
                $line.re_find_all_submatch("offset: ([+-]?\\d+)ns").0.1.number() | null
              } else { deleted() }
              root.prb_dl_pct = if $log_type == "prb_utilization" {
                $line.re_find_all_submatch("DL_PRB: (\\d+)%").0.1.number() | null
              } else { deleted() }
              root.prb_ul_pct = if $log_type == "prb_utilization" {
                $line.re_find_all_submatch("UL_PRB: (\\d+)%").0.1.number() | null
              } else { deleted() }
              root.cpu_pct = if $log_type == "system_performance" {
                $line.re_find_all_submatch("CPU: ([\\d.]+)%").0.1.number() | null
              } else { deleted() }
              root.memory_pct = if $log_type == "system_performance" {
                $line.re_find_all_submatch("MEM: ([\\d.]+)%").0.1.number() | null
              } else { deleted() }
              root.rsrp_dbm = if $log_type == "rf_measurements" {
                $line.re_find_all_submatch("RSRP: ([+-]?\\d+)dBm").0.1.number() | null
              } else { deleted() }
              root.sinr_db = if $log_type == "rf_measurements" {
                $line.re_find_all_submatch("SINR: ([+-]?\\d+)dB").0.1.number() | null
              } else { deleted() }

      # Simulated data generator for testing/demo
      - label: "du_simulator"
        generate:
          interval: "${SIMULATION_INTERVAL:10s}"
          count: 0  # 0 = unbounded generation
          mapping: |
            # Generate realistic O-RAN telemetry for demo/testing.
            root.timestamp = now()
            root.collection_method = "simulation"
            # NOTE(review): confirm `pad_left` exists in the target Bloblang
            # version; otherwise zero-pad via slicing.
            root.du_id = "du-" + random_int(min: 1, max: 8).string().pad_left(3, "0")
            root.cell_id = "cell-" + random_int(min: 1, max: 3).string()
            root.gnb_id = env("GNB_ID") | "gnb-seattle-01"

            # PTP timing with realistic distribution (let vars need $ to read).
            let base_offset = random_int(max: 200) - 100  # ±100ns baseline
            let jitter = random_int(max: 20) - 10         # ±10ns jitter
            root.ptp4l_offset_ns = $base_offset + $jitter

            # PRB utilization with a diurnal (business-hours) pattern.
            let hour = now().ts_format("15").number()
            let business_hour_factor = if $hour >= 8 && $hour <= 18 { 1.5 } else { 0.7 }
            root.prb_dl_pct = (random_int(min: 20, max: 70) * $business_hour_factor).round().min(95)
            root.prb_ul_pct = (random_int(min: 10, max: 40) * $business_hour_factor).round().min(85)

            # System performance
            root.cpu_pct = random_int(min: 25, max: 85)
            root.memory_pct = random_int(min: 40, max: 80)
            root.disk_pct = random_int(min: 15, max: 60)

            # RF measurements with path loss variation
            root.rsrp_dbm = -(random_int(min: 70, max: 110))
            root.sinr_db = random_int(min: -2, max: 25)

            # Additional simulated metrics
            root.active_ue_count = random_int(min: 5, max: 150)
            root.network_rx_mbps = random_int(min: 50, max: 800)
            root.network_tx_mbps = random_int(min: 20, max: 400)
# Processing pipeline - transformation and enrichment
pipeline:
  processors:
    # Input validation and cleanup — drop records missing required identifiers.
    # (The original's `[...].all()` is invalid: Bloblang `all` needs a predicate;
    # a plain boolean expression is simpler and equivalent here.)
    - label: "input_validation"
      mapping: |
        root = if this.du_id != null && this.du_id != "" && this.cell_id != null && this.cell_id != "" && this.timestamp != null {
          this
        } else {
          deleted()  # Drop invalid records
        }

    # Timestamp normalization
    - label: "timestamp_processing"
      mapping: |
        root = this
        # Convert timestamps to a canonical nanosecond epoch.
        root.timestamp_ns = match this.timestamp.type() {
          "string" => this.timestamp.ts_parse("2006-01-02T15:04:05Z").ts_unix_nano()
          "number" => this.timestamp * 1000000000  # assume seconds → nanoseconds
          _ => now().ts_unix_nano()
        }
        # ts_format interprets a bare number as unix *seconds*, so scale the
        # nanosecond value down first (the original called an undefined
        # timestamp_ns() function here).
        root.timestamp_iso = (root.timestamp_ns / 1000000000).ts_format("2006-01-02T15:04:05.000000000Z")
        root.processed_at = now()
        # Time-based derived fields
        root.hour_of_day = (root.timestamp_ns / 1000000000).ts_format("15").number()
        root.day_of_week = (root.timestamp_ns / 1000000000).ts_format("Monday")
        root.is_business_hours = root.hour_of_day >= 8 && root.hour_of_day <= 18

    # Site metadata enrichment
    - label: "site_enrichment"
      mapping: |
        root = this
        # Site hierarchy and metadata (env-provided, with defaults).
        root.region = env("REGION") | "us-west-2"
        root.market = env("MARKET") | "seattle"
        root.site_type = env("SITE_TYPE") | "urban-macro"
        # Generate/validate site identifiers. Fields assigned above must be read
        # back via `root` — `this` still refers to the unmodified input document.
        root.gnb_id = if this.gnb_id == "gnb-unknown" || this.gnb_id == null {
          "gnb-" + root.region + "-" + this.du_id.string().pad_left(3, "0")
        } else {
          this.gnb_id
        }
        # Cell sector mapping
        root.sector = match this.cell_id {
          "cell-1" => "alpha"  # 0-120 degrees
          "cell-2" => "beta"   # 120-240 degrees
          "cell-3" => "gamma"  # 240-360 degrees
          _ => "omni"
        }
        # Site coordinates (would typically come from a CMDB lookup)
        root.coordinates = {
          "latitude": 47.6062,
          "longitude": -122.3321,
          "altitude_m": 15
        }
        # Network hierarchy for analytics
        root.network_hierarchy = {
          "region": root.region,
          "market": root.market,
          "site_id": root.gnb_id,
          "du_id": this.du_id,
          "cell_id": this.cell_id,
          "sector": root.sector
        }

    # PTP compliance analysis (ITU-T G.8271.1 timing classes).
    # Bloblang `if`/`match` are expressions and cannot contain assignments, so
    # the original statement-style branch is rewritten as per-field conditional
    # assignments; all PTP fields are omitted when there is no PTP sample.
    - label: "ptp_analysis"
      mapping: |
        root = this
        # Absolute offset, or null when no PTP data is present.
        let abs_offset = if this.ptp4l_offset_ns != null { this.ptp4l_offset_ns.abs() } else { null }
        # 5G timing compliance classification
        root.ptp_compliance = if $abs_offset == null { deleted() } else {
          match {
            $abs_offset < 100 => "compliant"     # ±100ns - fully compliant
            $abs_offset < 1000 => "degraded"     # ±100ns-1μs - degraded
            $abs_offset < 10000 => "critical"    # ±1-10μs - critical
            _ => "non_compliant"                 # >±10μs - non-compliant
          }
        }
        # Compliance score (0-100), linear within each band.
        root.ptp_score = if $abs_offset == null { deleted() } else {
          match {
            $abs_offset < 100 => 100
            $abs_offset < 1000 => (100 - (($abs_offset - 100) / 9)).round()
            $abs_offset < 10000 => (90 - (($abs_offset - 1000) / 100)).round().max(0)
            _ => 0
          }
        }
        # Alert level for downstream systems
        root.ptp_alert_level = if $abs_offset == null { deleted() } else {
          match root.ptp_compliance {
            "compliant" => "none"
            "degraded" => "warning"
            "critical" => "major"
            _ => "critical"
          }
        }

    # PRB and capacity analysis — all fields omitted when PRB data is absent.
    - label: "capacity_analysis"
      mapping: |
        root = this
        let has_prb = this.prb_dl_pct != null && this.prb_ul_pct != null
        # Resource utilization calculations
        root.prb_total_utilization = if $has_prb { (this.prb_dl_pct + this.prb_ul_pct) / 2 } else { deleted() }
        root.prb_efficiency = if $has_prb { root.prb_total_utilization.round() } else { deleted() }
        # Congestion classification based on downlink load
        root.congestion_level = if $has_prb {
          match {
            this.prb_dl_pct < 50 => "low"
            this.prb_dl_pct < 80 => "medium"
            this.prb_dl_pct < 95 => "high"
            _ => "critical"
          }
        } else { deleted() }
        # Capacity headroom
        root.capacity_headroom_pct = if $has_prb { 100 - this.prb_dl_pct } else { deleted() }
        # Spectral efficiency estimation (bits/Hz)
        root.spectral_efficiency_dl = if $has_prb {
          match {
            this.prb_dl_pct < 30 => 2.4  # Low load - high efficiency
            this.prb_dl_pct < 70 => 3.2  # Medium load - optimal
            this.prb_dl_pct < 90 => 2.8  # High load - decreasing
            _ => 1.8                     # Congestion - poor
          }
        } else { deleted() }
        # Predictive analytics: minutes until ~95% DL load, assuming 0.5%/min
        # growth; only meaningful above 70% load.
        root.time_to_congestion_min = if $has_prb && this.prb_dl_pct > 70 {
          ((95 - this.prb_dl_pct) / 0.5).round()
        } else { deleted() }

    # RF quality assessment — all fields omitted when RF data is absent.
    - label: "rf_analysis"
      mapping: |
        root = this
        let has_rf = this.rsrp_dbm != null && this.sinr_db != null
        # RSRP quality classification (3GPP TS 36.133 bands)
        root.rsrp_quality = if $has_rf {
          match {
            this.rsrp_dbm > -80 => "excellent"  # >-80 dBm
            this.rsrp_dbm > -90 => "good"       # -80 to -90 dBm
            this.rsrp_dbm > -100 => "fair"      # -90 to -100 dBm
            this.rsrp_dbm > -110 => "poor"      # -100 to -110 dBm
            _ => "very_poor"                    # <-110 dBm
          }
        } else { deleted() }
        # SINR quality classification
        root.sinr_quality = if $has_rf {
          match {
            this.sinr_db > 20 => "excellent"  # >20 dB
            this.sinr_db > 13 => "good"       # 13-20 dB
            this.sinr_db > 0 => "fair"        # 0-13 dB
            this.sinr_db > -6 => "poor"       # -6-0 dB
            _ => "very_poor"                  # <-6 dB
          }
        } else { deleted() }
        # Composite RF health score (0-100): mean of banded sub-scores.
        # Null guards come first so the comparisons below never see null.
        let rsrp_score = match {
          this.rsrp_dbm == null => 0
          this.rsrp_dbm > -80 => 100
          this.rsrp_dbm > -90 => 80
          this.rsrp_dbm > -100 => 60
          this.rsrp_dbm > -110 => 40
          _ => 20
        }
        let sinr_score = match {
          this.sinr_db == null => 0
          this.sinr_db > 20 => 100
          this.sinr_db > 13 => 80
          this.sinr_db > 0 => 60
          this.sinr_db > -6 => 40
          _ => 20
        }
        root.rf_health_score = if $has_rf { (($rsrp_score + $sinr_score) / 2).round() } else { deleted() }
        # Estimated coverage radius (rough approximation)
        root.estimated_coverage_radius_m = if $has_rf {
          match {
            this.rsrp_dbm > -80 => 500
            this.rsrp_dbm > -90 => 1000
            this.rsrp_dbm > -100 => 2000
            this.rsrp_dbm > -110 => 3000
            _ => 5000
          }
        } else { deleted() }

    # Final data quality and validation
    - label: "data_quality"
      mapping: |
        root = this
        # Per-category completeness flags
        root.data_quality = {
          "ptp_valid": this.ptp4l_offset_ns != null,
          "prb_valid": this.prb_dl_pct != null && this.prb_ul_pct != null,
          "rf_valid": this.rsrp_dbm != null && this.sinr_db != null,
          "system_valid": this.cpu_pct != null,
          "timestamp_valid": this.timestamp_ns != null
        }
        # Overall quality score: percentage of valid categories. Multiplying
        # before dividing keeps the result exact for integer arithmetic.
        let quality_flags = root.data_quality.values()
        root.data_quality_score = $quality_flags.filter(v -> v == true).length() * 100 / $quality_flags.length()
        # Pipeline metadata for provenance/debugging
        root.pipeline_metadata = {
          "pipeline_id": "oran-edge-telemetry-complete",
          "pipeline_version": "1.2.0",
          "processing_node": env("NODE_ID"),
          "processing_timestamp": now().ts_format("2006-01-02T15:04:05Z"),
          "data_quality_score": root.data_quality_score
        }

    # Metrics collection for pipeline monitoring
    - label: "pipeline_metrics"
      metric:
        type: counter
        name: "oran_telemetry_messages_processed"
        labels:
          pipeline_id: "oran-edge-telemetry"
          du_id: "${! this.du_id }"
          region: "${! this.region }"
          collection_method: "${! this.collection_method }"
    - metric:
        # NOTE(review): stock Benthos metric types are counter/counter_by/
        # gauge/timing with no `buckets` field — confirm histogram support in
        # the target runtime.
        type: histogram
        name: "oran_telemetry_processing_duration_ms"
        buckets: [1, 5, 10, 25, 50, 100, 250, 500, 1000]
        # End-to-end latency in ms: processed_at is a timestamp, event time is
        # the normalized nanosecond epoch (the original's
        # this.timestamp.ts_unix_milli() fails when timestamp is a raw string).
        value: "${! this.processed_at.ts_unix_milli() - (this.timestamp_ns / 1000000) }"
    - metric:
        type: gauge
        name: "oran_telemetry_data_quality_score"
        value: "${! this.data_quality_score }"
        labels:
          du_id: "${! this.du_id }"
# Multi-destination output routing
output:
  broker:
    pattern: fan_out  # Send every message to all destinations simultaneously
    outputs:
      # 1. Real-time observability - Grafana via OTEL collector
      - label: "grafana_otel"
        processors:
          - metric:
              type: counter
              name: "oran_output_attempts"
              labels:
                destination: "grafana"
          - mapping: |
              # Transform to OTLP/JSON for the OTEL collector.
              # NOTE(review): dataPoints whose source fields are null (e.g. no
              # PTP sample) are emitted with null values — confirm the collector
              # tolerates this, or filter them upstream.
              root = {
                "resourceMetrics": [{
                  "resource": {
                    "attributes": [
                      {"key": "du_id", "value": {"stringValue": this.du_id}},
                      {"key": "cell_id", "value": {"stringValue": this.cell_id}},
                      {"key": "gnb_id", "value": {"stringValue": this.gnb_id}},
                      {"key": "region", "value": {"stringValue": this.region}},
                      {"key": "ptp_compliance", "value": {"stringValue": this.ptp_compliance}},
                      {"key": "congestion_level", "value": {"stringValue": this.congestion_level}}
                    ]
                  },
                  "scopeMetrics": [{
                    "scope": {"name": "oran.telemetry", "version": "1.2.0"},
                    "metrics": [
                      {
                        "name": "oran_ptp_offset_nanoseconds",
                        "description": "PTP timing offset from master clock",
                        "unit": "ns",
                        "gauge": {
                          "dataPoints": [{
                            "timeUnixNano": this.timestamp_ns.string(),
                            "asInt": this.ptp4l_offset_ns
                          }]
                        }
                      },
                      {
                        "name": "oran_ptp_compliance_score",
                        "description": "PTP compliance score (0-100)",
                        "unit": "1",
                        "gauge": {
                          "dataPoints": [{
                            "timeUnixNano": this.timestamp_ns.string(),
                            "asDouble": this.ptp_score
                          }]
                        }
                      },
                      {
                        "name": "oran_prb_utilization_percent",
                        "description": "PRB utilization percentage",
                        "unit": "%",
                        "gauge": {
                          "dataPoints": [
                            {
                              "timeUnixNano": this.timestamp_ns.string(),
                              "asDouble": this.prb_dl_pct,
                              "attributes": [{"key": "direction", "value": {"stringValue": "downlink"}}]
                            },
                            {
                              "timeUnixNano": this.timestamp_ns.string(),
                              "asDouble": this.prb_ul_pct,
                              "attributes": [{"key": "direction", "value": {"stringValue": "uplink"}}]
                            }
                          ]
                        }
                      },
                      {
                        "name": "oran_cpu_utilization_percent",
                        "description": "DU CPU utilization",
                        "unit": "%",
                        "gauge": {
                          "dataPoints": [{
                            "timeUnixNano": this.timestamp_ns.string(),
                            "asDouble": this.cpu_pct
                          }]
                        }
                      },
                      {
                        "name": "oran_rf_health_score",
                        "description": "RF quality health score",
                        "unit": "1",
                        "gauge": {
                          "dataPoints": [{
                            "timeUnixNano": this.timestamp_ns.string(),
                            "asDouble": this.rf_health_score
                          }]
                        }
                      }
                    ]
                  }]
                }]
              }
        # Benthos expresses try/catch over outputs as an ordered `fallback`
        # list (the original `try:`/`catch:` keys are not valid output fields):
        # push over HTTP first, dead-letter to a local error file on failure.
        fallback:
          - http_client:
              url: "${OTEL_ENDPOINT:http://otel-collector:4318}/v1/metrics"
              verb: POST
              headers:
                Content-Type: "application/json"
                Authorization: "Bearer ${OTEL_TOKEN}"
              timeout: "30s"
              retries: 3
              retry_period: "1s"
              max_retry_backoff: "30s"
          - file:
              path: "/data/errors/grafana-${! now().ts_format(\"2006-01-02\") }.jsonl"
              codec: lines
            processors:
              - mapping: |
                  # Wrap the failed payload with error context for replay.
                  root = {
                    "error_timestamp": now(),
                    "error_destination": "grafana_otel",
                    "error_message": error(),
                    "original_data": this
                  }

      # 2. Long-term storage - date/region/gNB partitioned JSONL files
      - label: "parquet_storage"
        processors:
          - mapping: |
              # Flatten to a stable, columnar-friendly schema. ts_format reads
              # a bare number as unix *seconds*, so convert ns → s first (the
              # original formatted raw nanoseconds, yielding bogus dates).
              let ts_s = this.timestamp_ns / 1000000000
              root = {
                "timestamp_ns": this.timestamp_ns,
                "date": $ts_s.ts_format("2006-01-02"),
                "hour": $ts_s.ts_format("15").number(),
                "du_id": this.du_id,
                "cell_id": this.cell_id,
                "gnb_id": this.gnb_id,
                "region": this.region,
                "sector": this.sector,
                "ptp_offset_ns": this.ptp4l_offset_ns,
                "ptp_compliance": this.ptp_compliance,
                "ptp_score": this.ptp_score,
                "prb_dl_pct": this.prb_dl_pct,
                "prb_ul_pct": this.prb_ul_pct,
                "prb_efficiency": this.prb_efficiency,
                "cpu_pct": this.cpu_pct,
                "memory_pct": this.memory_pct,
                "rsrp_dbm": this.rsrp_dbm,
                "sinr_db": this.sinr_db,
                "rf_health_score": this.rf_health_score,
                "data_quality_score": this.data_quality_score
              }
        file:
          # NOTE(review): confirm the `file` output supports `batching` in the
          # target runtime — stock Benthos `file` output has only path/codec.
          path: "/data/oran-telemetry/${! this.date }/region=${! this.region }/gnb=${! this.gnb_id }/du-metrics-${! uuid_v4().slice(0, 8) }.jsonl"
          codec: lines
          batching:
            count: 10000
            period: "5m"
            byte_size: 10485760  # 10 MiB as a plain integer ("10MB" is not a valid scalar)

      # 3. Analytics platform - Cloudera via Kafka
      - label: "cloudera_kafka"
        processors:
          - mapping: |
              # Cloudera-optimized nested document
              root = {
                "schema": {"type": "oran_telemetry", "version": "1.2.0"},
                "metadata": {
                  "event_time": this.timestamp_iso,
                  "ingestion_time": now().ts_format("2006-01-02T15:04:05Z"),
                  "pipeline_version": "1.2.0",
                  "data_quality_score": this.data_quality_score
                },
                "network": this.network_hierarchy,
                "telemetry": {
                  "timing": {
                    "ptp_offset_ns": this.ptp4l_offset_ns,
                    "compliance": this.ptp_compliance,
                    "score": this.ptp_score
                  },
                  "resources": {
                    "prb_dl_percent": this.prb_dl_pct,
                    "prb_ul_percent": this.prb_ul_pct,
                    "efficiency": this.prb_efficiency,
                    "congestion": this.congestion_level
                  },
                  "system": {
                    "cpu_percent": this.cpu_pct,
                    "memory_percent": this.memory_pct
                  },
                  "radio": {
                    "rsrp_dbm": this.rsrp_dbm,
                    "sinr_db": this.sinr_db,
                    "health_score": this.rf_health_score
                  }
                }
              }
        kafka:
          addresses: ["${KAFKA_BOOTSTRAP_SERVERS}"]
          topic: "oran-telemetry-structured"
          # After the remap above, gnb_id/cell_id only survive inside `network`
          # (the original interpolations on this.gnb_id would resolve to null).
          key: "${! this.network.site_id }-${! this.network.cell_id }"
          # Manual partitioning: stable per-gNB placement across 12 partitions.
          # xxhash64 returns the digest as a numeric string, hence .number();
          # `partitioner: manual` is required for an explicit `partition`.
          partitioner: manual
          partition: "${! this.network.site_id.hash(\"xxhash64\").number() % 12 }"
          client_id: "expanso-oran-pipeline"
          sasl:
            mechanism: "SCRAM-SHA-512"
            user: "${KAFKA_USERNAME}"
            password: "${KAFKA_PASSWORD}"
          tls:
            enabled: true
          # Producer tuning — Benthos kafka fields are flat (no `producer` block).
          max_msg_bytes: 10485760
          compression: "lz4"
          # NOTE(review): maps the original `acks: all`; confirm whether the
          # target Kafka output exposes idempotent-producer settings.
          ack_replicas: true

      # 4. Edge resilience buffer (local disk copy of every message)
      - label: "local_buffer"
        processors:
          - mapping: |
              root = this
              root.buffer_metadata = {
                "buffered_at": now(),
                "buffer_node": env("NODE_ID"),
                "buffer_reason": "edge_resilience"
              }
        file:
          # count("buffer") yields a per-run monotonically increasing sequence.
          path: "/data/buffer/oran-${! count(\"buffer\") }.jsonl"
          codec: lines
          # NOTE(review): as above, confirm `file` output batching support.
          batching:
            count: 5000
            period: "10m"
# Resource limits and deployment configuration
# NOTE(review): `resources`, `env`, and `health_check` are not Benthos config
# fields — presumably consumed by the Expanso control plane; confirm against
# its pipeline schema.
resources:
  limits:
    memory: "2Gi"
    cpu: "1000m"
  requests:
    memory: "1Gi"
    cpu: "500m"

# Environment variables the pipeline expects at runtime
env:
  - NODE_ID
  - REGION
  - MARKET
  - SITE_TYPE
  - DU_ENDPOINT
  - DU_API_KEY
  - OTEL_ENDPOINT
  - OTEL_TOKEN
  - KAFKA_BOOTSTRAP_SERVERS
  - KAFKA_USERNAME
  - KAFKA_PASSWORD

# Health check configuration (served by the embedded HTTP server above)
health_check:
  enabled: true
  interval: "30s"
  timeout: "5s"
  path: "/health"
  port: 8080
Kubernetes Deployment
Complete Kubernetes deployment configuration:
# deployment/oran-pipeline-deployment.yaml
---
apiVersion: v1
kind: Namespace
metadata:
  name: expanso-oran
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: oran-pipeline-config
  namespace: expanso-oran
data:
  pipeline.yaml: |
    # Include the complete pipeline configuration above
---
apiVersion: v1
kind: Secret
metadata:
  name: oran-credentials
  namespace: expanso-oran
type: Opaque
data:
  # Replace the placeholders with real base64-encoded values before applying.
  du-api-key: <base64-encoded-api-key>
  otel-token: <base64-encoded-token>
  kafka-username: <base64-encoded-username>
  kafka-password: <base64-encoded-password>
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: oran-telemetry-pipeline
  namespace: expanso-oran
  labels:
    app: oran-telemetry
    version: v1.2.0
spec:
  replicas: 1  # Single instance per edge node
  selector:
    matchLabels:
      app: oran-telemetry
  template:
    metadata:
      labels:
        app: oran-telemetry
        version: v1.2.0
    spec:
      containers:
        - name: expanso-edge
          image: expanso/edge:v2.1.0
          ports:
            - containerPort: 8080
              name: health-http
            - containerPort: 9090
              name: metrics
          # NOTE(review): the pipeline also references DU_ENDPOINT, OTEL_ENDPOINT
          # and KAFKA_BOOTSTRAP_SERVERS, which are not set here — it will fall
          # back to the in-config defaults where defined. Confirm intended.
          env:
            - name: NODE_ID
              valueFrom:
                fieldRef:
                  fieldPath: spec.nodeName
            - name: REGION
              value: "us-west-2"
            - name: MARKET
              value: "seattle"
            - name: DU_API_KEY
              valueFrom:
                secretKeyRef:
                  name: oran-credentials
                  key: du-api-key
            - name: OTEL_TOKEN
              valueFrom:
                secretKeyRef:
                  name: oran-credentials
                  key: otel-token
            - name: KAFKA_USERNAME
              valueFrom:
                secretKeyRef:
                  name: oran-credentials
                  key: kafka-username
            - name: KAFKA_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: oran-credentials
                  key: kafka-password
          volumeMounts:
            - name: pipeline-config
              mountPath: /etc/expanso
            - name: data-storage
              mountPath: /data
            - name: du-telemetry
              mountPath: /mnt/du-telemetry
          resources:
            limits:
              memory: "2Gi"
              cpu: "1000m"
            requests:
              memory: "1Gi"
              cpu: "500m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 30
            periodSeconds: 30
          readinessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 10
      volumes:
        - name: pipeline-config
          configMap:
            name: oran-pipeline-config
        - name: data-storage
          persistentVolumeClaim:
            claimName: oran-data-pvc
        - name: du-telemetry
          hostPath:
            path: /opt/du/telemetry
            type: Directory
---
apiVersion: v1
kind: Service
metadata:
  name: oran-telemetry-service
  namespace: expanso-oran
  labels:
    app: oran-telemetry
spec:
  selector:
    app: oran-telemetry
  ports:
    - port: 8080
      targetPort: 8080
      name: health
    - port: 9090
      targetPort: 9090
      name: metrics
  type: ClusterIP
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: oran-data-pvc
  namespace: expanso-oran
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 500Gi
  storageClassName: local-storage
Monitoring and Alerting
Service monitor for Prometheus:
# monitoring/service-monitor.yaml
---
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: oran-telemetry-metrics
  namespace: expanso-oran
spec:
  selector:
    matchLabels:
      app: oran-telemetry
  endpoints:
    - port: metrics
      interval: 30s
      path: /metrics
---
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  name: oran-pipeline-alerts
  namespace: expanso-oran
spec:
  groups:
    - name: oran_pipeline_health
      rules:
        - alert: PipelineDown
          # NOTE(review): confirm the scraped `job` label value — with the
          # ServiceMonitor above it is typically the Service name.
          expr: up{job="oran-telemetry-service"} == 0
          for: 1m
          labels:
            severity: critical
          annotations:
            summary: "O-RAN telemetry pipeline is down"
        - alert: LowDataQuality
          expr: oran_telemetry_data_quality_score < 80
          for: 5m
          labels:
            severity: warning
          annotations:
            summary: "O-RAN data quality degraded"
This complete configuration provides a production-ready O-RAN telemetry pipeline that can be deployed on OpenShift SNO nodes alongside RAN workloads.