diff --git a/io.openems.edge.timedata.influxdb/src/io/openems/edge/timedata/influxdb/Config.java b/io.openems.edge.timedata.influxdb/src/io/openems/edge/timedata/influxdb/Config.java index 287aec74cfe..17e01cf4e57 100644 --- a/io.openems.edge.timedata.influxdb/src/io/openems/edge/timedata/influxdb/Config.java +++ b/io.openems.edge.timedata.influxdb/src/io/openems/edge/timedata/influxdb/Config.java @@ -3,6 +3,7 @@ import org.osgi.service.metatype.annotations.AttributeDefinition; import org.osgi.service.metatype.annotations.ObjectClassDefinition; +import io.openems.common.channel.PersistencePriority; import io.openems.shared.influxdb.QueryLanguageConfig; @ObjectClassDefinition(// @@ -46,5 +47,8 @@ @AttributeDefinition(name = "Read-Only mode", description = "Activates the read-only mode. Then no data is written to InfluxDB.") boolean isReadOnly() default false; + @AttributeDefinition(name = "Persistence Priority", description = "Store only Channels with a Persistence Priority above this. Be aware that too many writes can wear-out your flash storage.") + PersistencePriority persistencePriority() default PersistencePriority.MEDIUM; + String webconsole_configurationFactory_nameHint() default "Timedata InfluxDB [{id}]"; } \ No newline at end of file diff --git a/io.openems.edge.timedata.influxdb/src/io/openems/edge/timedata/influxdb/TimedataInfluxDbImpl.java b/io.openems.edge.timedata.influxdb/src/io/openems/edge/timedata/influxdb/TimedataInfluxDbImpl.java index 10517dc27a4..22370abb9ef 100644 --- a/io.openems.edge.timedata.influxdb/src/io/openems/edge/timedata/influxdb/TimedataInfluxDbImpl.java +++ b/io.openems.edge.timedata.influxdb/src/io/openems/edge/timedata/influxdb/TimedataInfluxDbImpl.java @@ -27,6 +27,7 @@ import com.influxdb.client.domain.WritePrecision; import com.influxdb.client.write.Point; +import io.openems.common.channel.AccessMode; import io.openems.common.exceptions.OpenemsError.OpenemsNamedException; import io.openems.common.oem.OpenemsEdgeOem; import io.openems.common.timedata.Resolution; @@ -127,56 +128,37 @@ protected synchronized void collectAndWriteChannelValues() { final var point = Point.measurement(this.config.measurement()).time(timestamp, WritePrecision.MS); final var addedAtLeastOneChannelValue = new AtomicBoolean(false); - this.componentManager.getEnabledComponents().stream().filter(OpenemsComponent::isEnabled) - .forEach(component -> { - component.channels().forEach(channel -> { - switch (channel.channelDoc().getAccessMode()) { - case WRITE_ONLY: - // ignore Write-Only-Channels - return; - case READ_ONLY: - case READ_WRITE: - break; - } - - Optional valueOpt = channel.value().asOptional(); - if (!valueOpt.isPresent()) { - // ignore not available channels - return; - } - Object value = valueOpt.get(); - var address = channel.address().toString(); - try { - switch (channel.getType()) { - case BOOLEAN: - point.addField(address, (Boolean) value ? 1 : 0); - break; - case SHORT: - point.addField(address, (Short) value); - break; - case INTEGER: - point.addField(address, (Integer) value); - break; - case LONG: - point.addField(address, (Long) value); - break; - case FLOAT: - point.addField(address, (Float) value); - break; - case DOUBLE: - point.addField(address, (Double) value); - break; - case STRING: - point.addField(address, (String) value); - break; - } - } catch (IllegalArgumentException e) { - this.log.warn("Unable to add Channel [" + address + "] value [" + value + "]: " - + e.getMessage()); - return; + this.componentManager.getEnabledComponents().stream() // + .flatMap(component -> component.channels().stream()) // + .filter(channel -> { + final var doc = channel.channelDoc(); + return doc.getPersistencePriority().isAtLeast(this.config.persistencePriority()) + && doc.getAccessMode() != AccessMode.WRITE_ONLY; // + }) // + .forEach(channel -> { + Optional valueOpt = channel.value().asOptional(); + if (!valueOpt.isPresent()) { + // ignore not available channels + return; + } + Object value = valueOpt.get(); + var address = channel.address().toString(); + try { + switch (channel.getType()) { + case BOOLEAN -> point.addField(address, (Boolean) value ? 1 : 0); + case SHORT -> point.addField(address, (Short) value); + case INTEGER -> point.addField(address, (Integer) value); + case LONG -> point.addField(address, (Long) value); + case FLOAT -> point.addField(address, (Float) value); + case DOUBLE -> point.addField(address, (Double) value); + case STRING -> point.addField(address, (String) value); } - addedAtLeastOneChannelValue.set(true); - }); + } catch (IllegalArgumentException e) { + this.log.warn( + "Unable to add Channel [" + address + "] value [" + value + "]: " + e.getMessage()); + return; + } + addedAtLeastOneChannelValue.set(true); }); if (addedAtLeastOneChannelValue.get()) { diff --git a/io.openems.edge.timedata.influxdb/test/io/openems/edge/timedata/influxdb/MyConfig.java b/io.openems.edge.timedata.influxdb/test/io/openems/edge/timedata/influxdb/MyConfig.java index 37b1713dfd0..b08d4b7666e 100644 --- a/io.openems.edge.timedata.influxdb/test/io/openems/edge/timedata/influxdb/MyConfig.java +++ b/io.openems.edge.timedata.influxdb/test/io/openems/edge/timedata/influxdb/MyConfig.java @@ -1,5 +1,6 @@ package io.openems.edge.timedata.influxdb; +import io.openems.common.channel.PersistencePriority; import io.openems.common.test.AbstractComponentConfig; import io.openems.shared.influxdb.QueryLanguageConfig; @@ -17,6 +18,7 @@ protected static class Builder { private String url; private QueryLanguageConfig queryLanguage; private String measurement; + private PersistencePriority persistencePriority; private Builder() { } @@ -71,6 +73,11 @@ public Builder setMeasurement(String measurement) { return this; } + public Builder setPersistencePriority(PersistencePriority persistencePriority) { + this.persistencePriority = persistencePriority; + return this; + } + public MyConfig build() { return new MyConfig(this); } @@ -132,6 +139,11 @@ public boolean isReadOnly() { return this.builder.isReadOnly; } + @Override + public PersistencePriority persistencePriority() { + return this.builder.persistencePriority; + } + @Override public String measurement() { return this.builder.measurement; diff --git a/io.openems.edge.timedata.influxdb/test/io/openems/edge/timedata/influxdb/TimedataInfluxDbImplTest.java b/io.openems.edge.timedata.influxdb/test/io/openems/edge/timedata/influxdb/TimedataInfluxDbImplTest.java index 0810dcdfe68..4c097c21c69 100644 --- a/io.openems.edge.timedata.influxdb/test/io/openems/edge/timedata/influxdb/TimedataInfluxDbImplTest.java +++ b/io.openems.edge.timedata.influxdb/test/io/openems/edge/timedata/influxdb/TimedataInfluxDbImplTest.java @@ -2,6 +2,7 @@ import org.junit.Test; +import io.openems.common.channel.PersistencePriority; import io.openems.common.oem.DummyOpenemsEdgeOem; import io.openems.edge.common.test.AbstractComponentTest.TestCase; import io.openems.edge.common.test.ComponentTest; @@ -30,6 +31,7 @@ public void test() throws Exception { .setNoOfCycles(1) // .setMaxQueueSize(5000) // .setReadOnly(false) // + .setPersistencePriority(PersistencePriority.MEDIUM) .build()) // .next(new TestCase()) // ;