New integration: TimescaleDB

Dear OpenEMS Community,

I am currently implementing TimescaleDB as a Backend-Timedata alternative to InfluxDB:

My development focus is mainly driven by the requirements I have for FEMS (FENECON Energy Management System), but I would love to hear your thoughts about additional requirements or experiences you made with TimescaleDB or PostgreSQL. Also if there is already any development happening out there, I would love to join forces.

The trouble I found with InfluxDB:

  • Clustering/High Availability is not available as open source.
  • Migration from version 1 to version 2 is not straight forward
  • Aggregation/Retention does not work well with backfilled data (which is a frequent use-case in IoT applications)

According to my research, TimescaleDB looks like a very promising alternative. Being based on PostgreSQL, a technology that is already used in OpenEMS Backend for the Odoo-Metadata implementation.

Currently planned database schema: (click to uncollapse details)

Simple tables for `Edge`, `Component` and `Channel` allows addressing a Channel of an OpenEMS Component for a specific OpenEMS Edge
CREATE TABLE "edge" (
    id SERIAL PRIMARY KEY,
    name TEXT NOT NULL,
    UNIQUE(name)
);
CREATE TABLE "component" (
    id SERIAL PRIMARY KEY,
	edge_id INTEGER NOT NULL REFERENCES edge,
    name TEXT NOT NULL,
    UNIQUE(edge_id, name)
);
CREATE TABLE "channel" (
    id SERIAL PRIMARY KEY,
	component_id INTEGER NOT NULL REFERENCES component,
    name TEXT NOT NULL,
    UNIQUE(component_id, name)
);

Example

# SELECT * FROM edge;
 id |  name
----+---------
  1 | edge0
  2 | edge1

# SELECT * FROM component;
 id | edge_id |             name
----+---------+------------------------------
  1 |       1 | _sum
  2 |       1 | ess0
  3 |       2 | _sum

# SELECT * FROM channel;
 id  | component_id |                    name
-----+--------------+--------------------------------------------
   1 |            1 | EssSoc
   2 |            1 | Soc
PLSQL functions to generate/query entries for `Edge`, `Component` and `Channel`
CREATE OR REPLACE FUNCTION openems_get_or_create_channel_id(_edge text, _component text, _channel text, OUT _channel_id int)
  LANGUAGE plpgsql AS
$func$
BEGIN
    LOOP
        SELECT channel.id
        FROM edge
        LEFT JOIN component
        ON edge.id = component.edge_id
        LEFT JOIN channel
        ON component.id = channel.component_id
        WHERE edge.name = _edge AND component.name = _component AND channel.name = _channel
        INTO _channel_id;

        EXIT WHEN FOUND;

        INSERT INTO channel (component_id, name)
        VALUES ((SELECT _component_id FROM openems_get_or_create_component_id(_edge, _component)), _channel)
        ON CONFLICT DO NOTHING
        RETURNING id
        INTO _channel_id;

        EXIT WHEN FOUND;
    END LOOP;
END
$func$;

CREATE OR REPLACE FUNCTION openems_get_or_create_component_id(_edge text, _component text, OUT _component_id int)
    LANGUAGE plpgsql AS
$func$
BEGIN
    LOOP
        SELECT component.id
        FROM edge
        LEFT JOIN component
        ON edge.id = component.edge_id
        WHERE edge.name = _edge AND component.name = _component
        INTO _component_id;

        EXIT WHEN FOUND;

        INSERT INTO component (edge_id, name)
        VALUES ((SELECT _edge_id FROM openems_get_or_create_edge_id(_edge)), _component)
        ON CONFLICT DO NOTHING
        RETURNING id
        INTO _component_id;

        EXIT WHEN FOUND;
    END LOOP;
END
$func$;

CREATE OR REPLACE FUNCTION openems_get_or_create_edge_id(_edge text, OUT _edge_id int)
    LANGUAGE plpgsql AS
$func$
BEGIN
    LOOP
        SELECT id
        FROM edge
        WHERE edge.name = _edge
        INTO _edge_id;

        EXIT WHEN FOUND;

        INSERT INTO edge (name)
        VALUES (_edge)
        ON CONFLICT DO NOTHING
        RETURNING id
        INTO _edge_id;

        EXIT WHEN FOUND;
    END LOOP;
END
$func$;
A `data_integer raw` table to hold the actual channel values per Time and Channel. Retention is planned to be set to 30 days.
CREATE TABLE IF NOT EXISTS "data_integer_raw" (time TIMESTAMPTZ NOT NULL, channel_id INTEGER NOT NULL, value NUMERIC NULL);
SELECT create_hypertable('data_integer_raw', 'time');
CREATE INDEX ix_channel_time ON data_integer_raw (time, channel_id DESC);
// TODO Code for Retention

Example

          time          | channel_id |  value
------------------------+------------+----------
 2022-05-25 14:39:50+02 |          1 |       10
 2022-05-25 14:39:51+02 |          1 |       11
 2022-05-25 14:39:52+02 |          1 |       12
 2022-05-25 14:39:53+02 |          1 |       12
A `data_integer_5m` materialized view, that holds continuously aggregated 5 minutes values - being the minimum time span queried by OpenEMS UI
CREATE MATERIALIZED VIEW "data_integer_5m" (time, channel_id, "min", "avg", "max")
WITH (timescaledb.continuous) AS
  SELECT time_bucket('5 minutes', time) AS time, channel_id, min("value"), avg("value"), max("value")
  FROM data_integer_raw
  GROUP BY (1,2);

Example

         time          | channel_id |   min    |           avg           |   max
------------------------+------------+----------+-------------------------+----------
 2022-05-25 14:35:00+02 |        1 |        10 |  11.20000000000000000000 |        12
Inserts work with a subquery, to avoid caching of `channel_id` in the Java application
try (var con = this.dataSource.getConnection(); //
		var pst = con.prepareStatement(
				"INSERT INTO data_integer_raw (time, channel_id, value) VALUES (?, (SELECT _channel_id FROM openems_get_or_create_channel_id(?, ?, ?)), ?);"); //
) {
	for (var point : points) {
		var value = parseValue(point.value);
		if (value == null) {
			continue;
		}

		pst.setTimestamp(1, Timestamp.from(Instant.ofEpochMilli(point.timestamp)));
		pst.setString(2, point.edgeId);
		pst.setString(3, point.channelAddress.getComponentId());
		pst.setString(4, point.channelAddress.getChannelId());
		pst.setBigDecimal(5, value);
		pst.addBatch();
	}
	pst.executeBatch();
} catch (SQLException e) {
	this.log.error("Unable to write Points: " + e.getMessage());
}
Example for querying `data_integer_5m`
CREATE EXTENSION IF NOT EXISTS tablefunc;
SELECT *
FROM crosstab($$
    SELECT d.time, component.name || '/' || channel.name, d.avg
    FROM edge
    LEFT JOIN component
    ON edge.id = component.edge_id
    LEFT JOIN channel
    ON component.id = channel.component_id
    LEFT JOIN data_integer_5m d
    ON channel.id = d.channel_id
    WHERE edge.name = 'edge0'
    ORDER BY 1,2
$$, $$
	SELECT unnest('{_sum/EssActivePower,_sum/EssSoc,ess0/Soc}'::text[])
$$) AS ct (
	time TIMESTAMPTZ, "_sum/EssActivePower" NUMERIC, "_sum/EssSoc" NUMERIC, "ess0/Soc" NUMERIC
);

Example

          time          |  _sum/EssActivePower   |     _sum/EssSoc     |      ess0/Soc

------------------------+------------------------+---------------------+--------------------
-
 2022-05-25 14:35:00+02 | -1003.0000000000000000 | 50.3333333333333333 | 50.3333333333333333
 2022-05-25 14:40:00+02 | -1003.0000000000000000 | 51.2500000000000000 | 51.2500000000000000

Doubts/drawbacks till now:

  • In OpenEMS Backend by default we only receive a Json, without knowing the type. The current draft would use Numeric as type for Boolean, Short, Integer, Long, Float, Double. Downsides:
    1. Other data types (like String) would be dropped/not stored in the database
    2. Numeric is rather expensive, compared to e.g. specific INTEGER data type in PostgreSQL. I am not sure if this leads to much higher memory and disk space consumption. This mainly applies to the raw data table. The aggregated data (at least avg()) is anyway of type NUMERIC.
  • Each insert with subquery comes with a certain overhead. I assume, that PostgreSQL is very efficent for these subqueries, but I am not sure if caching channel_id in Java would be faster/better
  • More to come, as I continue developing and dig deeper…

Regards,
Stefan

I am still super happy with InfluxDB v2. I like the Flux query language a lot.

Regarding HA: I have configured telegraf to provide an InfluxDB listener. From that entry point telegraf writes all the received data out to three endpoints (local InfluxDB v2 instance with 90 days retention, InfluxDB Cloud free of charge instance with 30 days retention and a local MQTT broker). Of course, the listener is write-only. As I do all my analytics and automation tasks in Grafana or InfluxDB Tasks this is okay for me.

telegraf buffers the data if one of the endpoints is not available. Once I comes back online, the buffered data is flushed to it. This way one of the two InfluxDB instances can go down and I can still work with the other. Kind of ‘poor man´s HA’. And I never lost any data.

Thank you @martingr for this valuable input! A very interesting setup.

We now try switching to InfluxDB v2 with Flux query language. It looks to my like the perfomance is dramatically low in comparison to the old access method. OpenEMS UI History views take much longer to build up. So right now I am not so happy with this decision. I am curious:

  1. Is someone else experiencing this performance issues?
  2. What is the reason for the switch to the Flux query language?

Another Background for my questions (also a hint for others):
I can’t find the other post, but someone was complaining about wrong energy values a few weeks ago. The reason for this could be the Influx “GROUP BY” command. A short time ago we solved this problem in Influx 1.6 by adding a resolution offset (see Time Boundary in InfluxDB Group by Time Statement · KK's Blog (fromkk) for more): We wanted to do a pull request. But due to the switch to Flux query language this pull request is not needed anymore.
But the problem with the monthly energy values is the same for flux query language. So a resolution offset is needed here too. Unfortunately there seems to be no easy solution now. Because in the flux java library no “resolution offset” method is included. We are still investigating this.