Dear OpenEMS Community,
I am currently implementing TimescaleDB as a Backend-Timedata alternative to InfluxDB.
My development focus is mainly driven by the requirements I have for FEMS (FENECON Energy Management System), but I would love to hear your thoughts about additional requirements or your experiences with TimescaleDB or PostgreSQL. Also, if there is already any development happening out there, I would love to join forces.
The trouble I found with InfluxDB:
- Clustering/High Availability is not available as open source.
- Migration from version 1 to version 2 is not straightforward.
- Aggregation/Retention does not work well with backfilled data (which is a frequent use-case in IoT applications).
According to my research, TimescaleDB looks like a very promising alternative. It is based on PostgreSQL, a technology that is already used in OpenEMS Backend for the Odoo-Metadata implementation.
Currently planned database schema:
Simple tables for `Edge`, `Component` and `Channel` allow addressing a Channel of an OpenEMS Component for a specific OpenEMS Edge
CREATE TABLE "edge" (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
UNIQUE(name)
);
CREATE TABLE "component" (
id SERIAL PRIMARY KEY,
edge_id INTEGER NOT NULL REFERENCES edge,
name TEXT NOT NULL,
UNIQUE(edge_id, name)
);
CREATE TABLE "channel" (
id SERIAL PRIMARY KEY,
component_id INTEGER NOT NULL REFERENCES component,
name TEXT NOT NULL,
UNIQUE(component_id, name)
);
Example
# SELECT * FROM edge;
id | name
----+---------
1 | edge0
2 | edge1
# SELECT * FROM component;
id | edge_id | name
----+---------+------------------------------
1 | 1 | _sum
2 | 1 | ess0
3 | 2 | _sum
# SELECT * FROM channel;
id | component_id | name
-----+--------------+--------------------------------------------
1 | 1 | EssSoc
2 | 1 | Soc
PL/pgSQL functions to generate/query entries for `Edge`, `Component` and `Channel`
CREATE OR REPLACE FUNCTION openems_get_or_create_channel_id(_edge text, _component text, _channel text, OUT _channel_id int)
LANGUAGE plpgsql AS
$func$
BEGIN
LOOP
SELECT channel.id
FROM edge
LEFT JOIN component
ON edge.id = component.edge_id
LEFT JOIN channel
ON component.id = channel.component_id
WHERE edge.name = _edge AND component.name = _component AND channel.name = _channel
INTO _channel_id;
EXIT WHEN FOUND;
INSERT INTO channel (component_id, name)
VALUES ((SELECT _component_id FROM openems_get_or_create_component_id(_edge, _component)), _channel)
ON CONFLICT DO NOTHING
RETURNING id
INTO _channel_id;
EXIT WHEN FOUND;
END LOOP;
END
$func$;
CREATE OR REPLACE FUNCTION openems_get_or_create_component_id(_edge text, _component text, OUT _component_id int)
LANGUAGE plpgsql AS
$func$
BEGIN
LOOP
SELECT component.id
FROM edge
LEFT JOIN component
ON edge.id = component.edge_id
WHERE edge.name = _edge AND component.name = _component
INTO _component_id;
EXIT WHEN FOUND;
INSERT INTO component (edge_id, name)
VALUES ((SELECT _edge_id FROM openems_get_or_create_edge_id(_edge)), _component)
ON CONFLICT DO NOTHING
RETURNING id
INTO _component_id;
EXIT WHEN FOUND;
END LOOP;
END
$func$;
CREATE OR REPLACE FUNCTION openems_get_or_create_edge_id(_edge text, OUT _edge_id int)
LANGUAGE plpgsql AS
$func$
BEGIN
LOOP
SELECT id
FROM edge
WHERE edge.name = _edge
INTO _edge_id;
EXIT WHEN FOUND;
INSERT INTO edge (name)
VALUES (_edge)
ON CONFLICT DO NOTHING
RETURNING id
INTO _edge_id;
EXIT WHEN FOUND;
END LOOP;
END
$func$;
A `data_integer_raw` table to hold the actual channel values per Time and Channel. Retention is planned to be set to 30 days.
CREATE TABLE IF NOT EXISTS "data_integer_raw" (time TIMESTAMPTZ NOT NULL, channel_id INTEGER NOT NULL, value NUMERIC NULL);
SELECT create_hypertable('data_integer_raw', 'time');
CREATE INDEX ix_channel_time ON data_integer_raw (time, channel_id DESC);
-- TODO Code for Retention
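One possible way to fill in the retention TODO is TimescaleDB's `add_retention_policy` function (available in TimescaleDB 2.x), called once from the Java application via JDBC. The following is only a sketch under assumptions: the class `RawDataRetention` and its method names are hypothetical and not part of OpenEMS, and the 30 days are taken from the plan stated above.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

/** Hypothetical helper, not part of OpenEMS: registers a retention policy for the raw table. */
final class RawDataRetention {

    /** Planned retention for data_integer_raw, as stated in the text. */
    static final int RETENTION_DAYS = 30;

    /**
     * Builds the statement that registers the policy. The table name is validated
     * because it is concatenated into the SQL text.
     */
    static String retentionSql(String hypertable, int days) {
        if (!hypertable.matches("[a-z_][a-z0-9_]*")) {
            throw new IllegalArgumentException("Unexpected table name: " + hypertable);
        }
        if (days <= 0) {
            throw new IllegalArgumentException("Retention must be positive: " + days);
        }
        // if_not_exists avoids an error if the policy was already registered earlier
        return "SELECT add_retention_policy('" + hypertable + "', INTERVAL '" + days
                + " days', if_not_exists => true);";
    }

    /** Registers the policy on the given connection. */
    static void apply(Connection con) throws SQLException {
        try (Statement st = con.createStatement()) {
            st.execute(retentionSql("data_integer_raw", RETENTION_DAYS));
        }
    }
}
```

It would be worth checking how this retention window interacts with the refresh window of a continuous aggregate built on the same table, so that dropped raw data does not also disappear from the aggregated values.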
Example
time | channel_id | value
------------------------+------------+----------
2022-05-25 14:39:50+02 | 1 | 10
2022-05-25 14:39:51+02 | 1 | 11
2022-05-25 14:39:52+02 | 1 | 12
2022-05-25 14:39:53+02 | 1 | 12
A `data_integer_5m` materialized view that holds continuously aggregated 5-minute values, 5 minutes being the minimum time span queried by OpenEMS UI
CREATE MATERIALIZED VIEW "data_integer_5m" (time, channel_id, "min", "avg", "max")
WITH (timescaledb.continuous) AS
SELECT time_bucket('5 minutes', time) AS time, channel_id, min("value"), avg("value"), max("value")
FROM data_integer_raw
GROUP BY (1,2);
Example
time | channel_id | min | avg | max
------------------------+------------+----------+-------------------------+----------
2022-05-25 14:35:00+02 | 1 | 10 | 11.20000000000000000000 | 12
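To illustrate what the continuous aggregate computes per channel, here is a small Java sketch of the same idea: timestamps are floored to the start of their 5-minute bucket, and min/avg/max are calculated over the values that fall into it. This is an illustration only, not how TimescaleDB implements `time_bucket`; the class `FiveMinuteBucket` is hypothetical, the alignment assumes a bucket width that divides a day evenly, and the scale used for the average is chosen arbitrarily.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.Instant;
import java.util.Collections;
import java.util.List;

/** Hypothetical illustration of 5-minute bucketing with min/avg/max. */
final class FiveMinuteBucket {

    static final long BUCKET_SECONDS = 5 * 60;

    /** Floors a timestamp to the start of its 5-minute bucket. */
    static Instant bucketStart(Instant time) {
        long seconds = time.getEpochSecond();
        return Instant.ofEpochSecond(Math.floorDiv(seconds, BUCKET_SECONDS) * BUCKET_SECONDS);
    }

    /** Result of aggregating all values of one channel inside one bucket. */
    static final class Aggregate {
        final BigDecimal min;
        final BigDecimal avg;
        final BigDecimal max;

        Aggregate(BigDecimal min, BigDecimal avg, BigDecimal max) {
            this.min = min;
            this.avg = avg;
            this.max = max;
        }
    }

    /** Computes min, avg and max over the values of one bucket. */
    static Aggregate aggregate(List<BigDecimal> values) {
        if (values.isEmpty()) {
            throw new IllegalArgumentException("No values in bucket");
        }
        BigDecimal sum = BigDecimal.ZERO;
        for (BigDecimal value : values) {
            sum = sum.add(value);
        }
        // Scale of 16 digits is an arbitrary choice for this sketch
        BigDecimal avg = sum.divide(BigDecimal.valueOf(values.size()), 16, RoundingMode.HALF_UP);
        return new Aggregate(Collections.min(values), avg, Collections.max(values));
    }
}
```

With this sketch, a raw timestamp such as `2022-05-25 14:39:50+02` falls into the bucket starting at `2022-05-25 14:35:00+02`, which matches the `time` column of the example row.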
Inserts work with a subquery, to avoid caching of `channel_id` in the Java application
try (var con = this.dataSource.getConnection(); //
var pst = con.prepareStatement(
"INSERT INTO data_integer_raw (time, channel_id, value) VALUES (?, (SELECT _channel_id FROM openems_get_or_create_channel_id(?, ?, ?)), ?);"); //
) {
for (var point : points) {
var value = parseValue(point.value);
if (value == null) {
continue;
}
pst.setTimestamp(1, Timestamp.from(Instant.ofEpochMilli(point.timestamp)));
pst.setString(2, point.edgeId);
pst.setString(3, point.channelAddress.getComponentId());
pst.setString(4, point.channelAddress.getChannelId());
pst.setBigDecimal(5, value);
pst.addBatch();
}
pst.executeBatch();
} catch (SQLException e) {
this.log.error("Unable to write Points: " + e.getMessage());
}
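The insert code above calls `parseValue`, which is not shown. A minimal sketch of what such a helper might look like follows. It assumes the JSON value has already been unwrapped into a plain Java object (the real code presumably works on a JSON element type instead), and the class name `ValueParser` is hypothetical. Everything that does not fit into a `NUMERIC` column, such as Strings, yields `null` and is therefore skipped by the loop.

```java
import java.math.BigDecimal;
import java.math.BigInteger;

/** Hypothetical helper: maps an untyped value to something storable in a NUMERIC column. */
final class ValueParser {

    /** Returns the value as BigDecimal, or null if it cannot be stored as a number. */
    static BigDecimal parseValue(Object value) {
        if (value instanceof Boolean) {
            // Booleans are stored as 1 (true) or 0 (false)
            return ((Boolean) value) ? BigDecimal.ONE : BigDecimal.ZERO;
        }
        if (value instanceof BigDecimal) {
            return (BigDecimal) value;
        }
        if (value instanceof BigInteger) {
            return new BigDecimal((BigInteger) value);
        }
        if (value instanceof Byte || value instanceof Short
                || value instanceof Integer || value instanceof Long) {
            return BigDecimal.valueOf(((Number) value).longValue());
        }
        if (value instanceof Float || value instanceof Double) {
            double d = ((Number) value).doubleValue();
            if (Double.isNaN(d) || Double.isInfinite(d)) {
                return null; // not representable as BigDecimal
            }
            // Going through toString() keeps the short decimal form, e.g. 0.1f -> 0.1
            return new BigDecimal(value.toString());
        }
        // Strings, null and all other types are dropped
        return null;
    }
}
```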
Example for querying `data_integer_5m`
CREATE EXTENSION IF NOT EXISTS tablefunc;
SELECT *
FROM crosstab($$
SELECT d.time, component.name || '/' || channel.name, d.avg
FROM edge
LEFT JOIN component
ON edge.id = component.edge_id
LEFT JOIN channel
ON component.id = channel.component_id
LEFT JOIN data_integer_5m d
ON channel.id = d.channel_id
WHERE edge.name = 'edge0'
ORDER BY 1,2
$$, $$
SELECT unnest('{_sum/EssActivePower,_sum/EssSoc,ess0/Soc}'::text[])
$$) AS ct (
time TIMESTAMPTZ, "_sum/EssActivePower" NUMERIC, "_sum/EssSoc" NUMERIC, "ess0/Soc" NUMERIC
);
Example
time | _sum/EssActivePower | _sum/EssSoc | ess0/Soc
------------------------+------------------------+---------------------+---------------------
2022-05-25 14:35:00+02 | -1003.0000000000000000 | 50.3333333333333333 | 50.3333333333333333
2022-05-25 14:40:00+02 | -1003.0000000000000000 | 51.2500000000000000 | 51.2500000000000000
Doubts/drawbacks till now:
- In OpenEMS Backend by default we only receive a JSON, without knowing the type. The current draft would use `NUMERIC` as type for Boolean, Short, Integer, Long, Float, Double. Downsides:
  - Other data types (like String) would be dropped/not stored in the database.
  - `NUMERIC` is rather expensive compared to e.g. the specific `INTEGER` data type in PostgreSQL. I am not sure if this leads to much higher memory and disk space consumption. This mainly applies to the raw data table. The aggregated data (at least `avg()`) is of type `NUMERIC` anyway.
- Each insert with subquery comes with a certain overhead. I assume that PostgreSQL is very efficient for these subqueries, but I am not sure if caching `channel_id` in Java would be faster/better.
- More to come, as I continue developing and dig deeper…
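For comparison with the subquery approach, caching `channel_id` in Java could look roughly like the following sketch. The class `ChannelIdCache` and its `Loader` interface are hypothetical; in practice the loader would call `openems_get_or_create_channel_id` via JDBC once per unknown channel. The sketch assumes that ids are never reassigned and rows in `channel` are never deleted, otherwise cached entries would become stale.

```java
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical in-memory cache for channel ids, keyed by (edge, component, channel). */
final class ChannelIdCache {

    /** Resolves an id on a cache miss, e.g. by querying the database. */
    interface Loader {
        int load(String edge, String component, String channel);
    }

    private final ConcurrentMap<List<String>, Integer> ids = new ConcurrentHashMap<>();
    private final Loader loader;

    ChannelIdCache(Loader loader) {
        this.loader = Objects.requireNonNull(loader);
    }

    /** Returns the cached id, invoking the loader only the first time a key is seen. */
    int get(String edge, String component, String channel) {
        // List.of(...) provides value-based equals/hashCode for the composite key
        return this.ids.computeIfAbsent(List.of(edge, component, channel),
                key -> this.loader.load(edge, component, channel));
    }

    /** Number of cached channel ids. */
    int size() {
        return this.ids.size();
    }
}
```

With such a cache the insert statement could bind `channel_id` directly as an integer parameter. Whether that is actually faster than the subquery would need to be measured, and the memory footprint grows with the number of distinct channels across all Edges.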
Regards,
Stefan