diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9ba2403ef..9458fb6fd 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Added
 - Opportunity to close writer in `CsvFileSink`
+- `CsvWindowedTimeSeriesSource` that allows for efficient reading of a time series.
+  It utilizes an open buffered reader (remember to close the source!).
+  Entries are only read on demand and kept in a buffer of configurable foresight within the source itself.
 
 ### Fixed
 - adapted `LineInput` constructor to convert line length to `StandardUnits.LINE_LENGTH`
diff --git a/src/main/java/edu/ie3/datamodel/io/source/TimeSeriesSource.java b/src/main/java/edu/ie3/datamodel/io/source/TimeSeriesSource.java
index e74284867..5d045bcb0 100644
--- a/src/main/java/edu/ie3/datamodel/io/source/TimeSeriesSource.java
+++ b/src/main/java/edu/ie3/datamodel/io/source/TimeSeriesSource.java
@@ -5,10 +5,14 @@
 */
 package edu.ie3.datamodel.io.source;
 
+import edu.ie3.datamodel.io.factory.timeseries.SimpleTimeBasedValueData;
+import edu.ie3.datamodel.io.factory.timeseries.TimeBasedSimpleValueFactory;
 import edu.ie3.datamodel.models.timeseries.individual.IndividualTimeSeries;
+import edu.ie3.datamodel.models.timeseries.individual.TimeBasedValue;
 import edu.ie3.datamodel.models.value.Value;
 import edu.ie3.util.interval.ClosedInterval;
 import java.time.ZonedDateTime;
+import java.util.Map;
 import java.util.Optional;
 
 /**
@@ -40,4 +44,22 @@ public interface TimeSeriesSource<V extends Value> extends DataSource {
    * @return Option on a value for that time
    */
   Optional<V> getValue(ZonedDateTime time);
+
+  /**
+   * Build a {@link TimeBasedValue} of type {@code V}, whereas the underlying {@link Value} does not
+   * need any additional information.
+   *
+   * @param fieldToValues Mapping from field id to values
+   * @param valueClass Class of the desired underlying value
+   * @param factory Factory to process the "flat" information
+   * @return Optional simple time based value
+   */
+  default Optional<TimeBasedValue<V>> buildTimeBasedValue(
+      Map<String, String> fieldToValues,
+      Class<V> valueClass,
+      TimeBasedSimpleValueFactory<V> factory) {
+    SimpleTimeBasedValueData<V> factoryData =
+        new SimpleTimeBasedValueData<>(fieldToValues, valueClass);
+    return factory.get(factoryData);
+  }
 }
diff --git a/src/main/java/edu/ie3/datamodel/io/source/csv/CsvTimeSeriesSource.java b/src/main/java/edu/ie3/datamodel/io/source/csv/CsvTimeSeriesSource.java
index 887c8e9dd..74f02f56a 100644
--- a/src/main/java/edu/ie3/datamodel/io/source/csv/CsvTimeSeriesSource.java
+++ b/src/main/java/edu/ie3/datamodel/io/source/csv/CsvTimeSeriesSource.java
@@ -199,22 +199,4 @@ private IndividualTimeSeries<V> buildIndividualTimeSeries(
       throw new SourceException("Error during reading of file'" + filePath + "'.", e);
     }
   }
-
-  /**
-   * Build a {@link TimeBasedValue} of type {@code V}, whereas the underlying {@link Value} does not
-   * need any additional information.
-   *
-   * @param fieldToValues Mapping from field id to values
-   * @param valueClass Class of the desired underlying value
-   * @param factory Factory to process the "flat" information
-   * @return Optional simple time based value
-   */
-  private Optional<TimeBasedValue<V>> buildTimeBasedValue(
-      Map<String, String> fieldToValues,
-      Class<V> valueClass,
-      TimeBasedSimpleValueFactory<V> factory) {
-    SimpleTimeBasedValueData<V> factoryData =
-        new SimpleTimeBasedValueData<>(fieldToValues, valueClass);
-    return factory.get(factoryData);
-  }
 }
diff --git a/src/main/java/edu/ie3/datamodel/io/source/csv/CsvWindowedTimeSeriesSource.java b/src/main/java/edu/ie3/datamodel/io/source/csv/CsvWindowedTimeSeriesSource.java
new file mode 100644
index 000000000..6a139e574
--- /dev/null
+++ b/src/main/java/edu/ie3/datamodel/io/source/csv/CsvWindowedTimeSeriesSource.java
@@ -0,0 +1,315 @@
+/*
+ * © 2021. TU Dortmund University,
+ * Institute of Energy Systems, Energy Efficiency and Energy Economics,
+ * Research group Distribution grid planning and operation
+*/
+package edu.ie3.datamodel.io.source.csv;
+
+import edu.ie3.datamodel.io.factory.timeseries.TimeBasedSimpleValueFactory;
+import edu.ie3.datamodel.io.naming.FileNamingStrategy;
+import edu.ie3.datamodel.io.source.TimeSeriesSource;
+import edu.ie3.datamodel.models.timeseries.individual.IndividualTimeSeries;
+import edu.ie3.datamodel.models.timeseries.individual.TimeBasedValue;
+import edu.ie3.datamodel.models.value.Value;
+import edu.ie3.util.TimeUtil;
+import edu.ie3.util.interval.ClosedInterval;
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.time.Duration;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This csv source for {@link IndividualTimeSeries} utilizes the functionalities of a buffered file
+ * reader. That means, that you can only read in your time series linearly from the beginning to the
+ * end. As soon, as you have queried an instance in time, you are only able to query future
+ * instances! The entries of the file are expected to be in chronological order. Moreover, you have
+ * to remember to close this source or use it within a try-with-resources!
+ *
+ * @param <V> Type of value to be returned
+ */
+public class CsvWindowedTimeSeriesSource<V extends Value> extends CsvDataSource
+    implements TimeSeriesSource<V>, AutoCloseable {
+  private static final Logger logger = LoggerFactory.getLogger(CsvWindowedTimeSeriesSource.class);
+
+  private final Duration maximumForeSight;
+  private ClosedInterval<ZonedDateTime> coveredInterval;
+
+  private final String filePath;
+  private final BufferedReader reader;
+  private final Stream<TimeBasedValue<V>> inputStream;
+
+  /* A stream only allows for one terminal operation. Therefore, entries are pulled via iterator. */
+  private final Iterator<TimeBasedValue<V>> entryIterator;
+
+  /* Entry, that has been read already, but is newer than the recently covered time frame */
+  private TimeBasedValue<V> pendingEntry;
+
+  private final Map<ZonedDateTime, V> buffer = new HashMap<>();
+
+  /**
+   * Opens a reader for the given time series file. The entries are read on demand later on.
+   *
+   * @param csvSep Column separator of the csv file
+   * @param folderPath Path of the folder, the files are located in
+   * @param filePath Path of the time series file, the reader is opened for
+   * @param fileNamingStrategy Strategy to name the files
+   * @param maximumForeSight Duration, that is buffered beyond a queried instance in time
+   * @param valueClass Class of the values to be returned
+   * @param factory Factory to process the "flat" information
+   * @throws RuntimeException If the reader for the time series file cannot be opened
+   */
+  public CsvWindowedTimeSeriesSource(
+      String csvSep,
+      String folderPath,
+      String filePath,
+      FileNamingStrategy fileNamingStrategy,
+      Duration maximumForeSight,
+      Class<V> valueClass,
+      TimeBasedSimpleValueFactory<V> factory) {
+    super(csvSep, folderPath, fileNamingStrategy);
+    this.maximumForeSight = maximumForeSight;
+    this.filePath = filePath;
+    try {
+      this.reader = super.connector.initReader(filePath);
+      this.inputStream =
+          filterEmptyOptionals(
+              buildStreamWithFieldsToAttributesMap(TimeBasedValue.class, reader)
+                  .map(
+                      fieldToValue -> this.buildTimeBasedValue(fieldToValue, valueClass, factory)));
+      this.entryIterator = inputStream.iterator();
+    } catch (FileNotFoundException e) {
+      throw new RuntimeException(
+          "Opening the reader for time series file '" + filePath + "' failed.", e);
+    }
+  }
+
+  /**
+   * Not supported, as this source only holds a moving window of the time series.
+   *
+   * @throws UnsupportedOperationException Always
+   */
+  @Override
+  public IndividualTimeSeries<V> getTimeSeries() {
+    throw new UnsupportedOperationException(
+        "This time series source is not able to return a full time series.");
+  }
+
+  /**
+   * Get all entries within the given interval. Afterwards, the covered time frame starts at the
+   * upper boundary of that interval.
+   *
+   * @param timeInterval Desired time interval (boundaries included)
+   * @return Time series with all buffered entries within the interval
+   * @throws RuntimeException If the interval starts before the currently covered time frame
+   */
+  @Override
+  public IndividualTimeSeries<V> getTimeSeries(ClosedInterval<ZonedDateTime> timeInterval) {
+    if (Objects.nonNull(coveredInterval)
+        && timeInterval.getLower().isBefore(coveredInterval.getLower())) {
+      throw new RuntimeException(
+          "The buffer window already passed the start '"
+              + timeInterval.getLower()
+              + "' of your desired time frame.");
+    }
+
+    if (Objects.isNull(coveredInterval)
+        || timeInterval.getUpper().isAfter(coveredInterval.getUpper())) {
+      /* The buffer is empty or does not reach up to the end of the interval: Fill it up */
+      fillUpBuffer(timeInterval.getUpper());
+    }
+
+    Set<TimeBasedValue<V>> tbvs =
+        getFromBuffer(timeInterval.getLower(), timeInterval.getUpper())
+            .map(entry -> new TimeBasedValue<>(entry.getKey(), entry.getValue()))
+            .collect(Collectors.toSet());
+
+    /* Reduce the buffer */
+    shrinkBuffer(timeInterval.getUpper());
+
+    return new IndividualTimeSeries<>(UUID.randomUUID(), tbvs);
+  }
+
+  /**
+   * Get the value, that applies at the given time, i.e. that of the latest buffered entry at or
+   * before that time.
+   *
+   * @param time Questioned time
+   * @return Option on a value for that time
+   * @throws RuntimeException If the time lies before the currently covered time frame
+   */
+  @Override
+  public Optional<V> getValue(ZonedDateTime time) {
+    if (Objects.nonNull(coveredInterval) && time.isBefore(coveredInterval.getLower())) {
+      throw new RuntimeException(
+          "The buffer window already passed your desired time instance '" + time + "'.");
+    }
+
+    if (Objects.isNull(coveredInterval) || time.isAfter(coveredInterval.getUpper())) {
+      /* The buffer is empty or does not reach up to the questioned time: Fill it up */
+      fillUpBuffer(time);
+    }
+
+    Optional<V> value = getFromBuffer(time);
+
+    /* Reduce the buffer */
+    shrinkBuffer(time);
+
+    return value;
+  }
+
+  /**
+   * Get the value for the questioned time from the buffer, i.e. the latest entry at or before it
+   *
+   * @param time Questioned time
+   * @return Optional value, that might be within the buffer
+   */
+  private Optional<V> getFromBuffer(ZonedDateTime time) {
+    return buffer.entrySet().stream()
+        .filter(timeToValue -> !timeToValue.getKey().isAfter(time))
+        .max(Map.Entry.comparingByKey())
+        .map(Map.Entry::getValue);
+  }
+
+  /**
+   * Get a defined time slice from buffer
+   *
+   * @param start Start of the slice (included)
+   * @param end End of the slice (included)
+   * @return A stream for that slice
+   */
+  private Stream<Map.Entry<ZonedDateTime, V>> getFromBuffer(
+      ZonedDateTime start, ZonedDateTime end) {
+    return buffer.entrySet().stream()
+        .filter(
+            timeToValue ->
+                !timeToValue.getKey().isBefore(start) && !timeToValue.getKey().isAfter(end));
+  }
+
+  /**
+   * Fills up the buffer. This is done by adding the {@link #maximumForeSight} to the time of
+   * interest. This marks the newest possible instance in the buffer to appear. Entries are pulled
+   * one by one, until the first entry beyond that instance is found. That entry is kept for the
+   * next fill up.
+   *
+   * @param timeOfInterest The time of interest
+   */
+  private void fillUpBuffer(ZonedDateTime timeOfInterest) {
+    ZonedDateTime intendedNewestTimeInBuffer = timeOfInterest.plus(maximumForeSight);
+    while (Objects.nonNull(pendingEntry) || entryIterator.hasNext()) {
+      TimeBasedValue<V> candidate =
+          Objects.nonNull(pendingEntry) ? pendingEntry : entryIterator.next();
+      if (candidate.getTime().isAfter(intendedNewestTimeInBuffer)) {
+        /* Too new for now: Remember it and stop reading */
+        pendingEntry = candidate;
+        break;
+      }
+      buffer.put(candidate.getTime(), candidate.getValue());
+      pendingEntry = null;
+    }
+    updateNewestTime(intendedNewestTimeInBuffer);
+  }
+
+  /**
+   * Removes all entries in the buffer, that are older than the provided instance in time. The
+   * latest entry at or before that instance is kept, as its value still applies, until the next
+   * entry takes over.
+   *
+   * @param oldestEntry The oldest instance in time meant to remain covered by the buffer
+   */
+  private void shrinkBuffer(ZonedDateTime oldestEntry) {
+    ZonedDateTime timeInEffect =
+        buffer.keySet().stream()
+            .filter(bufferTime -> !bufferTime.isAfter(oldestEntry))
+            .max(Comparator.naturalOrder())
+            .orElse(null);
+    buffer
+        .keySet()
+        .removeIf(
+            bufferTime -> bufferTime.isBefore(oldestEntry) && !bufferTime.equals(timeInEffect));
+    updateOldestTime(oldestEntry);
+  }
+
+  /**
+   * Expand the covered time frame
+   *
+   * @param time Newest covered time
+   */
+  private void updateNewestTime(ZonedDateTime time) {
+    if (Objects.isNull(coveredInterval)) {
+      coveredInterval = new ClosedInterval<>(time, time);
+    } else {
+      coveredInterval = new ClosedInterval<>(coveredInterval.getLower(), time);
+    }
+  }
+
+  /**
+   * Reduce the covered time frame
+   *
+   * @param time Oldest covered time
+   */
+  private void updateOldestTime(ZonedDateTime time) {
+    if (Objects.isNull(coveredInterval)) {
+      coveredInterval = new ClosedInterval<>(time, time);
+    } else {
+      coveredInterval = new ClosedInterval<>(time, coveredInterval.getUpper());
+    }
+  }
+
+  /**
+   * Determines all available time steps within the given time series
+   *
+   * @return A list of available {@link ZonedDateTime}s
+   */
+  public List<ZonedDateTime> getAvailableTimeSteps() {
+    return getAvailableTimeSteps("time", "yyyy-MM-dd'T'HH:mm:ss[.S[S][S]]'Z'");
+  }
+
+  /**
+   * Determines all available time steps within the given time series. For this purpose, the file
+   * is read by a separate reader, so that the buffer window is not touched.
+   *
+   * @param timeField Field, where date time information is located
+   * @param timePattern Pattern of the date time strings
+   * @return A list of available {@link ZonedDateTime}s, empty if the file cannot be read
+   */
+  public List<ZonedDateTime> getAvailableTimeSteps(String timeField, String timePattern) {
+    TimeUtil timeUtil = new TimeUtil(ZoneId.of("UTC"), Locale.GERMANY, timePattern);
+    try (BufferedReader timeStepReader = super.connector.initReader(this.filePath)) {
+      return buildStreamWithFieldsToAttributesMap(TimeBasedValue.class, timeStepReader)
+          .map(
+              fieldToValue ->
+                  Optional.ofNullable(fieldToValue.get(timeField)).map(timeUtil::toZonedDateTime))
+          .filter(Optional::isPresent)
+          .map(Optional::get)
+          .collect(Collectors.toList());
+    } catch (IOException e) {
+      logger.warn(
+          "Opening a reader for time series file '"
+              + filePath
+              + "' failed. Unable to determine available time steps.",
+          e);
+    }
+    return Collections.emptyList();
+  }
+
+  /**
+   * Closes the stream of entries as well as the underlying reader.
+   *
+   * @throws IOException If closing the reader fails
+   */
+  @Override
+  public void close() throws IOException {
+    try {
+      inputStream.close();
+    } finally {
+      reader.close();
+    }
+  }
+}
diff --git a/src/test/groovy/edu/ie3/datamodel/io/source/csv/CsvWindowedTimeSeriesSourceTest.groovy b/src/test/groovy/edu/ie3/datamodel/io/source/csv/CsvWindowedTimeSeriesSourceTest.groovy
new file mode 100644
index 000000000..7b027cf0b
--- /dev/null
+++ b/src/test/groovy/edu/ie3/datamodel/io/source/csv/CsvWindowedTimeSeriesSourceTest.groovy
@@ -0,0 +1,145 @@
+/*
+ * © 2021. TU Dortmund University,
+ * Institute of Energy Systems, Energy Efficiency and Energy Economics,
+ * Research group Distribution grid planning and operation
+ */
+package edu.ie3.datamodel.io.source.csv
+
+import edu.ie3.datamodel.io.factory.timeseries.TimeBasedSimpleValueFactory
+import edu.ie3.datamodel.io.naming.FileNamingStrategy
+import edu.ie3.datamodel.models.StandardUnits
+import edu.ie3.datamodel.models.value.EnergyPriceValue
+import edu.ie3.util.interval.ClosedInterval
+import spock.lang.Specification
+import tech.units.indriya.quantity.Quantities
+
+import java.time.Duration
+import java.time.ZoneId
+import java.time.ZonedDateTime
+
+class CsvWindowedTimeSeriesSourceTest extends Specification implements CsvTestDataMeta {
+	/**
+	 * Builds a fresh source for the test time series with a maximum foresight of two hours
+	 */
+	private CsvWindowedTimeSeriesSource buildSource() {
+		new CsvWindowedTimeSeriesSource(
+				";",
+				timeSeriesFolderPath,
+				"its_c_2fcb3e53-b94a-4b96-bea4-c469e499f1a1",
+				new FileNamingStrategy(),
+				Duration.ofHours(2L),
+				EnergyPriceValue,
+				new TimeBasedSimpleValueFactory(EnergyPriceValue))
+	}
+
+	def "The windowed time series source is able to query an instance in time"() {
+		given:
+		def source = buildSource()
+		def expected = new EnergyPriceValue(Quantities.getQuantity(125.0, StandardUnits.ENERGY_PRICE))
+
+		when:
+		def actual = source.getValue(ZonedDateTime.of(2020, 1, 1, 0, 15, 0, 0, ZoneId.of("UTC")))
+
+		then:
+		actual.isPresent()
+		actual.get() == expected
+
+		cleanup:
+		source.close()
+	}
+
+	def "The windowed time series source is able to query multiple instances in time"() {
+		given:
+		def source = buildSource()
+
+		when:
+		source.getValue(ZonedDateTime.of(2020, 1, 1, 0, 0, 0, 0, ZoneId.of("UTC")))
+		source.getValue(ZonedDateTime.of(2020, 1, 1, 0, 15, 0, 0, ZoneId.of("UTC")))
+
+		then:
+		noExceptionThrown()
+
+		cleanup:
+		source.close()
+	}
+
+	def "The windowed time series source is able to query an instance in time beyond the buffered window"() {
+		given:
+		def source = buildSource()
+		def expected = new EnergyPriceValue(Quantities.getQuantity(125.0, StandardUnits.ENERGY_PRICE))
+
+		when:
+		source.getValue(ZonedDateTime.of(2020, 1, 1, 0, 0, 0, 0, ZoneId.of("UTC")))
+		def actual = source.getValue(ZonedDateTime.of(2020, 1, 1, 3, 0, 0, 0, ZoneId.of("UTC")))
+
+		then:
+		actual.isPresent()
+		actual.get() == expected
+
+		cleanup:
+		source.close()
+	}
+
+	def "The windowed time series source throws an exception, if the queried time is before the currently covered interval"() {
+		given:
+		def source = buildSource()
+
+		when:
+		source.getValue(ZonedDateTime.of(2020, 1, 1, 0, 15, 0, 0, ZoneId.of("UTC")))
+		source.getValue(ZonedDateTime.of(2020, 1, 1, 0, 0, 0, 0, ZoneId.of("UTC")))
+
+		then:
+		def ex = thrown(RuntimeException)
+		ex.message == "The buffer window already passed your desired time instance '2020-01-01T00:00Z[UTC]'."
+
+		cleanup:
+		source.close()
+	}
+
+	def "The windowed time series source throws an exception, if the queried time frame starts before the currently covered interval"() {
+		given:
+		def source = buildSource()
+		def start = ZonedDateTime.of(2020, 1, 1, 0, 0, 0, 0, ZoneId.of("UTC"))
+
+		when:
+		source.getValue(ZonedDateTime.of(2020, 1, 1, 0, 15, 0, 0, ZoneId.of("UTC")))
+		source.getTimeSeries(new ClosedInterval(start, start.plusHours(2L)))
+
+		then:
+		def ex = thrown(RuntimeException)
+		ex.message == "The buffer window already passed the start '2020-01-01T00:00Z[UTC]' of your desired time frame."
+
+		cleanup:
+		source.close()
+	}
+
+	def "The windowed time series source is able to load a time series for a given interval"() {
+		given:
+		def source = buildSource()
+		def start = ZonedDateTime.of(2020, 1, 1, 0, 0, 0, 0, ZoneId.of("UTC"))
+		def end = ZonedDateTime.of(2020, 1, 1, 0, 15, 0, 0, ZoneId.of("UTC"))
+
+		when:
+		def actual = source.getTimeSeries(new ClosedInterval(start, end))
+
+		then:
+		actual.entries.size() == 2
+
+		cleanup:
+		source.close()
+	}
+
+	def "The windowed time series source is able to determine all available time steps"() {
+		given:
+		def source = buildSource()
+
+		when:
+		def actual = source.availableTimeSteps
+
+		then:
+		actual.size() == 2
+
+		cleanup:
+		source.close()
+	}
+}