diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java index 2c1677e012078..e423cd43f8f9b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java @@ -248,9 +248,6 @@ private class DataNodeRequestExecutor { } void start() { - parentTask.addListener( - () -> exchangeService.finishSinkHandler(request.sessionId(), new TaskCancelledException(parentTask.getReasonCancelled())) - ); runBatch(0); } @@ -419,7 +416,12 @@ private void runComputeOnDataNode( var parentListener = computeListener.acquireAvoid(); try { // run compute with target shards + var externalSink = exchangeService.getSinkHandler(externalId); var internalSink = exchangeService.createSinkHandler(request.sessionId(), request.pragmas().exchangeBufferSize()); + task.addListener(() -> { + exchangeService.finishSinkHandler(externalId, new TaskCancelledException(task.getReasonCancelled())); + exchangeService.finishSinkHandler(request.sessionId(), new TaskCancelledException(task.getReasonCancelled())); + }); DataNodeRequestExecutor dataNodeRequestExecutor = new DataNodeRequestExecutor( request, task, @@ -431,10 +433,6 @@ private void runComputeOnDataNode( ); dataNodeRequestExecutor.start(); // run the node-level reduction - var externalSink = exchangeService.getSinkHandler(externalId); - task.addListener( - () -> exchangeService.finishSinkHandler(externalId, new TaskCancelledException(task.getReasonCancelled())) - ); var exchangeSource = new ExchangeSourceHandler(1, esqlExecutor); exchangeSource.addRemoteSink(internalSink::fetchPageAsync, true, () -> {}, 1, ActionListener.noop()); var reductionListener = computeListener.acquireCompute();