Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid timestamptz subtraction for span and event start offsets #1047

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@

import static gov.nasa.jpl.aerie.merlin.driver.json.SerializedValueJsonParser.serializedValueP;
import static gov.nasa.jpl.aerie.merlin.protocol.types.Duration.MICROSECONDS;
import static gov.nasa.jpl.aerie.merlin.server.remotes.postgres.PreparedStatements.setDuration;
import static gov.nasa.jpl.aerie.merlin.server.remotes.postgres.PreparedStatements.setTimestamp;

/*package-local*/ final class InsertSimulationEventsAction implements AutoCloseable {
@Language("SQL") private static final String sql = """
insert into event (dataset_id, real_time, transaction_index, causal_time, topic_index, value)
values (?, ?::timestamptz - ?::timestamptz, ?, ?, ?, ?::jsonb)
values (?, ?::interval, ?, ?, ?, ?::jsonb)
""";

private final PreparedStatement statement;
Expand All @@ -31,16 +32,15 @@ public InsertSimulationEventsAction(final Connection connection) throws SQLExcep

public void apply(
final long datasetId,
final Map<Duration, List<EventGraph<Pair<Integer, SerializedValue>>>> eventPoints,
final Timestamp simulationStart
final Map<Duration, List<EventGraph<Pair<Integer, SerializedValue>>>> eventPoints
) throws SQLException {
for (final var eventPoint : eventPoints.entrySet()) {
final var time = eventPoint.getKey();
final var transactions = eventPoint.getValue();
for (int transactionIndex = 0; transactionIndex < transactions.size(); transactionIndex++) {
final var eventGraph = transactions.get(transactionIndex);
final var flattenedEventGraph = EventGraphFlattener.flatten(eventGraph);
batchInsertEventGraph(datasetId, time, transactionIndex, simulationStart, flattenedEventGraph, this.statement);
batchInsertEventGraph(datasetId, time, transactionIndex, flattenedEventGraph, this.statement);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was this done via a static method rather than inlined in the apply like our other batch inserts?

}
}
this.statement.executeBatch();
Expand All @@ -50,21 +50,19 @@ private static void batchInsertEventGraph(
final long datasetId,
final Duration duration,
final int transactionIndex,
final Timestamp simulationStart,
final List<Pair<String, Pair<Integer, SerializedValue>>> flattenedEventGraph,
final PreparedStatement statement
) throws SQLException {
for (final Pair<String, Pair<Integer, SerializedValue>> entry : flattenedEventGraph) {
for (final var entry : flattenedEventGraph) {
final var causalTime = entry.getLeft();
final Pair<Integer, SerializedValue> event = entry.getRight();
final var event = entry.getRight();
Comment on lines -57 to +58
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Idle question: why are these getting converted from their specific types to vars?


statement.setLong(1, datasetId);
setTimestamp(statement, 2, simulationStart.plusMicros(duration.in(MICROSECONDS)));
setTimestamp(statement, 3, simulationStart);
statement.setInt(4, transactionIndex);
statement.setString(5, causalTime);
statement.setInt(6, event.getLeft());
statement.setString(7, serializedValueP.unparse(event.getRight()).toString());
setDuration(statement, 2, duration);
statement.setInt(3, transactionIndex);
statement.setString(4, causalTime);
statement.setInt(5, event.getLeft());
statement.setString(6, serializedValueP.unparse(event.getRight()).toString());

statement.addBatch();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,22 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import static gov.nasa.jpl.aerie.merlin.server.remotes.postgres.PostgresParsers.activityAttributesP;
import static gov.nasa.jpl.aerie.merlin.server.remotes.postgres.PreparedStatements.setTimestamp;
import static gov.nasa.jpl.aerie.merlin.server.remotes.postgres.PreparedStatements.setDuration;

/*package-local*/ final class PostSpansAction implements AutoCloseable {
/*package-local*/ final class InsertSpansAction implements AutoCloseable {
private static final @Language("SQL") String sql = """
insert into span (dataset_id, start_offset, duration, type, attributes)
values (?, ?::timestamptz - ?::timestamptz, ?::timestamptz - ?::timestamptz, ?, ?::jsonb)
values (?, ?::interval, ?::interval, ?, ?::jsonb)
""";

private final PreparedStatement statement;

public PostSpansAction(final Connection connection) throws SQLException {
public InsertSpansAction(final Connection connection) throws SQLException {
this.statement = connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS);
}

Expand All @@ -38,26 +37,19 @@ public Map<Long, Long> apply(
final var ids = spans.keySet().stream().toList();
for (final var id : ids) {
final var act = spans.get(id);
final var startTimestamp = new Timestamp(act.start());

final var endTimestamp = act.duration().map(duration -> {
final var actEnd = act.start().plus(duration.dividedBy(Duration.MICROSECOND), ChronoUnit.MICROS);
return new Timestamp(actEnd);
});
final var startOffset = Duration.of(simulationStart.microsUntil(new Timestamp(act.start())), Duration.MICROSECONDS);

statement.setLong(1, datasetId);
setTimestamp(statement, 2, startTimestamp);
setTimestamp(statement, 3, simulationStart);
setDuration(statement, 2, startOffset);

if (endTimestamp.isPresent()) {
setTimestamp(statement, 4, endTimestamp.get());
if (act.duration().isPresent()) {
setDuration(statement, 3, act.duration().get());
} else {
statement.setNull(4, Types.TIMESTAMP_WITH_TIMEZONE);
statement.setNull(3, Types.TIMESTAMP_WITH_TIMEZONE);
}

setTimestamp(statement, 5, startTimestamp);
statement.setString(6, act.type());
statement.setString(7, buildAttributes(act.attributes().directiveId(), act.attributes().arguments(), act.attributes().computedAttributes()));
statement.setString(4, act.type());
statement.setString(5, buildAttributes(act.attributes().directiveId(), act.attributes().arguments(), act.attributes().computedAttributes()));
statement.addBatch();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ private static void postSimulationResults(
ProfileRepository.postResourceProfiles(connection, datasetId, profileSet);
postActivities(connection, datasetId, results.simulatedActivities, results.unfinishedActivities, simulationStart);
insertSimulationTopics(connection, datasetId, results.topics);
insertSimulationEvents(connection, datasetId, results.events, simulationStart);
insertSimulationEvents(connection, datasetId, results.events);

try (final var setSimulationStateAction = new SetSimulationStateAction(connection)) {
setSimulationStateAction.apply(datasetId, SimulationStateRecord.success());
Expand All @@ -318,15 +318,15 @@ private static void insertSimulationTopics(
}

private static void insertSimulationEvents(
Connection connection,
long datasetId,
Map<Duration, List<EventGraph<Pair<Integer, SerializedValue>>>> events,
Timestamp simulationStart) throws SQLException
final Connection connection,
final long datasetId,
final Map<Duration, List<EventGraph<Pair<Integer, SerializedValue>>>> events
) throws SQLException
{
try (
final var insertSimulationEventsAction = new InsertSimulationEventsAction(connection)
) {
insertSimulationEventsAction.apply(datasetId, events, simulationStart);
insertSimulationEventsAction.apply(datasetId, events);
}
}

Expand All @@ -338,7 +338,7 @@ private static void postActivities(
final Timestamp simulationStart
) throws SQLException {
try (
final var postActivitiesAction = new PostSpansAction(connection);
final var insertSpansAction = new InsertSpansAction(connection);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: Looking at this with the rename, I wonder if something like InsertActivitySpansAction rather than just InsertSpansAction (or renaming the function from postActivities to postActivitySpans) would help with clarity, since Aerie does have the concept of non-Activity Spans

final var updateSimulatedActivityParentsAction = new UpdateSimulatedActivityParentsAction(connection)
) {
final var simulatedActivityRecords = simulatedActivities.entrySet().stream()
Expand All @@ -352,7 +352,7 @@ private static void postActivities(
e -> unfinishedActivityToRecord(e.getValue())));
allActivityRecords.putAll(simulatedActivityRecords);

final var simIdToPgId = postActivitiesAction.apply(
final var simIdToPgId = insertSpansAction.apply(
datasetId,
allActivityRecords,
simulationStart);
Expand Down