Skip to content

Commit

Permalink
Better, non-static approach to handling the queue
Browse files Browse the repository at this point in the history
  • Loading branch information
joshhaug committed Feb 4, 2025
1 parent 02a33d7 commit 0db8fbb
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 22 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package gov.nasa.jpl.aerie.merlin.worker.postgres;

import gov.nasa.jpl.aerie.merlin.driver.resources.ResourceProfiles;
import gov.nasa.jpl.aerie.merlin.driver.resources.TaskQueue;
import gov.nasa.jpl.aerie.merlin.server.remotes.postgres.DatabaseException;

import javax.sql.DataSource;
Expand All @@ -11,16 +10,17 @@
public class PostgresProfileStreamer implements Consumer<ResourceProfiles>, AutoCloseable {
private final DataSource dataSource;
private long datasetId;
private PostgresQueryQueue queryQueue;

public PostgresProfileStreamer(DataSource dataSource, long datasetId) throws SQLException {
this.dataSource = dataSource;
this.datasetId = datasetId;

this.queryQueue = new PostgresQueryQueue();
}

@Override
public void accept(final ResourceProfiles resourceProfiles) {
TaskQueue.addToQueue(() -> {
queryQueue.addToQueue(() -> {
try (var transaction = new PostgresProfileQueryHandler(dataSource, datasetId)) {
transaction.uploadResourceProfiles(resourceProfiles);
} catch (SQLException e) {
Expand All @@ -31,8 +31,7 @@ public void accept(final ResourceProfiles resourceProfiles) {

@Override
public void close() {
// This class should conform to the AutoCloseable interface, but member variables that could be closed have
// been abstracted out to PostgresProfileQueryHandler, which auto-closes in the above accept method.
queryQueue.shutdown();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package gov.nasa.jpl.aerie.merlin.worker.postgres;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PostgresQueryQueue {
private final ExecutorService executor = Executors.newSingleThreadExecutor();

public void addToQueue(Runnable task) {
executor.submit(task);
}

/** Gracefully shut down the ExecutorService */
public void shutdown() {
executor.shutdown();
}
}

0 comments on commit 0db8fbb

Please sign in to comment.