Skip to content

Commit

Permalink
Extract Creating Serialized Timeline into method
Browse files Browse the repository at this point in the history
  • Loading branch information
Mythicaeda committed Jul 23, 2024
1 parent 1586938 commit 966e77b
Showing 1 changed file with 52 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -842,39 +842,11 @@ public SimulationActivityExtract computeActivitySimulationResults(
return new SimulationActivityExtract(startTime, elapsedTime, simulatedActivities, unfinishedActivities);
}


/** Compute a set of results from the current state of simulation. */
// TODO: Move result extraction out of the SimulationEngine.
// The Engine should only need to stream events of interest to a downstream consumer.
// The Engine cannot be cognizant of all downstream needs.
// TODO: Whatever mechanism replaces `computeResults` also ought to replace `isTaskComplete`.
// TODO: Produce results for all tasks, not just those that have completed.
// Planners need to be aware of failed or unfinished tasks.
public SimulationResults computeResults (
final Instant startTime,
final Topic<ActivityDirectiveId> activityTopic,
private TreeMap<Duration, List<EventGraph<EventRecord>>> createSerializedTimeline(
final TemporalEventSource combinedTimeline,
final Iterable<SerializableTopic<?>> serializableTopics,
final SimulationResourceManager resourceManager
) {
final var combinedTimeline = this.combineTimeline();
// Collect per-task information from the event graph.
final var spanInfo = computeSpanInfo(activityTopic, serializableTopics, combinedTimeline);

// Extract profiles for every resource.
final var resourceProfiles = resourceManager.computeProfiles(elapsedTime);
final var realProfiles = resourceProfiles.realProfiles();
final var discreteProfiles = resourceProfiles.discreteProfiles();

final var activityResults = computeActivitySimulationResults(startTime, spanInfo);

final List<Triple<Integer, String, ValueSchema>> topics = new ArrayList<>();
final var serializableTopicToId = new HashMap<SerializableTopic<?>, Integer>();
for (final var serializableTopic : serializableTopics) {
serializableTopicToId.put(serializableTopic, topics.size());
topics.add(Triple.of(topics.size(), serializableTopic.name(), serializableTopic.outputType().getSchema()));
}

final var spanToActivities = spanToSimulatedActivities(spanInfo);
final HashMap<SpanId, SimulatedActivityId> spanToActivities,
final HashMap<SerializableTopic<?>, Integer> serializableTopicToId) {
final var serializedTimeline = new TreeMap<Duration, List<EventGraph<EventRecord>>>();
var time = Duration.ZERO;
for (var point : combinedTimeline.points()) {
Expand Down Expand Up @@ -904,7 +876,7 @@ public SimulationResults computeResults (
}
}
}
var activitySpanID = Optional.ofNullable(spanToActivities.get(event.provenance()).id());
var activitySpanID = Optional.of(spanToActivities.get(event.provenance()).id());
output = EventGraph.concurrently(
output,
EventGraph.atom(
Expand All @@ -923,6 +895,47 @@ public SimulationResults computeResults (
}
}
}
return serializedTimeline;
}


/** Compute a set of results from the current state of simulation. */
// TODO: Move result extraction out of the SimulationEngine.
// The Engine should only need to stream events of interest to a downstream consumer.
// The Engine cannot be cognizant of all downstream needs.
// TODO: Whatever mechanism replaces `computeResults` also ought to replace `isTaskComplete`.
// TODO: Produce results for all tasks, not just those that have completed.
// Planners need to be aware of failed or unfinished tasks.
public SimulationResults computeResults (
final Instant startTime,
final Topic<ActivityDirectiveId> activityTopic,
final Iterable<SerializableTopic<?>> serializableTopics,
final SimulationResourceManager resourceManager
) {
final var combinedTimeline = this.combineTimeline();
// Collect per-task information from the event graph.
final var spanInfo = computeSpanInfo(activityTopic, serializableTopics, combinedTimeline);

// Extract profiles for every resource.
final var resourceProfiles = resourceManager.computeProfiles(elapsedTime);
final var realProfiles = resourceProfiles.realProfiles();
final var discreteProfiles = resourceProfiles.discreteProfiles();

final var activityResults = computeActivitySimulationResults(startTime, spanInfo);

final List<Triple<Integer, String, ValueSchema>> topics = new ArrayList<>();
final var serializableTopicToId = new HashMap<SerializableTopic<?>, Integer>();
for (final var serializableTopic : serializableTopics) {
serializableTopicToId.put(serializableTopic, topics.size());
topics.add(Triple.of(topics.size(), serializableTopic.name(), serializableTopic.outputType().getSchema()));
}

final var serializedTimeline = createSerializedTimeline(
combinedTimeline,
serializableTopics,
spanToSimulatedActivities(spanInfo),
serializableTopicToId
);

return new SimulationResults(
realProfiles,
Expand Down Expand Up @@ -960,55 +973,12 @@ public SimulationResults computeResults(
topics.add(Triple.of(topics.size(), serializableTopic.name(), serializableTopic.outputType().getSchema()));
}

final var spanToActivities = spanToSimulatedActivities(spanInfo);
final var serializedTimeline = new TreeMap<Duration, List<EventGraph<EventRecord>>>();
var time = Duration.ZERO;
for (var point : combinedTimeline.points()) {
if (point instanceof TemporalEventSource.TimePoint.Delta delta) {
time = time.plus(delta.delta());
} else if (point instanceof TemporalEventSource.TimePoint.Commit commit) {
final var serializedEventGraph = commit.events().substitute(
event -> {
// TODO can we do this more efficiently?
EventGraph<EventRecord> output = EventGraph.empty();
for (final var serializableTopic : serializableTopics) {
Optional<SerializedValue> serializedEvent = trySerializeEvent(event, serializableTopic);
if (serializedEvent.isPresent()) {
// If the event's `provenance` has no simulated activity id, search its ancestors to find the nearest
// simulated activity id, if one exists
if (!spanToActivities.containsKey(event.provenance())) {
var spanId = Optional.of(event.provenance());

while (true) {
if (spanToActivities.containsKey(spanId.get())) {
spanToActivities.put(event.provenance(), spanToActivities.get(spanId.get()));
break;
}
spanId = this.getSpan(spanId.get()).parent();
if (spanId.isEmpty()) {
break;
}
}
}
var activitySpanID = Optional.ofNullable(spanToActivities.get(event.provenance()).id());
output = EventGraph.concurrently(
output,
EventGraph.atom(
new EventRecord(serializableTopicToId.get(serializableTopic),
activitySpanID,
serializedEvent.get())));
}
}
return output;
}
).evaluate(new EventGraph.IdentityTrait<>(), EventGraph::atom);
if (!(serializedEventGraph instanceof EventGraph.Empty)) {
serializedTimeline
.computeIfAbsent(time, x -> new ArrayList<>())
.add(serializedEventGraph);
}
}
}
final var serializedTimeline = createSerializedTimeline(
combinedTimeline,
serializableTopics,
spanToSimulatedActivities(spanInfo),
serializableTopicToId
);

return new SimulationResults(
realProfiles,
Expand Down

0 comments on commit 966e77b

Please sign in to comment.