diff --git a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java index efbddb54f2..0a71b8b3ae 100644 --- a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java +++ b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/engine/SimulationEngine.java @@ -208,7 +208,8 @@ private void stepEffectModel( // Based on the task's return status, update its execution state and schedule its resumption. if (status instanceof TaskStatus.Completed) { - final var children = new LinkedList<>(this.taskChildren.getOrDefault(task, Collections.emptySet())); + final var children = new LinkedList<>(Optional.ofNullable(this.taskChildren.remove(task)) + .orElseGet(Collections::emptySet)); this.tasks.put(task, progress.completedAt(currentTime, children)); this.scheduledJobs.schedule(JobId.forTask(task), SubInstant.Tasks.at(currentTime)); @@ -218,14 +219,19 @@ private void stepEffectModel( this.tasks.put(task, progress.continueWith(s.continuation())); this.scheduledJobs.schedule(JobId.forTask(task), SubInstant.Tasks.at(currentTime.plus(s.delay()))); } else if (status instanceof TaskStatus.CallingTask s) { - final var target = TaskId.generate(); - SimulationEngine.this.tasks.put(target, new ExecutionState.InProgress<>(currentTime, s.child().create(this.executor))); - SimulationEngine.this.taskParent.put(target, task); - SimulationEngine.this.taskChildren.computeIfAbsent(task, $ -> new HashSet<>()).add(target); - frame.signal(JobId.forTask(target)); - - this.tasks.put(task, progress.continueWith(s.continuation())); - this.waitingTasks.subscribeQuery(task, Set.of(SignalId.forTask(target))); + if (s.tailCall()) { + this.tasks.put(task, new ExecutionState.InProgress<>(progress.startOffset, s.child().create(this.executor))); + this.scheduledJobs.schedule(JobId.forTask(task), SubInstant.Tasks.at(currentTime)); + } else { + final var target = TaskId.generate(); + this.tasks.put(target, new ExecutionState.InProgress<>(currentTime, s.child().create(this.executor))); + this.taskParent.put(target, task); + this.taskChildren.computeIfAbsent(task, $ -> new HashSet<>()).add(target); + frame.signal(JobId.forTask(target)); + + this.tasks.put(task, progress.continueWith(s.continuation())); + this.waitingTasks.subscribeQuery(task, Set.of(SignalId.forTask(target))); + } } else if (status instanceof TaskStatus.AwaitingCondition s) { final var condition = ConditionId.generate(); this.conditions.put(condition, s.condition()); @@ -438,19 +444,20 @@ public static SimulationResults computeResults( final var name = id.id(); final var resource = state.resource(); + final boolean allowRLE = resource.allowRunLengthCompression(); switch (resource.getType()) { case "real" -> realProfiles.put( name, Pair.of( resource.getOutputType().getSchema(), - serializeProfile(elapsedTime, state, SimulationEngine::extractRealDynamics))); + serializeProfile(elapsedTime, state, SimulationEngine::extractRealDynamics, allowRLE))); case "discrete" -> discreteProfiles.put( name, Pair.of( resource.getOutputType().getSchema(), - serializeProfile(elapsedTime, state, SimulationEngine::extractDiscreteDynamics))); + serializeProfile(elapsedTime, state, SimulationEngine::extractDiscreteDynamics, allowRLE))); default -> throw new IllegalArgumentException( @@ -602,11 +609,24 @@ private interface Translator { Target apply(Resource resource, Dynamics dynamics); } + private static + void appendProfileSegment(ArrayList> profile, Duration duration, Target value, + boolean allowRunLengthCompression) { + final int s = profile.size(); + final ProfileSegment lastSeg = s > 0 ? profile.get(s - 1) : null; + if (allowRunLengthCompression && lastSeg != null && value.equals(lastSeg.dynamics())) { + profile.set(s - 1, new ProfileSegment<>(lastSeg.extent().plus(duration), value)); + } else { + profile.add(new ProfileSegment<>(duration, value)); + } + } + private static List> serializeProfile( final Duration elapsedTime, final ProfilingState state, - final Translator translator + final Translator translator, + final boolean allowRunLengthCompression ) { final var profile = new ArrayList>(state.profile().segments().size()); @@ -615,18 +635,21 @@ List> serializeProfile( var segment = iter.next(); while (iter.hasNext()) { final var nextSegment = iter.next(); - - profile.add(new ProfileSegment<>( - nextSegment.startOffset().minus(segment.startOffset()), - translator.apply(state.resource(), segment.dynamics()))); + appendProfileSegment(profile, + nextSegment.startOffset().minus(segment.startOffset()), + translator.apply(state.resource(), segment.dynamics()), + allowRunLengthCompression); segment = nextSegment; } - profile.add(new ProfileSegment<>( - elapsedTime.minus(segment.startOffset()), - translator.apply(state.resource(), segment.dynamics()))); + appendProfileSegment(profile, + elapsedTime.minus(segment.startOffset()), + translator.apply(state.resource(), segment.dynamics()), + allowRunLengthCompression); } + profile.trimToSize(); + return profile; } diff --git a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/Context.java b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/Context.java index 6738237a37..8d90e41e8b 100644 --- a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/Context.java +++ b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/Context.java @@ -30,8 +30,12 @@ enum ContextType { Initializing, Reacting, Querying } void emit(Event event, Topic topic); void spawn(TaskFactory task); + void call(TaskFactory task); + void tailCall(TaskFactory task); + void delay(Duration duration); + void waitUntil(Condition condition); } diff --git a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/InitializationContext.java b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/InitializationContext.java index 9615830a69..adccdfc2c3 100644 --- a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/InitializationContext.java +++ b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/InitializationContext.java @@ -61,6 +61,11 @@ public void call(final TaskFactory task) { throw new IllegalStateException("Cannot yield during initialization"); } + @Override + public void tailCall(final TaskFactory task) { + throw new IllegalStateException("Cannot yield during initialization"); + } + @Override public void delay(final Duration duration) { throw new IllegalStateException("Cannot yield during initialization"); diff --git a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ModelActions.java b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ModelActions.java index 1c9019f328..5372c20fa4 100644 --- a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ModelActions.java +++ b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ModelActions.java @@ -69,6 +69,18 @@ public static void call(final TaskFactory task) { context.get().call(task); } + public static void tailCall(final Runnable task) { + tailCall(threaded(task)); + } + + public static void tailCall(final Supplier task) { + tailCall(threaded(task)); + } + + public static void tailCall(final TaskFactory task) { + context.get().tailCall(task); + } + public static void defer(final Duration duration, final Runnable task) { spawn(replaying(() -> { delay(duration); spawn(task); })); } diff --git a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/QueryContext.java b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/QueryContext.java index 2425a9ab77..f20cb197e6 100644 --- a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/QueryContext.java +++ b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/QueryContext.java @@ -52,6 +52,11 @@ public void call(final TaskFactory task) { throw new IllegalStateException("Cannot schedule tasks in a query-only context"); } + @Override + public void tailCall(final TaskFactory task) { + throw new IllegalStateException("Cannot schedule tasks in a query-only context"); + } + @Override public void delay(final Duration duration) { throw new IllegalStateException("Cannot yield in a query-only context"); diff --git a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/Registrar.java b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/Registrar.java index f9287e34a5..16522c6df2 100644 --- a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/Registrar.java +++ b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/Registrar.java @@ -13,18 +13,34 @@ import java.util.function.UnaryOperator; public final class Registrar { + + /** + * Whether to allow run length compression when saving resource profiles at the end of simulation by default. + * + * This compression is lossless in terms of the overall shape of the profile, but it will combine adjacent profile + * segments with the same value, thus obscuring the fact that multiple resource samples (again, all returning the same + * value) were taken within the segment. + */ + private static final boolean ALLOW_RUN_LENGTH_COMPRESSION_BY_DEFAULT = false; + private final Initializer builder; + private boolean allowRunLengthCompression = ALLOW_RUN_LENGTH_COMPRESSION_BY_DEFAULT; public Registrar(final Initializer builder) { this.builder = Objects.requireNonNull(builder); } + public void allowRunLengthCompression(final boolean allow) { + this.allowRunLengthCompression = allow; + } + public boolean isInitializationComplete() { return (ModelActions.context.get().getContextType() != Context.ContextType.Initializing); } public void discrete(final String name, final Resource resource, final ValueMapper mapper) { - this.builder.resource(name, makeResource("discrete", resource, mapper.getValueSchema(), mapper::serializeValue)); + this.builder.resource(name, makeResource("discrete", resource, mapper.getValueSchema(), mapper::serializeValue, + allowRunLengthCompression)); } public void real(final String name, final Resource resource) { @@ -46,14 +62,16 @@ private void real(final String name, final Resource resource, Unar "rate", ValueSchema.REAL))), dynamics -> SerializedValue.of(Map.of( "initial", SerializedValue.of(dynamics.initial), - "rate", SerializedValue.of(dynamics.rate))))); + "rate", SerializedValue.of(dynamics.rate))), + allowRunLengthCompression)); } private static gov.nasa.jpl.aerie.merlin.protocol.model.Resource makeResource( final String type, final Resource resource, final ValueSchema valueSchema, - final Function serializer + final Function serializer, + final boolean allowRunLengthCompression ) { return new gov.nasa.jpl.aerie.merlin.protocol.model.Resource<>() { @Override @@ -82,6 +100,11 @@ public Value getDynamics(final Querier querier) { return resource.getDynamics(); } } + + @Override + public boolean allowRunLengthCompression() { + return allowRunLengthCompression; + } }; } diff --git a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ReplayingReactionContext.java b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ReplayingReactionContext.java index e9998ff256..f093dba2a9 100644 --- a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ReplayingReactionContext.java +++ b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ReplayingReactionContext.java @@ -78,6 +78,14 @@ public void call(final TaskFactory task) { }); } + @Override + public void tailCall(final TaskFactory task) { + this.memory.doOnce(() -> { + this.scheduler = null; // Relinquish the current scheduler before yielding, in case an exception is thrown. + this.scheduler = this.handle.tailCall(task); + }); + } + @Override public void delay(final Duration duration) { this.memory.doOnce(() -> { diff --git a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ReplayingTask.java b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ReplayingTask.java index b18f38f6ca..0ad2ad3dbd 100644 --- a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ReplayingTask.java +++ b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ReplayingTask.java @@ -56,6 +56,11 @@ public Scheduler call(final TaskFactory child) { return this.yield(TaskStatus.calling(child, ReplayingTask.this)); } + @Override + public Scheduler tailCall(final TaskFactory child) { + return this.yield(TaskStatus.tailCalling(child, ReplayingTask.this)); + } + @Override public Scheduler await(final gov.nasa.jpl.aerie.merlin.protocol.model.Condition condition) { return this.yield(TaskStatus.awaiting(condition, ReplayingTask.this)); diff --git a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/TaskHandle.java b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/TaskHandle.java index 609453f0c9..4afe2da452 100644 --- a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/TaskHandle.java +++ b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/TaskHandle.java @@ -9,5 +9,7 @@ public interface TaskHandle { Scheduler call(TaskFactory child); + Scheduler tailCall(TaskFactory child); + Scheduler await(gov.nasa.jpl.aerie.merlin.protocol.model.Condition condition); } diff --git a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ThreadedReactionContext.java b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ThreadedReactionContext.java index 3edd568d9e..3f72af4edd 100644 --- a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ThreadedReactionContext.java +++ b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ThreadedReactionContext.java @@ -63,6 +63,12 @@ public void call(final TaskFactory task) { this.scheduler = this.handle.call(task); } + @Override + public void tailCall(final TaskFactory task) { + this.scheduler = null; // Relinquish the current scheduler before yielding, in case an exception is thrown. + this.scheduler = this.handle.tailCall(task); + } + @Override public void delay(final Duration duration) { this.scheduler = null; // Relinquish the current scheduler before yielding, in case an exception is thrown. diff --git a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ThreadedTask.java b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ThreadedTask.java index 58e3e3b626..84c54aaade 100644 --- a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ThreadedTask.java +++ b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ThreadedTask.java @@ -207,6 +207,11 @@ public Scheduler call(final TaskFactory child) { return this.yield(TaskStatus.calling(child, ThreadedTask.this)); } + @Override + public Scheduler tailCall(final TaskFactory child) { + return this.yield(TaskStatus.tailCalling(child, ThreadedTask.this)); + } + @Override public Scheduler await(final gov.nasa.jpl.aerie.merlin.protocol.model.Condition condition) { return this.yield(TaskStatus.awaiting(condition, ThreadedTask.this)); diff --git a/merlin-sdk/src/main/java/gov/nasa/jpl/aerie/merlin/protocol/model/Resource.java b/merlin-sdk/src/main/java/gov/nasa/jpl/aerie/merlin/protocol/model/Resource.java index 65b1bda7c3..1b9fbbe80d 100644 --- a/merlin-sdk/src/main/java/gov/nasa/jpl/aerie/merlin/protocol/model/Resource.java +++ b/merlin-sdk/src/main/java/gov/nasa/jpl/aerie/merlin/protocol/model/Resource.java @@ -16,4 +16,18 @@ public interface Resource { * this resource. In other words, it cannot depend on any hidden state.

*/ Dynamics getDynamics(Querier querier); + + /** + * After a simulation completes the entire evolution of the dynamics of this resource will typically be serialized as + * a resource profile consisting of some number of sequential segments. + * + * If run length compression is allowed for this resource then whenever there is a "run" of two or more such segments, + * one after another with the same dynamics, they will be compressed into a single segment during that serialization. + * This does not change the represented evolution of the dynamics of the resource, but it loses the information that a + * sample was taken at the start of each segment after the first in such a run. If a mission model prefers not to + * lose that information then it can return false here. + */ + default boolean allowRunLengthCompression() { + return false; + } } diff --git a/merlin-sdk/src/main/java/gov/nasa/jpl/aerie/merlin/protocol/types/SerializedValue.java b/merlin-sdk/src/main/java/gov/nasa/jpl/aerie/merlin/protocol/types/SerializedValue.java index 3187d8051f..d7cf409153 100644 --- a/merlin-sdk/src/main/java/gov/nasa/jpl/aerie/merlin/protocol/types/SerializedValue.java +++ b/merlin-sdk/src/main/java/gov/nasa/jpl/aerie/merlin/protocol/types/SerializedValue.java @@ -1,6 +1,8 @@ package gov.nasa.jpl.aerie.merlin.protocol.types; import java.math.BigDecimal; +import java.math.BigInteger; +import java.math.MathContext; import java.util.List; import java.util.Map; import java.util.Objects; @@ -67,6 +69,10 @@ public T match(final Visitor visitor) { } } + interface DirectNumericValue { + NumericValue asNumericValue(); + } + record NumericValue(BigDecimal value) implements SerializedValue { @Override public T match(final Visitor visitor) { @@ -76,8 +82,12 @@ public T match(final Visitor visitor) { // `BigDecimal#equals` is too strict -- values differing only in representation need to be considered the same. @Override public boolean equals(final Object obj) { - if (!(obj instanceof NumericValue other)) return false; - return (this.value.compareTo(other.value) == 0); + if (obj instanceof NumericValue other) { + return (this.value.compareTo(other.value) == 0); + } else if (obj instanceof DirectNumericValue other) { + return (this.value.compareTo(other.asNumericValue().value) == 0); + } + return false; } @Override @@ -86,6 +96,72 @@ public int hashCode() { } } + record IntValue(int value) implements SerializedValue, DirectNumericValue { + @Override + public T match(final Visitor visitor) { + return visitor.onNumeric(new BigDecimal(value)); + } + + @Override + public NumericValue asNumericValue() { + return new NumericValue(new BigDecimal(value)); + } + + @Override + public boolean equals(final Object obj) { + return asNumericValue().equals(obj); + } + } + + record LongValue(long value) implements SerializedValue, DirectNumericValue { + @Override + public T match(final Visitor visitor) { + return visitor.onNumeric(new BigDecimal(value)); + } + + @Override + public NumericValue asNumericValue() { + return new NumericValue(new BigDecimal(value)); + } + + @Override + public boolean equals(final Object obj) { + return asNumericValue().equals(obj); + } + } + + record DoubleValue(double value) implements SerializedValue, DirectNumericValue { + @Override + public T match(final Visitor visitor) { + return visitor.onNumeric(toBigDecimal()); + } + + @Override + public NumericValue asNumericValue() { + return new NumericValue(toBigDecimal()); + } + + @Override + public boolean equals(final Object obj) { + return asNumericValue().equals(obj); + } + + private BigDecimal toBigDecimal() { + //without MathContext.DECIMAL64 then a double assigned from a string (or code literal) "3.14" + //converts to a BigDecimal=3.140000000000000124344978758017532527446746826171875 + //but since a double can only represent up to 15 decimal digits when going from string -> double -> string + //the nonzero values in the smaller decimal places are just an artifact of the representation + //and there are unit tests that assume that string -> double -> string will be an identity op for e.g. 3.14 + //with MathContext.DECIMAL64 "3.14" converts to a BigDecimal=3.140000000000000 + var bd = new BigDecimal(value, MathContext.DECIMAL64); + if (bd.scale() == 0) { //if the underlying value was actually an integer + //we want to always serialize as a real number, i.e. "1.0" not "1" in JSON + bd = new BigDecimal(bd.unscaledValue().multiply(BigInteger.valueOf(10)), 1); //yes scale=1 not -1 + } + return bd; + } + } + record BooleanValue(boolean value) implements SerializedValue { @Override public T match(final Visitor visitor) { @@ -136,11 +212,11 @@ static SerializedValue of(final BigDecimal value) { /** * Creates a {@link SerializedValue} containing a real number. * - * @param value Any double} value. + * @param value Any double value. * @return A new {@link SerializedValue} containing a real number. */ static SerializedValue of(final double value) { - return new NumericValue(BigDecimal.valueOf(value)); + return new DoubleValue(value); } /** @@ -150,7 +226,17 @@ static SerializedValue of(final double value) { * @return A new {@link SerializedValue} containing an integral number. */ static SerializedValue of(final long value) { - return new NumericValue(BigDecimal.valueOf(value)); + return new LongValue(value); + } + + /** + * Creates a {@link SerializedValue} containing an integral number. + * + * @param value Any integer value. + * @return A new {@link SerializedValue} containing an integral number. + */ + static SerializedValue of(final int value) { + return new IntValue(value); } /** diff --git a/merlin-sdk/src/main/java/gov/nasa/jpl/aerie/merlin/protocol/types/TaskStatus.java b/merlin-sdk/src/main/java/gov/nasa/jpl/aerie/merlin/protocol/types/TaskStatus.java index 178b320ba4..2975aba538 100644 --- a/merlin-sdk/src/main/java/gov/nasa/jpl/aerie/merlin/protocol/types/TaskStatus.java +++ b/merlin-sdk/src/main/java/gov/nasa/jpl/aerie/merlin/protocol/types/TaskStatus.java @@ -9,7 +9,8 @@ record Completed(Return returnValue) implements TaskStatus {} record Delayed(Duration delay, Task continuation) implements TaskStatus {} - record CallingTask(TaskFactory child, Task continuation) implements TaskStatus {} + record CallingTask(TaskFactory child, Task continuation, boolean tailCall) + implements TaskStatus {} record AwaitingCondition(Condition condition, Task continuation) implements TaskStatus {} @@ -23,7 +24,11 @@ static Delayed delayed(final Duration delay, final Task } static CallingTask calling(final TaskFactory child, final Task continuation) { - return new CallingTask<>(child, continuation); + return new CallingTask<>(child, continuation, false); + } + + static CallingTask tailCalling(final TaskFactory child, final Task continuation) { + return new CallingTask<>(child, continuation, true); } static AwaitingCondition awaiting(final Condition condition, final Task continuation) {