diff --git a/src/main/java/com/cognite/client/stream/AbstractPublisher.java b/src/main/java/com/cognite/client/stream/AbstractPublisher.java
new file mode 100644
index 00000000..43c7eb9d
--- /dev/null
+++ b/src/main/java/com/cognite/client/stream/AbstractPublisher.java
@@ -0,0 +1,97 @@
+package com.cognite.client.stream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Abstract superclass for streaming publishers. It holds the key constants and the shared state
+ * of the polling loop.
+ */
+public abstract class AbstractPublisher {
+    // Defaults and boundary values
+    protected static final Duration MIN_POLLING_INTERVAL = Duration.ofMillis(500L);
+    protected static final Duration DEFAULT_POLLING_INTERVAL = Duration.ofSeconds(5L);
+    protected static final Duration MAX_POLLING_INTERVAL = Duration.ofSeconds(60L);
+
+    protected static final Duration MIN_POLLING_OFFSET = Duration.ofMillis(500L);
+    protected static final Duration DEFAULT_POLLING_OFFSET = Duration.ofSeconds(2L);
+    protected static final Duration MAX_POLLING_OFFSET = Duration.ofDays(10L);
+
+    protected static final Instant MIN_START_TIME = Instant.EPOCH;
+    // Subtract from Instant.MAX to guard against overflow when adding the polling offset
+    protected static final Instant MAX_END_TIME = Instant.MAX.minus(MAX_POLLING_OFFSET).minusSeconds(1);
+
+    protected final Logger LOG = LoggerFactory.getLogger(this.getClass());
+
+    // Internal state
+    protected AtomicBoolean abortStream = new AtomicBoolean(false);
+    protected AbstractPublisher.State state = AbstractPublisher.State.READY;
+
+    protected abstract Duration getPollingInterval();
+    protected abstract Duration getPollingOffset();
+    protected abstract Instant getStartTime();
+    protected abstract Instant getEndTime();
+
+    /**
+     * Starts the streaming job.
+     *
+     * The job is executed on a separate thread and this method will immediately return to the caller. It returns
+     * a {@link Future} that you can use to block the execution of your own code if you want to explicitly
+     * wait for completion of the streaming job.
+     *
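+     * A minimal usage sketch (the source and the consumer logic are illustrative placeholders):
+     * <pre>{@code
+     * Publisher<RawRow> publisher = Publisher.of(rawRowSource)
+     *         .withConsumer(batch -> batch.forEach(row -> System.out.println(row.getKey())));
+     * Future<Boolean> result = publisher.start();
+     * // Optionally block until the streaming job finishes (at its configured end time):
+     * boolean completedNormally = result.get();
+     * }</pre>
+     *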
+     * @return A Future hosting the end state of the streaming job. The future returns {@code true} when the
+     *         polling loop completes (at its specified end time) and {@code false} if the job is aborted before
+     *         the specified end time.
+     */
+    public Future<Boolean> start() {
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        Future<Boolean> future = executorService.submit(this::run);
+        executorService.shutdown();
+        return future;
+    }
+
+    /**
+     * Aborts the current stream operation. It may take a few seconds for this operation to complete.
+     */
+    public void abort() {
+        abortStream.set(true);
+        while (State.RUNNING == state) {
+            // Wait for the polling loop to observe the abort flag and stop
+            try {
+                Thread.sleep(1000);
+            } catch (Exception e) {
+                break;
+            }
+        }
+        LOG.info("Publisher aborted.");
+    }
+
+    /**
+     * Starts the main polling loop for reading data from the source.
+     *
+     * @return {@code true} when the polling loop completes (at the specified end time). {@code false} if the
+     *         job is aborted before the specified end time.
+     * @throws Exception if the polling loop fails while reading from the source
+     */
+    abstract boolean run() throws Exception;
+
+    abstract static class Builder<B extends Builder<B>> {
+        abstract B setPollingInterval(Duration value);
+        abstract B setPollingOffset(Duration value);
+        abstract B setStartTime(Instant value);
+        abstract B setEndTime(Instant value);
+    }
+
+    enum State {
+        READY,
+        RUNNING,
+        STOPPED
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/com/cognite/client/stream/Publisher.java b/src/main/java/com/cognite/client/stream/Publisher.java
index 60773ea0..b6639fd2 100644
--- a/src/main/java/com/cognite/client/stream/Publisher.java
+++ b/src/main/java/com/cognite/client/stream/Publisher.java
@@ -1,30 +1,209 @@
 package com.cognite.client.stream;
 
+import com.cognite.client.Request;
+import com.cognite.client.dto.RawRow;
 import com.google.auto.value.AutoValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.RandomStringUtils;
+
+import javax.annotation.Nullable;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
 
 /**
+ * A generic publisher that polls a {@link Source} for updates at a configurable interval and pushes
+ * the result batches to a registered {@link Consumer}.
  *
- * @param <T>
- * @param <E>
+ * @param <T> the type of object offered by the {@link Source}
  */
 @AutoValue
-public abstract class Publisher<T extends Publisher<T>> {
-    protected final Logger LOG = LoggerFactory.getLogger(this.getClass());
+public abstract class Publisher<T> extends AbstractPublisher {
+
+    private static <T> Builder<T> builder() {
+        return new AutoValue_Publisher.Builder<T>()
+                .setPollingInterval(DEFAULT_POLLING_INTERVAL)
+                .setPollingOffset(DEFAULT_POLLING_OFFSET)
+                .setStartTime(MIN_START_TIME.plusSeconds(1))
+                .setEndTime(MAX_END_TIME)
+                .setRequest(Request.create())
+                ;
+    }
+
+    public static <T> Publisher<T> of(Source<T> source) {
+        return Publisher.<T>builder()
+                .setSource(source)
+                .build();
+    }
+
+    abstract Builder<T> toBuilder();
 
-    private static <T extends Publisher<T>> Builder<T> builder() {
-        return new AutoValue_Publisher.Builder();
+    abstract Source<T> getSource();
+    abstract Request getRequest();
+    @Nullable
+    abstract Consumer<List<T>> getConsumer();
+
+    /**
+     * Adds the consumer of the data stream.
+     *
+     * The consumer will be called for each batch of {@code T}. This is potentially a blocking operation,
+     * so you should take care to process the batch efficiently (or hand processing off to a separate thread).
+     *
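+     * A sketch of a consumer that hands each batch off to a worker thread (the executor and the
+     * handleBatch method are illustrative placeholders):
+     * <pre>{@code
+     * ExecutorService workers = Executors.newSingleThreadExecutor();
+     * Publisher<RawRow> publisher = Publisher.of(rawRowSource)
+     *         .withConsumer(batch -> workers.submit(() -> handleBatch(batch)));
+     * }</pre>
+     *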
+     * @param consumer The function to call for each batch of {@code T}.
+     * @return The {@link Publisher} with the consumer configured.
+     */
+    public Publisher<T> withConsumer(Consumer<List<T>> consumer) {
+        return toBuilder().setConsumer(consumer).build();
+    }
+
+    /**
+     * Sets the start time (i.e. the earliest possible created/changed time of the streamed objects) of the
+     * data stream.
+     *
+     * The default start time is just after Unix epoch. That is, the publisher will read all existing objects
+     * (if any) from the source.
+     *
+     * @param startTime The start time instant.
+     * @return The {@link Publisher} with the start time configured.
+     */
+    public Publisher<T> withStartTime(Instant startTime) {
+        Preconditions.checkArgument(startTime.isAfter(MIN_START_TIME) && startTime.isBefore(MAX_END_TIME),
+                "Start time must be after Unix epoch and before the max end time: " + MAX_END_TIME);
+        return toBuilder().setStartTime(startTime).build();
     }
 
-    public static <T extends Publisher<T>> Publisher<T> of() {
-        return Publisher.builder().build();
+    /**
+     * Sets the end time (i.e. the latest possible created/changed time of the streamed objects) of the
+     * data stream.
+     *
+     * The default end time is the maximum allowed end time (just short of {@code Instant.MAX}). In effect,
+     * the publisher will stream data indefinitely, or until aborted.
+     *
+     * @param endTime The end time instant.
+     * @return The {@link Publisher} with the end time configured.
+     */
+    public Publisher<T> withEndTime(Instant endTime) {
+        Preconditions.checkArgument(endTime.isAfter(MIN_START_TIME) && endTime.isBefore(MAX_END_TIME),
+                "End time must be after Unix epoch and before the max end time: " + MAX_END_TIME);
+        return toBuilder().setEndTime(endTime).build();
+    }
+
+    /**
+     * Sets the polling interval to check for updates to the data source. The default polling interval is
+     * every 5 seconds. You can configure a more or less frequent interval, from every 0.5 seconds up to
+     * every 60 seconds.
+     *
+     * @param interval The interval to check the data source for updates.
+     * @return The {@link Publisher} with the polling interval configured.
+     */
+    public Publisher<T> withPollingInterval(Duration interval) {
+        Preconditions.checkArgument(interval.compareTo(MIN_POLLING_INTERVAL) >= 0
+                        && interval.compareTo(MAX_POLLING_INTERVAL) <= 0,
+                String.format("Polling interval must be between %s and %s (inclusive).",
+                        MIN_POLLING_INTERVAL,
+                        MAX_POLLING_INTERVAL));
+        return toBuilder().setPollingInterval(interval).build();
+    }
+
+    /**
+     * Sets the polling offset. The offset is a time window "buffer" subtracted from the current time when polling
+     * for data from CDF. It is intended as a safeguard for clock differences between the client (running this
+     * publisher) and the CDF service.
+     *
+     * For example, if the polling offset is 2 seconds, then this publisher will look for data updates up to
+     * (and including) T-2 seconds. That is, data will be streamed with a fixed 2 second latency/delay.
+     *
+     * @param offset The offset to subtract from the current time when polling for updates.
+     * @return The {@link Publisher} with the polling offset configured.
+     */
+    public Publisher<T> withPollingOffset(Duration offset) {
+        Preconditions.checkArgument(offset.compareTo(MIN_POLLING_OFFSET) >= 0
+                        && offset.compareTo(MAX_POLLING_OFFSET) <= 0,
+                String.format("Polling offset must be between %s and %s (inclusive).",
+                        MIN_POLLING_OFFSET,
+                        MAX_POLLING_OFFSET));
+        return toBuilder().setPollingOffset(offset).build();
+    }
+
+    /**
+     * Sets a baseline {@link Request} to use when producing the stream of objects. The {@link Request} contains
+     * the set of filters that you want the (streamed) objects to satisfy.
+     *
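+     * For example, a sketch that narrows the stream with a filter (the filter key and value are
+     * illustrative; use the filter attributes supported by your source):
+     * <pre>{@code
+     * Publisher<RawRow> publisher = Publisher.of(rawRowSource)
+     *         .withRequest(Request.create()
+     *                 .withFilterParameter("source", "my-source-system"));
+     * }</pre>
+     *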
+     * @param request The baseline request specifying filters for the object stream.
+     * @return The {@link Publisher} with the request configured.
+     */
+    public Publisher<T> withRequest(Request request) {
+        return toBuilder().setRequest(request).build();
+    }
+
+    /**
+     * Starts the main polling loop for reading objects from the source.
+     *
+     * @return {@code true} when the polling loop completes (at the specified end time). {@code false} if the
+     *         job is aborted before the specified end time.
+     * @throws Exception if reading from the source fails
+     */
+    boolean run() throws Exception {
+        final String loggingPrefix = "streaming() [" + RandomStringUtils.randomAlphanumeric(4) + "] - ";
+        Preconditions.checkNotNull(getConsumer(),
+                loggingPrefix + "You must specify a Consumer via withConsumer(Consumer<List<T>>)");
+        Preconditions.checkState(getStartTime().isBefore(getEndTime()),
+                String.format(loggingPrefix + "Start time must be before end time. Start time: %s. End time: %s",
+                        getStartTime(),
+                        getEndTime()));
+        LOG.info(loggingPrefix + "Setting up streaming read from {}. Time window start: {}. End: {}",
+                getSource().getClass().getSimpleName(),
+                getStartTime().toString(),
+                getEndTime().toString());
+        state = State.RUNNING;
+
+        // Set the time range for the first query
+        long startRange = getStartTime().toEpochMilli();
+        long endRange = Instant.now().minus(getPollingOffset()).toEpochMilli();
+
+        while (Instant.now().isBefore(getEndTime().plus(getPollingOffset())) && !abortStream.get()) {
+            endRange = Instant.now().minus(getPollingOffset()).toEpochMilli();
+            LOG.debug(loggingPrefix + "Enter polling loop with startRange: [{}] and endRange: [{}]",
+                    startRange,
+                    endRange);
+            if (startRange < endRange) {
+                Request query = getRequest()
+                        .withFilterParameter("lastUpdatedTime", Map.of(
+                                "min", startRange,
+                                "max", endRange))
+                        ;
+                LOG.debug(loggingPrefix + "Send request to read CDF: {}",
+                        query);
+
+                Iterator<List<T>> iterator = getSource().list(query);
+                while (iterator.hasNext() && !abortStream.get()) {
+                    List<T> batch = iterator.next();
+                    if (!batch.isEmpty()) {
+                        getConsumer().accept(batch);
+                    }
+                }
+            }
+
+            LOG.debug(loggingPrefix + "Finished polling loop with startRange: [{}] and endRange: [{}]. Sleeping for {}",
+                    startRange,
+                    endRange,
+                    getPollingInterval().toString());
+
+            startRange = endRange + 1; // endRange is inclusive in the request, so bump startRange past it
+            // Sleep for a polling interval before the next iteration
+            try {
+                Thread.sleep(getPollingInterval().toMillis());
+            } catch (Exception e) {
+                LOG.warn(loggingPrefix + "Exception during polling sleep: " + e.toString());
+                abortStream.set(true);
+            }
+        }
+        state = State.STOPPED;
+        return !abortStream.get();
     }
 
     @AutoValue.Builder
-    abstract static class Builder<T extends Publisher<T>> {
-        //abstract Publisher.Builder<T> setInputIterators(Iterator<T> value);
+    abstract static class Builder<T> extends AbstractPublisher.Builder<Builder<T>> {
+        abstract Builder<T> setSource(Source<T> value);
+        abstract Builder<T> setConsumer(Consumer<List<T>> value);
+        abstract Builder<T> setRequest(Request value);
 
         abstract Publisher<T> build();
     }
 }
\ No newline at end of file
diff --git a/src/main/java/com/cognite/client/stream/RawPublisher.java b/src/main/java/com/cognite/client/stream/RawPublisher.java
index 85e54a26..bdb166ce 100644
--- a/src/main/java/com/cognite/client/stream/RawPublisher.java
+++ b/src/main/java/com/cognite/client/stream/RawPublisher.java
@@ -206,7 +206,7 @@ public void abort() {
      * @throws Exception
      */
     boolean run() throws Exception {
-        final String loggingPrefix = "streaming() [" + RandomStringUtils.randomAlphanumeric(6) + "] - ";
+        final String loggingPrefix = "streaming() [" + RandomStringUtils.randomAlphanumeric(4) + "] - ";
         Preconditions.checkNotNull(getConsumer(),
                 loggingPrefix + "You must specify a Consumer via withConsumer(Consumer<List<RawRow>>)");
         Preconditions.checkState(getStartTime().isBefore(getEndTime()),
diff --git a/src/main/java/com/cognite/client/stream/Source.java b/src/main/java/com/cognite/client/stream/Source.java
index 83a1bd52..36f1a38e 100644
--- a/src/main/java/com/cognite/client/stream/Source.java
+++ b/src/main/java/com/cognite/client/stream/Source.java
@@ -23,5 +23,5 @@ public interface Source<T> {
      * @param requestParameters the filters to use for retrieving the results objects
      * @return an {@link Iterator} to page through the results set.
      */
-    public Iterator<List<T>> list(Request requestParameters);
+    public Iterator<List<T>> list(Request requestParameters) throws Exception;
 }