From 02a33d7bfe28cf1a791aaa4be3c66e4548d4924f Mon Sep 17 00:00:00 2001 From: jhaug Date: Tue, 7 Jan 2025 15:21:42 -0800 Subject: [PATCH 1/4] Preliminary approach to queueing profile stream uploads on a separate thread --- .../merlin/driver/resources/TaskQueue.java | 17 ++ .../postgres/PostgresProfileQueryHandler.java | 190 ++++++++++++++++++ .../postgres/PostgresProfileStreamer.java | 172 ++-------------- 3 files changed, 222 insertions(+), 157 deletions(-) create mode 100644 merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/resources/TaskQueue.java create mode 100644 merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/postgres/PostgresProfileQueryHandler.java diff --git a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/resources/TaskQueue.java b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/resources/TaskQueue.java new file mode 100644 index 0000000000..f540852cf3 --- /dev/null +++ b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/resources/TaskQueue.java @@ -0,0 +1,17 @@ +package gov.nasa.jpl.aerie.merlin.driver.resources; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class TaskQueue { + private static final ExecutorService executor = Executors.newSingleThreadExecutor(); + + public static void addToQueue(Runnable task) { + executor.submit(task); + } + + /** Gracefully shut down the ExecutorService */ + public static void shutdown() { + executor.shutdown(); + } +} diff --git a/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/postgres/PostgresProfileQueryHandler.java b/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/postgres/PostgresProfileQueryHandler.java new file mode 100644 index 0000000000..69c25117b1 --- /dev/null +++ b/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/postgres/PostgresProfileQueryHandler.java @@ -0,0 +1,190 @@ +package gov.nasa.jpl.aerie.merlin.worker.postgres; + +import gov.nasa.jpl.aerie.json.JsonParser; +import gov.nasa.jpl.aerie.merlin.driver.resources.ResourceProfile; +import gov.nasa.jpl.aerie.merlin.driver.resources.ResourceProfiles; +import gov.nasa.jpl.aerie.merlin.protocol.types.Duration; +import gov.nasa.jpl.aerie.merlin.protocol.types.RealDynamics; +import gov.nasa.jpl.aerie.merlin.protocol.types.SerializedValue; +import gov.nasa.jpl.aerie.merlin.server.remotes.postgres.DatabaseException; +import gov.nasa.jpl.aerie.merlin.server.remotes.postgres.FailedInsertException; +import gov.nasa.jpl.aerie.merlin.server.remotes.postgres.FailedUpdateException; +import gov.nasa.jpl.aerie.merlin.server.remotes.postgres.PreparedStatements; +import org.apache.commons.lang3.tuple.Pair; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.HashMap; + +import static gov.nasa.jpl.aerie.merlin.driver.json.SerializedValueJsonParser.serializedValueP; +import static gov.nasa.jpl.aerie.merlin.server.http.ProfileParsers.realDynamicsP; +import static gov.nasa.jpl.aerie.merlin.server.remotes.postgres.PostgresParsers.discreteProfileTypeP; +import static gov.nasa.jpl.aerie.merlin.server.remotes.postgres.PostgresParsers.realProfileTypeP; + +/** + * Utility class to handle upload of resource profiles to the database. This class allows for each upload to have its + * own Connection object. + * */ +public class PostgresProfileQueryHandler implements AutoCloseable { + private final Connection connection; + private final HashMap profileIds; + private final HashMap profileDurations; + + private PreparedStatement postProfileStatement; + private PreparedStatement postSegmentsStatement; + private PreparedStatement updateDurationStatement; + + private long datasetId; + + public PostgresProfileQueryHandler(DataSource dataSource, long datasetId) throws SQLException { + this.connection = dataSource.getConnection(); + profileIds = new HashMap<>(); + profileDurations = new HashMap<>(); + this.datasetId = datasetId; + } + + public void prepareStatements() throws SQLException { + final String postProfilesSql = + //language=sql + """ + insert into merlin.profile (dataset_id, name, type, duration) + values (%d, ?, ?::jsonb, ?::interval) + on conflict (dataset_id, name) do nothing + """.formatted(datasetId); + final String postSegmentsSql = + //language=sql + """ + insert into merlin.profile_segment (dataset_id, profile_id, start_offset, dynamics, is_gap) + values (%d, ?, ?::interval, ?::jsonb, false) + """.formatted(datasetId); + final String updateDurationSql = + //language=SQL + """ + update merlin.profile + set duration = ?::interval + where (dataset_id, id) = (%d, ?); + """.formatted(datasetId); + + postProfileStatement = connection.prepareStatement(postProfilesSql, PreparedStatement.RETURN_GENERATED_KEYS); + postSegmentsStatement = connection.prepareStatement(postSegmentsSql, PreparedStatement.NO_GENERATED_KEYS); + updateDurationStatement = connection.prepareStatement(updateDurationSql, PreparedStatement.NO_GENERATED_KEYS); + } + + /** + * Upload profiles, profile segments, and corresponding profile durations to the database. + * */ + public void uploadResourceProfiles(final ResourceProfiles resourceProfiles) { + try { + prepareStatements(); + // Add new profiles to DB + for (final var realEntry : resourceProfiles.realProfiles().entrySet()) { + if (!profileIds.containsKey(realEntry.getKey())) { + addRealProfileToBatch(realEntry.getKey(), realEntry.getValue()); + } + } + for (final var discreteEntry : resourceProfiles.discreteProfiles().entrySet()) { + if (!profileIds.containsKey(discreteEntry.getKey())) { + addDiscreteProfileToBatch(discreteEntry.getKey(), discreteEntry.getValue()); + } + } + postProfiles(); + + // Post Segments + for (final var realEntry : resourceProfiles.realProfiles().entrySet()) { + addProfileSegmentsToBatch(realEntry.getKey(), realEntry.getValue(), realDynamicsP); + } + for (final var discreteEntry : resourceProfiles.discreteProfiles().entrySet()) { + addProfileSegmentsToBatch(discreteEntry.getKey(), discreteEntry.getValue(), serializedValueP); + } + + postProfileSegments(); + updateProfileDurations(); + } catch (SQLException ex) { + throw new DatabaseException("Exception occurred while posting profiles.", ex); + } + } + + private void addRealProfileToBatch(final String name, ResourceProfile profile) throws SQLException { + postProfileStatement.setString(1, name); + postProfileStatement.setString(2, realProfileTypeP.unparse(Pair.of("real", profile.schema())).toString()); + PreparedStatements.setDuration(this.postProfileStatement, 3, Duration.ZERO); + + postProfileStatement.addBatch(); + + profileDurations.put(name, Duration.ZERO); + } + + private void addDiscreteProfileToBatch(final String name, ResourceProfile profile) throws SQLException { + postProfileStatement.setString(1, name); + postProfileStatement.setString(2, discreteProfileTypeP.unparse(Pair.of("discrete", profile.schema())).toString()); + PreparedStatements.setDuration(this.postProfileStatement, 3, Duration.ZERO); + + postProfileStatement.addBatch(); + + profileDurations.put(name, Duration.ZERO); + } + + /** + * Insert the batched profiles and cache their ids for future use. + * + * This method takes advantage of the fact that we're using the Postgres JDBC, + * which returns all columns when executing batches with `getGeneratedKeys`. + */ + private void postProfiles() throws SQLException { + final var results = this.postProfileStatement.executeBatch(); + for (final var result : results) { + if (result == Statement.EXECUTE_FAILED) throw new FailedInsertException("merlin.profile_segment"); + } + + final var resultSet = this.postProfileStatement.getGeneratedKeys(); + while(resultSet.next()){ + profileIds.put(resultSet.getString("name"), resultSet.getInt("id")); + } + } + + private void postProfileSegments() throws SQLException { + final var results = this.postSegmentsStatement.executeBatch(); + for (final var result : results) { + if (result == Statement.EXECUTE_FAILED) throw new FailedInsertException("merlin.profile_segment"); + } + } + + private void updateProfileDurations() throws SQLException { + final var results = this.updateDurationStatement.executeBatch(); + for (final var result : results) { + if (result == Statement.EXECUTE_FAILED) throw new FailedUpdateException("merlin.profile"); + } + } + + private void addProfileSegmentsToBatch(final String name, ResourceProfile profile, JsonParser dynamicsP) throws SQLException { + final var id = profileIds.get(name); + this.postSegmentsStatement.setLong(1, id); + + var newDuration = profileDurations.get(name); + for (final var segment : profile.segments()) { + PreparedStatements.setDuration(this.postSegmentsStatement, 2, newDuration); + final var dynamics = dynamicsP.unparse(segment.dynamics()).toString(); + this.postSegmentsStatement.setString(3, dynamics); + this.postSegmentsStatement.addBatch(); + + newDuration = newDuration.plus(segment.extent()); + } + + this.updateDurationStatement.setLong(2, id); + PreparedStatements.setDuration(this.updateDurationStatement, 1, newDuration); + this.updateDurationStatement.addBatch(); + + profileDurations.put(name, newDuration); + } + + @Override + public void close() throws SQLException { + this.postProfileStatement.close(); + this.postSegmentsStatement.close(); + this.updateDurationStatement.close(); + this.connection.close(); + } +} diff --git a/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/postgres/PostgresProfileStreamer.java b/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/postgres/PostgresProfileStreamer.java index b0ebb7d60b..206c3836fb 100644 --- a/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/postgres/PostgresProfileStreamer.java +++ b/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/postgres/PostgresProfileStreamer.java @@ -1,180 +1,38 @@ package gov.nasa.jpl.aerie.merlin.worker.postgres; -import gov.nasa.jpl.aerie.json.JsonParser; -import gov.nasa.jpl.aerie.merlin.driver.resources.ResourceProfile; import gov.nasa.jpl.aerie.merlin.driver.resources.ResourceProfiles; -import gov.nasa.jpl.aerie.merlin.protocol.types.Duration; -import gov.nasa.jpl.aerie.merlin.protocol.types.RealDynamics; -import gov.nasa.jpl.aerie.merlin.protocol.types.SerializedValue; +import gov.nasa.jpl.aerie.merlin.driver.resources.TaskQueue; import gov.nasa.jpl.aerie.merlin.server.remotes.postgres.DatabaseException; -import gov.nasa.jpl.aerie.merlin.server.remotes.postgres.FailedInsertException; -import gov.nasa.jpl.aerie.merlin.server.remotes.postgres.FailedUpdateException; -import gov.nasa.jpl.aerie.merlin.server.remotes.postgres.PreparedStatements; -import org.apache.commons.lang3.tuple.Pair; import javax.sql.DataSource; -import java.sql.Connection; -import java.sql.PreparedStatement; import java.sql.SQLException; -import java.sql.Statement; -import java.util.HashMap; import java.util.function.Consumer; -import static gov.nasa.jpl.aerie.merlin.server.remotes.postgres.PostgresParsers.discreteProfileTypeP; -import static gov.nasa.jpl.aerie.merlin.server.remotes.postgres.PostgresParsers.realProfileTypeP; - -import static gov.nasa.jpl.aerie.merlin.driver.json.SerializedValueJsonParser.serializedValueP; -import static gov.nasa.jpl.aerie.merlin.server.http.ProfileParsers.realDynamicsP; - public class PostgresProfileStreamer implements Consumer, AutoCloseable { - private final Connection connection; - private final HashMap profileIds; - private final HashMap profileDurations; - - private final PreparedStatement postProfileStatement; - private final PreparedStatement postSegmentsStatement; - private final PreparedStatement updateDurationStatement; + private final DataSource dataSource; + private long datasetId; public PostgresProfileStreamer(DataSource dataSource, long datasetId) throws SQLException { - this.connection = dataSource.getConnection(); - profileIds = new HashMap<>(); - profileDurations = new HashMap<>(); - - final String postProfilesSql = - //language=sql - """ - insert into merlin.profile (dataset_id, name, type, duration) - values (%d, ?, ?::jsonb, ?::interval) - on conflict (dataset_id, name) do nothing - """.formatted(datasetId); - final String postSegmentsSql = - //language=sql - """ - insert into merlin.profile_segment (dataset_id, profile_id, start_offset, dynamics, is_gap) - values (%d, ?, ?::interval, ?::jsonb, false) - """.formatted(datasetId); - final String updateDurationSql = - //language=SQL - """ - update merlin.profile - set duration = ?::interval - where (dataset_id, id) = (%d, ?); - """.formatted(datasetId); + this.dataSource = dataSource; + this.datasetId = datasetId; - postProfileStatement = connection.prepareStatement(postProfilesSql, PreparedStatement.RETURN_GENERATED_KEYS); - postSegmentsStatement = connection.prepareStatement(postSegmentsSql, PreparedStatement.NO_GENERATED_KEYS); - updateDurationStatement = connection.prepareStatement(updateDurationSql, PreparedStatement.NO_GENERATED_KEYS); } @Override public void accept(final ResourceProfiles resourceProfiles) { - try { - // Add new profiles to DB - for(final var realEntry : resourceProfiles.realProfiles().entrySet()){ - if(!profileIds.containsKey(realEntry.getKey())){ - addRealProfileToBatch(realEntry.getKey(), realEntry.getValue()); - } - } - for(final var discreteEntry : resourceProfiles.discreteProfiles().entrySet()) { - if(!profileIds.containsKey(discreteEntry.getKey())){ - addDiscreteProfileToBatch(discreteEntry.getKey(), discreteEntry.getValue()); - } - } - postProfiles(); - - // Post Segments - for(final var realEntry : resourceProfiles.realProfiles().entrySet()){ - addProfileSegmentsToBatch(realEntry.getKey(), realEntry.getValue(), realDynamicsP); - } - for(final var discreteEntry : resourceProfiles.discreteProfiles().entrySet()) { - addProfileSegmentsToBatch(discreteEntry.getKey(), discreteEntry.getValue(), serializedValueP); + TaskQueue.addToQueue(() -> { + try (var transaction = new PostgresProfileQueryHandler(dataSource, datasetId)) { + transaction.uploadResourceProfiles(resourceProfiles); + } catch (SQLException e) { + throw new DatabaseException("Exception occurred while posting profiles.", e); } - - postProfileSegments(); - updateProfileDurations(); - } catch (SQLException ex) { - throw new DatabaseException("Exception occurred while posting profiles.", ex); - } - } - - private void addRealProfileToBatch(final String name, ResourceProfile profile) throws SQLException { - postProfileStatement.setString(1, name); - postProfileStatement.setString(2, realProfileTypeP.unparse(Pair.of("real", profile.schema())).toString()); - PreparedStatements.setDuration(this.postProfileStatement, 3, Duration.ZERO); - - postProfileStatement.addBatch(); - - profileDurations.put(name, Duration.ZERO); - } - - private void addDiscreteProfileToBatch(final String name, ResourceProfile profile) throws SQLException { - postProfileStatement.setString(1, name); - postProfileStatement.setString(2, discreteProfileTypeP.unparse(Pair.of("discrete", profile.schema())).toString()); - PreparedStatements.setDuration(this.postProfileStatement, 3, Duration.ZERO); - - postProfileStatement.addBatch(); - - profileDurations.put(name, Duration.ZERO); - } - - /** - * Insert the batched profiles and cache their ids for future use. - * - * This method takes advantage of the fact that we're using the Postgres JDBC, - * which returns all columns when executing batches with `getGeneratedKeys`. - */ - private void postProfiles() throws SQLException { - final var results = this.postProfileStatement.executeBatch(); - for (final var result : results) { - if (result == Statement.EXECUTE_FAILED) throw new FailedInsertException("merlin.profile_segment"); - } - - final var resultSet = this.postProfileStatement.getGeneratedKeys(); - while(resultSet.next()){ - profileIds.put(resultSet.getString("name"), resultSet.getInt("id")); - } - } - - private void postProfileSegments() throws SQLException { - final var results = this.postSegmentsStatement.executeBatch(); - for (final var result : results) { - if (result == Statement.EXECUTE_FAILED) throw new FailedInsertException("merlin.profile_segment"); - } - } - - private void updateProfileDurations() throws SQLException { - final var results = this.updateDurationStatement.executeBatch(); - for (final var result : results) { - if (result == Statement.EXECUTE_FAILED) throw new FailedUpdateException("merlin.profile"); - } - } - - private void addProfileSegmentsToBatch(final String name, ResourceProfile profile, JsonParser dynamicsP) throws SQLException { - final var id = profileIds.get(name); - this.postSegmentsStatement.setLong(1, id); - - var newDuration = profileDurations.get(name); - for (final var segment : profile.segments()) { - PreparedStatements.setDuration(this.postSegmentsStatement, 2, newDuration); - final var dynamics = dynamicsP.unparse(segment.dynamics()).toString(); - this.postSegmentsStatement.setString(3, dynamics); - this.postSegmentsStatement.addBatch(); - - newDuration = newDuration.plus(segment.extent()); - } - - this.updateDurationStatement.setLong(2, id); - PreparedStatements.setDuration(this.updateDurationStatement, 1, newDuration); - this.updateDurationStatement.addBatch(); - - profileDurations.put(name, newDuration); + }); } @Override - public void close() throws SQLException { - this.postProfileStatement.close(); - this.postSegmentsStatement.close(); - this.updateDurationStatement.close(); - this.connection.close(); + public void close() { + // This class should conform to the AutoCloseable interface, but member variables that could be closed have + // been abstracted out to PostgresProfileQueryHandler, which auto-closes in the above accept method. } + } From 0db8fbb2b22e52f758fb7b130d2e51fa24562b3e Mon Sep 17 00:00:00 2001 From: jhaug Date: Tue, 7 Jan 2025 16:55:14 -0800 Subject: [PATCH 2/4] Better, non-static approach to handling the queue --- .../merlin/driver/resources/TaskQueue.java | 17 ----------------- .../postgres/PostgresProfileStreamer.java | 9 ++++----- .../worker/postgres/PostgresQueryQueue.java | 17 +++++++++++++++++ 3 files changed, 21 insertions(+), 22 deletions(-) delete mode 100644 merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/resources/TaskQueue.java create mode 100644 merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/postgres/PostgresQueryQueue.java diff --git a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/resources/TaskQueue.java b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/resources/TaskQueue.java deleted file mode 100644 index f540852cf3..0000000000 --- a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/resources/TaskQueue.java +++ /dev/null @@ -1,17 +0,0 @@ -package gov.nasa.jpl.aerie.merlin.driver.resources; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -public class TaskQueue { - private static final ExecutorService executor = Executors.newSingleThreadExecutor(); - - public static void addToQueue(Runnable task) { - executor.submit(task); - } - - /** Gracefully shut down the ExecutorService */ - public static void shutdown() { - executor.shutdown(); - } -} diff --git a/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/postgres/PostgresProfileStreamer.java b/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/postgres/PostgresProfileStreamer.java index 206c3836fb..516498e14b 100644 --- a/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/postgres/PostgresProfileStreamer.java +++ b/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/postgres/PostgresProfileStreamer.java @@ -1,7 +1,6 @@ package gov.nasa.jpl.aerie.merlin.worker.postgres; import gov.nasa.jpl.aerie.merlin.driver.resources.ResourceProfiles; -import gov.nasa.jpl.aerie.merlin.driver.resources.TaskQueue; import gov.nasa.jpl.aerie.merlin.server.remotes.postgres.DatabaseException; import javax.sql.DataSource; @@ -11,16 +10,17 @@ public class PostgresProfileStreamer implements Consumer, AutoCloseable { private final DataSource dataSource; private long datasetId; + private PostgresQueryQueue queryQueue; public PostgresProfileStreamer(DataSource dataSource, long datasetId) throws SQLException { this.dataSource = dataSource; this.datasetId = datasetId; - + this.queryQueue = new PostgresQueryQueue(); } @Override public void accept(final ResourceProfiles resourceProfiles) { - TaskQueue.addToQueue(() -> { + queryQueue.addToQueue(() -> { try (var transaction = new PostgresProfileQueryHandler(dataSource, datasetId)) { transaction.uploadResourceProfiles(resourceProfiles); } catch (SQLException e) { @@ -31,8 +31,7 @@ public void accept(final ResourceProfiles resourceProfiles) { @Override public void close() { - // This class should conform to the AutoCloseable interface, but member variables that could be closed have - // been abstracted out to PostgresProfileQueryHandler, which auto-closes in the above accept method. + queryQueue.shutdown(); } } diff --git a/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/postgres/PostgresQueryQueue.java b/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/postgres/PostgresQueryQueue.java new file mode 100644 index 0000000000..5be891877d --- /dev/null +++ b/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/postgres/PostgresQueryQueue.java @@ -0,0 +1,17 @@ +package gov.nasa.jpl.aerie.merlin.worker.postgres; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class PostgresQueryQueue { + private final ExecutorService executor = Executors.newSingleThreadExecutor(); + + public void addToQueue(Runnable task) { + executor.submit(task); + } + + /** Gracefully shut down the ExecutorService */ + public void shutdown() { + executor.shutdown(); + } +} From 4a820b8f72af92f1118d430c2ba0ab2a0f6fc54c Mon Sep 17 00:00:00 2001 From: jhaug Date: Thu, 6 Feb 2025 16:10:35 -0800 Subject: [PATCH 3/4] Moved prepareStatements() to constructor of PostgresProfileQueryHandler --- .../merlin/worker/postgres/PostgresProfileQueryHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/postgres/PostgresProfileQueryHandler.java b/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/postgres/PostgresProfileQueryHandler.java index 69c25117b1..30d3a6956a 100644 --- a/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/postgres/PostgresProfileQueryHandler.java +++ b/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/postgres/PostgresProfileQueryHandler.java @@ -44,6 +44,7 @@ public PostgresProfileQueryHandler(DataSource dataSource, long datasetId) throws profileIds = new HashMap<>(); profileDurations = new HashMap<>(); this.datasetId = datasetId; + prepareStatements(); } public void prepareStatements() throws SQLException { @@ -78,7 +79,6 @@ on conflict (dataset_id, name) do nothing * */ public void uploadResourceProfiles(final ResourceProfiles resourceProfiles) { try { - prepareStatements(); // Add new profiles to DB for (final var realEntry : resourceProfiles.realProfiles().entrySet()) { if (!profileIds.containsKey(realEntry.getKey())) { From 24221cb51036a46a1dc8b4a7774a9e033073279e Mon Sep 17 00:00:00 2001 From: jhaug Date: Wed, 12 Feb 2025 10:54:34 -0800 Subject: [PATCH 4/4] Removed PosgresQueryQueue and refactored PosgresProfileStreamer to use shared PostgresProfileQueryHandler --- .../postgres/PostgresProfileQueryHandler.java | 17 ++++--------- .../postgres/PostgresProfileStreamer.java | 25 ++++++++++--------- .../worker/postgres/PostgresQueryQueue.java | 17 ------------- 3 files changed, 18 insertions(+), 41 deletions(-) delete mode 100644 merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/postgres/PostgresQueryQueue.java diff --git a/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/postgres/PostgresProfileQueryHandler.java b/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/postgres/PostgresProfileQueryHandler.java index 30d3a6956a..8f00cb4872 100644 --- a/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/postgres/PostgresProfileQueryHandler.java +++ b/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/postgres/PostgresProfileQueryHandler.java @@ -25,29 +25,22 @@ import static gov.nasa.jpl.aerie.merlin.server.remotes.postgres.PostgresParsers.realProfileTypeP; /** - * Utility class to handle upload of resource profiles to the database. This class allows for each upload to have its - * own Connection object. + * Utility class to handle upload of resource profiles to the database. * */ public class PostgresProfileQueryHandler implements AutoCloseable { private final Connection connection; private final HashMap profileIds; private final HashMap profileDurations; - private PreparedStatement postProfileStatement; - private PreparedStatement postSegmentsStatement; - private PreparedStatement updateDurationStatement; - - private long datasetId; + private final PreparedStatement postProfileStatement; + private final PreparedStatement postSegmentsStatement; + private final PreparedStatement updateDurationStatement; public PostgresProfileQueryHandler(DataSource dataSource, long datasetId) throws SQLException { - this.connection = dataSource.getConnection(); + connection = dataSource.getConnection(); profileIds = new HashMap<>(); profileDurations = new HashMap<>(); - this.datasetId = datasetId; - prepareStatements(); - } - public void prepareStatements() throws SQLException { final String postProfilesSql = //language=sql """ diff --git a/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/postgres/PostgresProfileStreamer.java b/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/postgres/PostgresProfileStreamer.java index 516498e14b..15008f54e7 100644 --- a/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/postgres/PostgresProfileStreamer.java +++ b/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/postgres/PostgresProfileStreamer.java @@ -5,33 +5,34 @@ import javax.sql.DataSource; import java.sql.SQLException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.function.Consumer; public class PostgresProfileStreamer implements Consumer, AutoCloseable { - private final DataSource dataSource; - private long datasetId; - private PostgresQueryQueue queryQueue; + private final ExecutorService queryQueue; + private final PostgresProfileQueryHandler queryHandler; public PostgresProfileStreamer(DataSource dataSource, long datasetId) throws SQLException { - this.dataSource = dataSource; - this.datasetId = datasetId; - this.queryQueue = new PostgresQueryQueue(); + this.queryQueue = Executors.newSingleThreadExecutor(); + this.queryHandler = new PostgresProfileQueryHandler(dataSource, datasetId); } @Override public void accept(final ResourceProfiles resourceProfiles) { - queryQueue.addToQueue(() -> { - try (var transaction = new PostgresProfileQueryHandler(dataSource, datasetId)) { - transaction.uploadResourceProfiles(resourceProfiles); - } catch (SQLException e) { - throw new DatabaseException("Exception occurred while posting profiles.", e); - } + queryQueue.submit(() -> { + queryHandler.uploadResourceProfiles(resourceProfiles); }); } @Override public void close() { queryQueue.shutdown(); + try { + queryHandler.close(); + } catch (SQLException e) { + throw new DatabaseException("Error occurred while attempting to close PostgresProfileQueryHandler", e); + } } } diff --git a/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/postgres/PostgresQueryQueue.java b/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/postgres/PostgresQueryQueue.java deleted file mode 100644 index 5be891877d..0000000000 --- a/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/postgres/PostgresQueryQueue.java +++ /dev/null @@ -1,17 +0,0 @@ -package gov.nasa.jpl.aerie.merlin.worker.postgres; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -public class PostgresQueryQueue { - private final ExecutorService executor = Executors.newSingleThreadExecutor(); - - public void addToQueue(Runnable task) { - executor.submit(task); - } - - /** Gracefully shut down the ExecutorService */ - public void shutdown() { - executor.shutdown(); - } -}