Automatic aggregation for high-frequency sensor data. Write raw factory equipment readings at 1-second resolution and let Ferrosa consolidate them into 5-minute summaries.
Note: Complete the 3-Node Cluster Setup tutorial first, or have a running Ferrosa cluster accessible on localhost:9042.
The Problem
A factory monitors vibration and temperature on rotating equipment. Raw readings are useful for diagnosis, but dashboards and alerting usually need the operating envelope and trend for each time window:
- min shows the quietest point in the window.
- max catches excursions and short spikes.
- avg shows the baseline trend.
- stddev shows volatility, which is often the earliest sign of bearing wear or imbalance.
Ferrosa automates those rollups with built-in streaming consolidation.
You define the aggregation rules in the table’s extensions map at CREATE TABLE time, and the storage engine handles the rest.
The Runnable Rollup
| Tier | Table | Resolution | Aggregation |
|---|---|---|---|
| 0 | sensor_readings_1s | 1 second (raw) | None (source) |
| 1 | sensor_readings_5m | 5 minutes | min, max, avg, stddev |
When 5 minutes of raw data accumulates in sensor_readings_1s, the aggregator computes summary statistics and writes them to sensor_readings_5m.
Multi-tier cascade auto-creation is not enabled in this runnable contract.
Correct avg and stddev cascades require carrying rollup state such as count, sum, and sum-of-squares between tiers; creating downstream tables without that state would produce misleading results.
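To see why that state matters, here is a minimal sketch in plain Rust. It is not Ferrosa code: `RollupState` and its methods are hypothetical names chosen for illustration. It shows that count, sum, and sum-of-squares merge by simple addition, while averaging two per-window averages gives a different answer whenever the windows hold different numbers of samples.

```rust
/// Mergeable rollup state for one column in one window.
/// Hypothetical type for illustration; not a Ferrosa API.
#[derive(Clone, Copy, Debug, Default, PartialEq)]
struct RollupState {
    count: f64,
    sum: f64,
    sum_sq: f64,
}

impl RollupState {
    /// Accumulates count, sum, and sum-of-squares over raw values.
    fn from_values(values: &[f64]) -> Self {
        let mut state = Self::default();
        for &v in values {
            state.count += 1.0;
            state.sum += v;
            state.sum_sq += v * v;
        }
        state
    }

    /// Combining two windows is a field-wise sum.
    fn merge(self, other: Self) -> Self {
        Self {
            count: self.count + other.count,
            sum: self.sum + other.sum,
            sum_sq: self.sum_sq + other.sum_sq,
        }
    }

    fn avg(&self) -> f64 {
        if self.count == 0.0 { 0.0 } else { self.sum / self.count }
    }

    /// Population standard deviation: sqrt(E[x^2] - E[x]^2).
    fn stddev(&self) -> f64 {
        if self.count == 0.0 {
            return 0.0;
        }
        let mean = self.avg();
        (self.sum_sq / self.count - mean * mean).max(0.0).sqrt()
    }
}

fn main() {
    // Two windows with different sample counts.
    let first = RollupState::from_values(&[4.0, 4.0, 4.0, 4.0]);
    let second = RollupState::from_values(&[8.0]);
    let merged = first.merge(second);

    // Averaging the two window averages ignores the sample counts.
    let avg_of_avgs = (first.avg() + second.avg()) / 2.0;

    println!("merged avg    = {}", merged.avg());
    println!("avg of avgs   = {}", avg_of_avgs);
    println!("merged stddev = {}", merged.stddev());
}
```

With these inputs the merged average is 4.8, while the average of averages is 6.0. The sum-of-squares form is used here because it merges trivially; it is less numerically stable than a Welford-style update when values are large relative to their spread.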
Create the Schema
-- RRD Time-Series Aggregation: Sensor Schema
--
-- Creates a runnable single-step rollup for factory sensor readings:
-- 1s raw readings -> 5m rollups.
-- Min/max/avg/stddev use the built-in streaming consolidation path.
CREATE KEYSPACE IF NOT EXISTS plant
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
USE plant;
-- Tier 1: materialized 5-minute aggregates.
CREATE TABLE sensor_readings_5m (
sensor_id uuid,
ts timestamp,
vibration_mm_s_min double,
vibration_mm_s_max double,
vibration_mm_s_avg double,
vibration_mm_s_stddev double,
temperature_c_min double,
temperature_c_max double,
temperature_c_avg double,
temperature_c_stddev double,
PRIMARY KEY (sensor_id, ts)
) WITH CLUSTERING ORDER BY (ts DESC);
-- Tier 0: raw 1-second vibration and temperature readings.
CREATE TABLE sensor_readings_1s (
sensor_id uuid,
ts timestamp,
vibration_mm_s double,
temperature_c double,
PRIMARY KEY (sensor_id, ts)
) WITH CLUSTERING ORDER BY (ts DESC)
AND extensions = {
'consolidation.interval': '5m',
'consolidation.functions': 'min,max,avg,stddev',
'consolidation.target': 'sensor_readings_5m',
'consolidation.late_window': '15m',
'consolidation.columns': 'vibration_mm_s,temperature_c'
};
-- Multi-tier cascade auto-creation is intentionally not used in this runnable
-- contract. It is tracked separately because correct avg/stddev cascades need
-- additional rollup state such as count, sum, and sum-of-squares.
The source table stores vibration_mm_s and temperature_c.
Both columns are rolled up with built-in streaming min/max/avg/stddev:
'consolidation.functions': 'min,max,avg,stddev',
'consolidation.columns': 'vibration_mm_s,temperature_c'
The 5-minute rollup exposes one row per sensor per window with columns such as:
vibration_mm_s_min
vibration_mm_s_max
vibration_mm_s_avg
vibration_mm_s_stddev
temperature_c_min
temperature_c_max
temperature_c_avg
temperature_c_stddev
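The names above follow a `<source column>_<function>` pattern. The sketch below reproduces that list from the two extension values. The pattern is inferred from the column names in this example rather than from a published naming rule, and `rollup_columns` is a hypothetical helper, so treat it as a way to check a target schema by hand.

```rust
/// Builds target column names as `<source column>_<function>`.
/// The pattern is inferred from this example's schema, not from a spec.
fn rollup_columns(columns: &str, functions: &str) -> Vec<String> {
    let mut names = Vec::new();
    for column in columns.split(',').map(str::trim).filter(|c| !c.is_empty()) {
        for function in functions.split(',').map(str::trim).filter(|f| !f.is_empty()) {
            names.push(format!("{column}_{function}"));
        }
    }
    names
}

fn main() {
    // Values copied from 'consolidation.columns' and 'consolidation.functions'.
    let names = rollup_columns("vibration_mm_s,temperature_c", "min,max,avg,stddev");
    for name in &names {
        println!("{name}");
    }
}
```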
Insert Sample Data
-- RRD Time-Series Aggregation: Sensor Data
--
-- Insert factory equipment readings. Consolidation triggers automatically
-- when data timestamps cross 5-minute boundaries.
USE plant;
-- Pump A: rising vibration during the first 5-minute window, then recovery.
INSERT INTO sensor_readings_1s (sensor_id, ts, vibration_mm_s, temperature_c)
VALUES (2f4d6a9e-6a9a-4f75-a6c4-cd8d210e7e34, '2026-05-20 12:00:01', 4.2, 68.1);
INSERT INTO sensor_readings_1s (sensor_id, ts, vibration_mm_s, temperature_c)
VALUES (2f4d6a9e-6a9a-4f75-a6c4-cd8d210e7e34, '2026-05-20 12:00:30', 4.4, 68.3);
INSERT INTO sensor_readings_1s (sensor_id, ts, vibration_mm_s, temperature_c)
VALUES (2f4d6a9e-6a9a-4f75-a6c4-cd8d210e7e34, '2026-05-20 12:01:00', 4.1, 68.0);
INSERT INTO sensor_readings_1s (sensor_id, ts, vibration_mm_s, temperature_c)
VALUES (2f4d6a9e-6a9a-4f75-a6c4-cd8d210e7e34, '2026-05-20 12:01:30', 4.7, 68.6);
INSERT INTO sensor_readings_1s (sensor_id, ts, vibration_mm_s, temperature_c)
VALUES (2f4d6a9e-6a9a-4f75-a6c4-cd8d210e7e34, '2026-05-20 12:02:00', 5.0, 69.0);
INSERT INTO sensor_readings_1s (sensor_id, ts, vibration_mm_s, temperature_c)
VALUES (2f4d6a9e-6a9a-4f75-a6c4-cd8d210e7e34, '2026-05-20 12:02:30', 5.4, 69.5);
INSERT INTO sensor_readings_1s (sensor_id, ts, vibration_mm_s, temperature_c)
VALUES (2f4d6a9e-6a9a-4f75-a6c4-cd8d210e7e34, '2026-05-20 12:03:00', 5.9, 70.0);
INSERT INTO sensor_readings_1s (sensor_id, ts, vibration_mm_s, temperature_c)
VALUES (2f4d6a9e-6a9a-4f75-a6c4-cd8d210e7e34, '2026-05-20 12:03:30', 6.2, 70.4);
INSERT INTO sensor_readings_1s (sensor_id, ts, vibration_mm_s, temperature_c)
VALUES (2f4d6a9e-6a9a-4f75-a6c4-cd8d210e7e34, '2026-05-20 12:04:00', 5.8, 70.1);
INSERT INTO sensor_readings_1s (sensor_id, ts, vibration_mm_s, temperature_c)
VALUES (2f4d6a9e-6a9a-4f75-a6c4-cd8d210e7e34, '2026-05-20 12:04:30', 5.1, 69.4);
-- Expected first 5-minute rollup for Pump A:
-- vibration_mm_s_min = 4.1
-- vibration_mm_s_max = 6.2
-- vibration_mm_s_avg = 5.08
-- vibration_mm_s_stddev = computed by the built-in streaming stddev
-- temperature_c_min = 68.0
-- temperature_c_max = 70.4
-- temperature_c_avg = 69.14
-- temperature_c_stddev = computed by the built-in streaming stddev
-- Second 5-minute window: the same pump settles after inspection.
INSERT INTO sensor_readings_1s (sensor_id, ts, vibration_mm_s, temperature_c)
VALUES (2f4d6a9e-6a9a-4f75-a6c4-cd8d210e7e34, '2026-05-20 12:05:00', 4.8, 69.0);
INSERT INTO sensor_readings_1s (sensor_id, ts, vibration_mm_s, temperature_c)
VALUES (2f4d6a9e-6a9a-4f75-a6c4-cd8d210e7e34, '2026-05-20 12:06:00', 4.6, 68.7);
INSERT INTO sensor_readings_1s (sensor_id, ts, vibration_mm_s, temperature_c)
VALUES (2f4d6a9e-6a9a-4f75-a6c4-cd8d210e7e34, '2026-05-20 12:07:00', 4.5, 68.6);
INSERT INTO sensor_readings_1s (sensor_id, ts, vibration_mm_s, temperature_c)
VALUES (2f4d6a9e-6a9a-4f75-a6c4-cd8d210e7e34, '2026-05-20 12:08:00', 4.3, 68.4);
INSERT INTO sensor_readings_1s (sensor_id, ts, vibration_mm_s, temperature_c)
VALUES (2f4d6a9e-6a9a-4f75-a6c4-cd8d210e7e34, '2026-05-20 12:09:00', 4.2, 68.2);
-- Pump B: baseline readings for cross-sensor comparisons.
INSERT INTO sensor_readings_1s (sensor_id, ts, vibration_mm_s, temperature_c)
VALUES (7c07fa8d-f630-4a32-a0d8-df48235ed6f2, '2026-05-20 12:00:01', 2.1, 64.2);
INSERT INTO sensor_readings_1s (sensor_id, ts, vibration_mm_s, temperature_c)
VALUES (7c07fa8d-f630-4a32-a0d8-df48235ed6f2, '2026-05-20 12:01:00', 2.3, 64.4);
INSERT INTO sensor_readings_1s (sensor_id, ts, vibration_mm_s, temperature_c)
VALUES (7c07fa8d-f630-4a32-a0d8-df48235ed6f2, '2026-05-20 12:02:00', 2.2, 64.3);
INSERT INTO sensor_readings_1s (sensor_id, ts, vibration_mm_s, temperature_c)
VALUES (7c07fa8d-f630-4a32-a0d8-df48235ed6f2, '2026-05-20 12:05:00', 2.4, 64.5);
As data timestamps cross 5-minute boundaries, the consolidation observer enqueues rollup work and the background materialization worker writes target rows. No cron jobs, no external ETL.
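To sanity-check the target rows, the rollup can be modeled offline. The following is a sketch in plain Rust, not Ferrosa's implementation. It assumes windows are aligned to multiples of the interval and that stddev is the population form, and it encodes Pump A's vibration timestamps as seconds after 12:00:00. `window_start`, `summarize`, and `rollup` are illustrative names.

```rust
use std::collections::BTreeMap;

const WINDOW_SECS: u64 = 300; // 'consolidation.interval': '5m'

#[derive(Debug)]
struct Summary {
    min: f64,
    max: f64,
    avg: f64,
    stddev: f64,
}

/// Start of the window containing `ts_secs`, assuming interval-aligned windows.
fn window_start(ts_secs: u64) -> u64 {
    ts_secs - ts_secs % WINDOW_SECS
}

/// Two-pass min/max/avg/population-stddev over a non-empty window.
fn summarize(values: &[f64]) -> Summary {
    let n = values.len() as f64;
    let avg = values.iter().sum::<f64>() / n;
    let variance = values.iter().map(|v| (v - avg).powi(2)).sum::<f64>() / n;
    Summary {
        min: values.iter().copied().fold(f64::INFINITY, f64::min),
        max: values.iter().copied().fold(f64::NEG_INFINITY, f64::max),
        avg,
        stddev: variance.sqrt(),
    }
}

/// Groups (timestamp, value) readings by window and summarizes each group.
fn rollup(readings: &[(u64, f64)]) -> BTreeMap<u64, Summary> {
    let mut buckets: BTreeMap<u64, Vec<f64>> = BTreeMap::new();
    for &(ts, value) in readings {
        buckets.entry(window_start(ts)).or_default().push(value);
    }
    buckets
        .into_iter()
        .map(|(start, values)| (start, summarize(&values)))
        .collect()
}

/// Pump A vibration readings from the sample data, as seconds after 12:00:00.
fn pump_a_vibration() -> Vec<(u64, f64)> {
    vec![
        (1, 4.2), (30, 4.4), (60, 4.1), (90, 4.7), (120, 5.0),
        (150, 5.4), (180, 5.9), (210, 6.2), (240, 5.8), (270, 5.1),
        (300, 4.8), (360, 4.6), (420, 4.5), (480, 4.3), (540, 4.2),
    ]
}

fn main() {
    for (start, s) in rollup(&pump_a_vibration()) {
        println!(
            "window +{start}s: min={} max={} avg={:.2} stddev={:.4}",
            s.min, s.max, s.avg, s.stddev
        );
    }
}
```

For the first window this model gives min 4.1, max 6.2, and avg 5.08, matching the expected rollup in the data file, with a population stddev of about 0.70. The 12:05:00 reading lands in the second window because the window start is inclusive under the alignment assumed here.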
Query Rollups
Raw readings
-- Raw 1-second data for one pump.
SELECT sensor_id, ts, vibration_mm_s, temperature_c
FROM sensor_readings_1s
WHERE sensor_id = 2f4d6a9e-6a9a-4f75-a6c4-cd8d210e7e34
LIMIT 10;
5-minute sensor envelope
-- 5-minute aggregates show excursions, trend, and volatility.
SELECT sensor_id, ts,
vibration_mm_s_min,
vibration_mm_s_max,
vibration_mm_s_avg,
vibration_mm_s_stddev,
temperature_c_min,
temperature_c_max,
temperature_c_avg,
temperature_c_stddev
FROM sensor_readings_5m
WHERE sensor_id = 2f4d6a9e-6a9a-4f75-a6c4-cd8d210e7e34
LIMIT 10;
Shift-range volatility
-- Time-range query across a shift at 5-minute resolution.
SELECT sensor_id, ts, vibration_mm_s_min, vibration_mm_s_max,
vibration_mm_s_avg, vibration_mm_s_stddev
FROM sensor_readings_5m
WHERE sensor_id = 2f4d6a9e-6a9a-4f75-a6c4-cd8d210e7e34
AND ts >= '2026-05-20 12:00:00'
AND ts < '2026-05-20 20:00:00';
WASM Aggregate Status
The runnable sensor demo uses the built-in streaming stddev.
Ferrosa supports three WASM loading forms for function metadata, and the WASM executor now has a streaming aggregate ABI with init, update(value), and finalize.
WASM aggregate execution for live RRD materialization uses that ABI for each row in the rollup window.
Author the aggregate in Rust
A streaming aggregate keeps bounded state, consumes one value per update, and returns one f64 from finalize.
It declares the ferrosa:streaming-aggregate:v1 marker so the executor recognizes the aggregate ABI rather than treating it as a scalar UDF.
Here is a complete Welford standard-deviation aggregate — no_std, allocation-free, and under 60 lines.
The full crates (this stddev and an rms variant for vibration energy) live at examples/timeseries-rrd/wasm-rust.
#![no_std]
use core::panic::PanicInfo;
#[used]
#[link_section = "ferrosa:streaming-aggregate:v1"]
static FERROSA_STREAMING_AGGREGATE_ABI: [u8; 57] =
*b"ferrosa streaming aggregate abi: init/update/finalize f64";
static mut COUNT: f64 = 0.0;
static mut MEAN: f64 = 0.0;
static mut M2: f64 = 0.0;
#[no_mangle]
pub extern "C" fn init() {
unsafe {
COUNT = 0.0;
MEAN = 0.0;
M2 = 0.0;
}
}
#[no_mangle]
pub extern "C" fn update(value: f64) {
unsafe {
COUNT += 1.0;
let delta = value - MEAN;
MEAN += delta / COUNT;
let delta2 = value - MEAN;
M2 += delta * delta2;
}
}
#[no_mangle]
pub extern "C" fn finalize() -> f64 {
unsafe {
if COUNT <= 0.0 {
0.0
} else {
sqrt(M2 / COUNT)
}
}
}
fn sqrt(value: f64) -> f64 {
if value <= 0.0 {
return 0.0;
}
let mut estimate = if value >= 1.0 { value } else { 1.0 };
for _ in 0..16 {
estimate = 0.5 * (estimate + value / estimate);
}
estimate
}
#[panic_handler]
fn panic(_info: &PanicInfo) -> ! {
loop {}
}
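Before compiling to WASM, the same recurrence can be exercised natively. The sketch below is a test harness under stated assumptions, not part of the example crates: it ports the update step and the fixed-iteration Newton square root into an ordinary struct (`Welford`, `streaming_stddev`, and `two_pass_stddev` are illustrative names) and compares the result with a two-pass calculation that uses the standard library's `sqrt`.

```rust
/// Fixed-iteration Newton square root, ported from the aggregate above.
fn newton_sqrt(value: f64) -> f64 {
    if value <= 0.0 {
        return 0.0;
    }
    let mut estimate = if value >= 1.0 { value } else { 1.0 };
    for _ in 0..16 {
        estimate = 0.5 * (estimate + value / estimate);
    }
    estimate
}

/// Welford state held in a struct instead of `static mut`, so it runs natively.
#[derive(Default)]
struct Welford {
    count: f64,
    mean: f64,
    m2: f64,
}

impl Welford {
    fn update(&mut self, value: f64) {
        self.count += 1.0;
        let delta = value - self.mean;
        self.mean += delta / self.count;
        self.m2 += delta * (value - self.mean);
    }

    fn finalize(&self) -> f64 {
        if self.count <= 0.0 {
            0.0
        } else {
            newton_sqrt(self.m2 / self.count)
        }
    }
}

/// Runs one window through the streaming state, one value per update.
fn streaming_stddev(window: &[f64]) -> f64 {
    let mut state = Welford::default(); // plays the role of init()
    for &v in window {
        state.update(v);
    }
    state.finalize()
}

/// Reference population stddev computed in two passes with std's sqrt.
fn two_pass_stddev(values: &[f64]) -> f64 {
    if values.is_empty() {
        return 0.0;
    }
    let n = values.len() as f64;
    let mean = values.iter().sum::<f64>() / n;
    (values.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / n).sqrt()
}

fn main() {
    // Pump A vibration values from the first 5-minute window.
    let window = [4.2, 4.4, 4.1, 4.7, 5.0, 5.4, 5.9, 6.2, 5.8, 5.1];
    println!("streaming = {:.6}", streaming_stddev(&window));
    println!("two-pass  = {:.6}", two_pass_stddev(&window));
    println!("newton_sqrt(1e12) = {:e}", newton_sqrt(1e12));
    println!("f64::sqrt(1e12)   = {:e}", 1e12f64.sqrt());
}
```

For sensor-scale variances the two results agree closely. Because the Newton loop starts from the input value and runs a fixed 16 steps, very large variances such as 1e12 do not converge in that budget, so a port intended for wide-ranging inputs would likely want more iterations or a better starting estimate.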
Build it for wasm32-unknown-unknown and package the core module as a Component Model artifact:
rustup target add wasm32-unknown-unknown
cargo build --release --target wasm32-unknown-unknown \
--manifest-path wasm-rust/stddev/Cargo.toml
wasm-tools component new \
wasm-rust/stddev/target/wasm32-unknown-unknown/release/ferrosa_rrd_stddev.wasm \
-o stddev.wasm
Load the compiled artifact
The AS FILE and AS URL … WITH SHA256 forms are admin-only artifact loading paths.
-- The runnable sensor demo uses the built-in streaming stddev function so it
-- runs without an external WASM artifact.
-- Rust-authored examples live under wasm-rust/stddev and wasm-rust/rms.
USE plant;
-- Inline hex loading form. Replace these tiny placeholder bytes with a
-- compiled Component Model module that carries the
-- ferrosa:streaming-aggregate:v1 marker and exports init/update/finalize.
CREATE OR REPLACE FUNCTION plant.stddev(value double)
CALLED ON NULL INPUT
RETURNS double
LANGUAGE wasm
AS '0x0061736d0d000100';
-- Admin-only file loading form for pre-approved local artifacts.
CREATE OR REPLACE FUNCTION plant.stddev(value double)
CALLED ON NULL INPUT
RETURNS double
LANGUAGE wasm
AS FILE '/secure/udf/stddev.wasm';
-- Admin-only URL loading form. Ferrosa verifies the fetched bytes against the
-- required SHA-256 digest before storing the function metadata.
CREATE OR REPLACE FUNCTION plant.stddev(value double)
CALLED ON NULL INPUT
RETURNS double
LANGUAGE wasm
AS URL 'https://artifacts.example/ferrosa/stddev.wasm'
WITH SHA256 = '3dbe4f6fb3f44dfb11ad93f6c07f81a4b34d217f8dcf7dd06d80670d88e6b7c1';
The accepted future table syntax references the function with wasm:keyspace.function_name:
-- RRD table syntax references the UDF through the wasm:keyspace.function_name
-- consolidation function syntax. The RRD worker streams window rows through
-- init/update/finalize.
CREATE TABLE vibration_with_wasm_stddev (
sensor_id uuid,
ts timestamp,
vibration_mm_s double,
PRIMARY KEY (sensor_id, ts)
Use wasm: functions only for components that declare the ferrosa:streaming-aggregate:v1 marker and export init, update, and finalize.
Scalar WASM UDFs are rejected for aggregate execution rather than being called with a materialized list.
The URL form requires a SHA-256 digest so artifact bytes cannot change between review and DDL apply.
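When preparing an inline hex literal, it can help to confirm what the bytes actually are before running the DDL. The sketch below decodes a `0x` literal and inspects the 8-byte WebAssembly preamble. It is a local pre-flight check, not something Ferrosa is documented to do. It assumes the component preamble emitted by current tooling (version 0x0d, layer 1), which may change as the Component Model stabilizes. `decode_hex`, `classify`, and `WasmKind` are hypothetical names.

```rust
#[derive(Debug, PartialEq)]
enum WasmKind {
    CoreModule,
    Component,
    Unknown,
}

/// Decodes a `0x`-prefixed hex literal into bytes; returns None if malformed.
fn decode_hex(literal: &str) -> Option<Vec<u8>> {
    let digits = literal.strip_prefix("0x")?;
    if digits.len() % 2 != 0 || !digits.bytes().all(|b| b.is_ascii_hexdigit()) {
        return None;
    }
    (0..digits.len())
        .step_by(2)
        .map(|i| u8::from_str_radix(&digits[i..i + 2], 16).ok())
        .collect()
}

/// Classifies a binary by its preamble: `\0asm` magic plus version/layer bytes.
fn classify(bytes: &[u8]) -> WasmKind {
    match bytes {
        // Core module: version 1.
        [0x00, 0x61, 0x73, 0x6d, 0x01, 0x00, 0x00, 0x00, ..] => WasmKind::CoreModule,
        // Component: version 0x0d, layer 1 (assumed from current tooling).
        [0x00, 0x61, 0x73, 0x6d, 0x0d, 0x00, 0x01, 0x00, ..] => WasmKind::Component,
        _ => WasmKind::Unknown,
    }
}

fn main() {
    // The placeholder literal from the inline hex loading form.
    let placeholder = "0x0061736d0d000100";
    match decode_hex(placeholder) {
        Some(bytes) => println!("{} bytes, kind = {:?}", bytes.len(), classify(&bytes)),
        None => println!("not a valid hex literal"),
    }
}
```

Under that assumption the placeholder is only a component preamble with no sections, which is why it has to be replaced with a real compiled artifact. The check says nothing about the ferrosa:streaming-aggregate:v1 marker or the exported functions.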
Built-in Functions
Ferrosa provides built-in consolidation functions for common rollups.
See built-in-functions.cql for a complete example of each.
| Function | Description |
|---|---|
| min | Lowest value in the window |
| max | Highest value in the window |
| avg | Arithmetic mean |
| stddev | Population standard deviation using the built-in implementation |
|  | Number of data points |
|  | Total of all values |
|  | Middle value (requires sort, O(n log n)) |
WASM aggregate execution is documented separately because it is not part of the runnable materialization path yet.
Multi-Column Composite
When a table has multiple numeric columns, all configured functions are applied to each column independently.
The sensor schema applies the same min/max/avg/stddev set to vibration and temperature.
See composite.cql for an additional weather-station style example.
Late-Arriving Data
Real-world data streams are never perfectly ordered. Ferrosa handles late-arriving data with automatic re-aggregation:
INSERT INTO sensor_readings_1s (sensor_id, ts, vibration_mm_s, temperature_c)
VALUES (2f4d6a9e-6a9a-4f75-a6c4-cd8d210e7e34, '2026-05-20 12:01:30', 4.9, 68.8);
When a data point falls into an already-consolidated window:
- The aggregator detects that the timestamp is before the current boundary.
- A LateData task is sent to the async worker.
- The worker streams the affected source window from storage.
- The corrected aggregate is upserted into the downstream table.
- The corrected aggregate is visible in the target table.
Late data is only accepted within the late_window.
The disk-backed keyed cursor streams source rows by partition and window for recomputation when a window has fallen out of the in-memory ring.
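The decision in the steps above can be sketched as a small classifier. This is a model under stated assumptions, not Ferrosa's logic: it assumes the current boundary is the start of the window containing the newest timestamp seen so far, and that lateness is measured from that newest timestamp. `Disposition` and `classify` are hypothetical names, and timestamps are seconds after 12:00:00.

```rust
const INTERVAL_SECS: u64 = 300; // 'consolidation.interval': '5m'
const LATE_WINDOW_SECS: u64 = 900; // 'consolidation.late_window': '15m'

#[derive(Debug, PartialEq)]
enum Disposition {
    /// Falls in the window that is still open.
    OnTime,
    /// Falls in a consolidated window that must be recomputed.
    LateRecompute { window_start: u64 },
    /// Older than the late window allows.
    Rejected,
}

/// Classifies a write by comparing its timestamp with the newest one seen.
fn classify(ts: u64, newest_seen: u64) -> Disposition {
    let current_boundary = newest_seen - newest_seen % INTERVAL_SECS;
    if ts >= current_boundary {
        Disposition::OnTime
    } else if newest_seen - ts <= LATE_WINDOW_SECS {
        Disposition::LateRecompute {
            window_start: ts - ts % INTERVAL_SECS,
        }
    } else {
        Disposition::Rejected
    }
}

fn main() {
    // The late 12:01:30 reading arrives after data up to 12:09:00 was written.
    println!("{:?}", classify(90, 540));
    // A reading for the open window.
    println!("{:?}", classify(310, 540));
    // A reading 20 minutes behind the newest timestamp.
    println!("{:?}", classify(0, 1200));
}
```

Under these assumptions the 12:01:30 correction maps back to the window starting at 12:00:00, which is the window the worker would re-stream from the source table.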
Observability
Monitor consolidation health, delayed materialization, and runtime ring-memory controls:
SELECT * FROM system_observability.consolidation_status;
SELECT keyspace_name, table_name, queue_depth, oldest_task_age_ms, alerting
FROM system_observability.materialization_queues
WHERE alerting = true;
SELECT setting_name, setting_value, source
FROM system_observability.rrd_runtime_settings;
-- Admin-only runtime tuning. The default ring memory budget is derived from
-- detected memory and can be overridden at process start with
-- FERROSA_RRD_RING_MEMORY_BUDGET_BYTES.
UPDATE system_observability.rrd_runtime_settings
SET setting_value = 134217728
WHERE setting_name = 'ring_memory_budget_bytes';
UPDATE system_observability.rrd_runtime_settings
SET setting_value = 50
WHERE setting_name = 'ring_thrash_warn_evictions';
consolidation_status shows per-table rollup counters.
materialization_queues highlights tables where async rollup work is falling behind.
rrd_runtime_settings lets an administrator inspect and tune ring memory pressure without restarting the node.
The default ring memory budget is derived from the host or container memory limit and can be overridden at startup with FERROSA_RRD_RING_MEMORY_BUDGET_BYTES.
Files in This Example
| File | Purpose |
|---|---|
|  | Keyspace, target rollup table, and raw sensor table with built-in streaming consolidation extensions |
|  | Sample vibration and temperature data for two pumps across two 5-minute windows |
|  | Queries against raw data, rollup tiers, and operational observability |
|  | WASM stddev loading forms and streaming aggregate registration |
|  | Rust-authored Welford stddev aggregate with bounded streaming state |
|  | Rust-authored RMS aggregate for vibration energy rollups |
| built-in-functions.cql | Demonstrates each built-in function individually |
| composite.cql | Multi-column composite aggregation |
|  | Median-specific example with latency monitoring |
|  | Late-arriving data and re-aggregation |
|  | Static contract test for the sensor example and WASM loading snippets |