diff --git a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/resources/TaskQueue.java b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/resources/TaskQueue.java deleted file mode 100644 index f540852cf3..0000000000 --- a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/resources/TaskQueue.java +++ /dev/null @@ -1,17 +0,0 @@ -package gov.nasa.jpl.aerie.merlin.driver.resources; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -public class TaskQueue { - private static final ExecutorService executor = Executors.newSingleThreadExecutor(); - - public static void addToQueue(Runnable task) { - executor.submit(task); - } - - /** Gracefully shut down the ExecutorService */ - public static void shutdown() { - executor.shutdown(); - } -} diff --git a/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/postgres/PostgresProfileStreamer.java b/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/postgres/PostgresProfileStreamer.java index 206c3836fb..516498e14b 100644 --- a/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/postgres/PostgresProfileStreamer.java +++ b/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/postgres/PostgresProfileStreamer.java @@ -1,7 +1,6 @@ package gov.nasa.jpl.aerie.merlin.worker.postgres; import gov.nasa.jpl.aerie.merlin.driver.resources.ResourceProfiles; -import gov.nasa.jpl.aerie.merlin.driver.resources.TaskQueue; import gov.nasa.jpl.aerie.merlin.server.remotes.postgres.DatabaseException; import javax.sql.DataSource; @@ -11,16 +10,17 @@ public class PostgresProfileStreamer implements Consumer, AutoCloseable { private final DataSource dataSource; private long datasetId; + private PostgresQueryQueue queryQueue; public PostgresProfileStreamer(DataSource dataSource, long datasetId) throws SQLException { this.dataSource = dataSource; this.datasetId = datasetId; - + this.queryQueue = new PostgresQueryQueue(); } @Override public void accept(final ResourceProfiles resourceProfiles) { - TaskQueue.addToQueue(() -> { + queryQueue.addToQueue(() -> { try (var transaction = new PostgresProfileQueryHandler(dataSource, datasetId)) { transaction.uploadResourceProfiles(resourceProfiles); } catch (SQLException e) { @@ -31,8 +31,7 @@ public void accept(final ResourceProfiles resourceProfiles) { @Override public void close() { - // This class should conform to the AutoCloseable interface, but member variables that could be closed have - // been abstracted out to PostgresProfileQueryHandler, which auto-closes in the above accept method. + queryQueue.shutdown(); } } diff --git a/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/postgres/PostgresQueryQueue.java b/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/postgres/PostgresQueryQueue.java new file mode 100644 index 0000000000..5be891877d --- /dev/null +++ b/merlin-worker/src/main/java/gov/nasa/jpl/aerie/merlin/worker/postgres/PostgresQueryQueue.java @@ -0,0 +1,17 @@ +package gov.nasa.jpl.aerie.merlin.worker.postgres; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class PostgresQueryQueue { + private final ExecutorService executor = Executors.newSingleThreadExecutor(); + + public void addToQueue(Runnable task) { + executor.submit(task); + } + + /** Gracefully shut down the ExecutorService */ + public void shutdown() { + executor.shutdown(); + } +}