From e809fd2903e0f5810d7975895c4693f477af5f45 Mon Sep 17 00:00:00 2001 From: Santwana Verma Date: Fri, 2 May 2025 12:09:11 +0530 Subject: [PATCH 1/2] [FLINK-37730][rest] Add client method for getting JM exception history --- .../program/rest/RestClusterClient.java | 17 ++++ .../program/rest/RestClusterClientTest.java | 81 +++++++++++++++++++ 2 files changed, 98 insertions(+) diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java index fc466efa5a179..70faa919b1e4f 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java @@ -69,6 +69,8 @@ import org.apache.flink.runtime.rest.messages.JobClientHeartbeatHeaders; import org.apache.flink.runtime.rest.messages.JobClientHeartbeatParameters; import org.apache.flink.runtime.rest.messages.JobClientHeartbeatRequestBody; +import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders; +import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory; import org.apache.flink.runtime.rest.messages.JobMessageParameters; import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders; import org.apache.flink.runtime.rest.messages.MessageHeaders; @@ -92,6 +94,7 @@ import org.apache.flink.runtime.rest.messages.dataset.ClusterDataSetListHeaders; import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders; import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo; +import org.apache.flink.runtime.rest.messages.job.JobExceptionsMessageParameters; import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders; import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsBody; import org.apache.flink.runtime.rest.messages.job.JobResourcesRequirementsUpdateHeaders; @@ -330,6 +333,20 @@ public CompletableFuture getJobDetails(JobID jobId) { return sendRequest(detailsHeaders, params); } + /** + * Requests the job exception history. + * + * @param jobID The job id + * @return Job exceptions + */ + public CompletableFuture getJobExceptions(JobID jobID) { + final JobExceptionsHeaders jobExceptionsHeaders = JobExceptionsHeaders.getInstance(); + final JobExceptionsMessageParameters params = new JobExceptionsMessageParameters(); + params.jobPathParameter.resolve(jobID); + + return sendRequest(jobExceptionsHeaders, params); + } + @Override public CompletableFuture getJobStatus(JobID jobId) { final CheckedSupplier> operation = diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java index 27e1835b26308..a34d0c24441e2 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java @@ -73,6 +73,8 @@ import org.apache.flink.runtime.rest.messages.JobAccumulatorsMessageParameters; import org.apache.flink.runtime.rest.messages.JobCancellationHeaders; import org.apache.flink.runtime.rest.messages.JobCancellationMessageParameters; +import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders; +import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory; import org.apache.flink.runtime.rest.messages.JobMessageParameters; import org.apache.flink.runtime.rest.messages.JobPlanInfo; import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders; @@ -92,6 +94,7 @@ import org.apache.flink.runtime.rest.messages.dataset.ClusterDataSetListResponseBody; import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders; import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo; +import org.apache.flink.runtime.rest.messages.job.JobExceptionsMessageParameters; import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders; import org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody; import org.apache.flink.runtime.rest.messages.job.JobStatusInfoHeaders; @@ -1298,6 +1301,35 @@ void testJobDetailsContainsSlotSharingGroupId() throws Exception { } } + @Test + void testGetJobExceptionsInfoWithHistory() throws Exception { + + final TestJobExceptionsHandler jobExceptionsHandler = new TestJobExceptionsHandler(); + TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(jobExceptionsHandler); + final RestClusterClient restClusterClient = + createRestClusterClient(restServerEndpoint.getServerAddress().getPort()); + try { + CompletableFuture future = + restClusterClient.getJobExceptions(jobId); + JobExceptionsInfoWithHistory result = future.get(); + assertThat(result.getExceptionHistory()).isNotNull(); + assertThat(result.getExceptionHistory().getEntries()).hasSize(1); + JobExceptionsInfoWithHistory.RootExceptionInfo rootExceptionInfo = + result.getExceptionHistory().getEntries().get(0); + assertThat(rootExceptionInfo.getExceptionName()).isEqualTo("TestException"); + assertThat(rootExceptionInfo.getStacktrace()).contains("Simulated failure"); + assertThat(rootExceptionInfo.getConcurrentExceptions()).hasSize(1); + JobExceptionsInfoWithHistory.ExceptionInfo concurrent = + rootExceptionInfo.getConcurrentExceptions().iterator().next(); + assertThat(concurrent.getExceptionName()).isEqualTo("TestException"); + assertThat(concurrent.getStacktrace()).contains("Simulated failure"); + assertThat(result.getExceptionHistory().isTruncated()).isFalse(); + } finally { + restClusterClient.close(); + restServerEndpoint.close(); + } + } + private class TestClientCoordinationHandler extends TestHandler< ClientCoordinationRequestBody, @@ -1464,6 +1496,55 @@ protected CompletableFuture handleRequest( } } + private class TestJobExceptionsHandler + extends TestHandler< + EmptyRequestBody, + JobExceptionsInfoWithHistory, + JobExceptionsMessageParameters> { + + private TestJobExceptionsHandler() { + super(JobExceptionsHeaders.getInstance()); + } + + @Override + protected CompletableFuture handleRequest( + @Nonnull HandlerRequest request, + @Nonnull DispatcherGateway gateway) + throws RestHandlerException { + JobExceptionsInfoWithHistory.ExceptionInfo exceptionInfo = + new JobExceptionsInfoWithHistory.ExceptionInfo( + "TestException", + "java.lang.RuntimeException: Simulated failure\n\tat" + + " org.test.FakeClass.method(FakeClass.java:42)", + 12345L); + + Collection concurrentExceptions = + Collections.singletonList(exceptionInfo); + + Map failureLabels = new HashMap<>(); + failureLabels.put("flink.operator.failure.label", "simulated"); + + JobExceptionsInfoWithHistory.RootExceptionInfo rootExceptionInfo = + new JobExceptionsInfoWithHistory.RootExceptionInfo( + "TestRootException", + "java.lang.Exception: Test stack trace\n\tat" + + " org.test.FakeClass.method(FakeClass.java:123)", + 12345L, + failureLabels, + concurrentExceptions); + + JobExceptionsInfoWithHistory.JobExceptionHistory exceptionHistory = + new JobExceptionsInfoWithHistory.JobExceptionHistory( + Collections.singletonList(rootExceptionInfo), false // Truncated? + ); + + JobExceptionsInfoWithHistory jobExceptionsInfoWithHistory = + new JobExceptionsInfoWithHistory(exceptionHistory); + + return CompletableFuture.completedFuture(jobExceptionsInfoWithHistory); + } + } + private class TestJobDetailsInfoHandler extends TestHandler { From a25b036f175747bcc3d5b3132c6bd66fd5d77b46 Mon Sep 17 00:00:00 2001 From: Santwana Verma Date: Fri, 2 May 2025 15:37:40 +0530 Subject: [PATCH 2/2] [FLINK-37730][test] Fix failing test --- .../apache/flink/client/program/rest/RestClusterClientTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java index a34d0c24441e2..ee7ac1ed7df53 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java @@ -1316,7 +1316,7 @@ void testGetJobExceptionsInfoWithHistory() throws Exception { assertThat(result.getExceptionHistory().getEntries()).hasSize(1); JobExceptionsInfoWithHistory.RootExceptionInfo rootExceptionInfo = result.getExceptionHistory().getEntries().get(0); - assertThat(rootExceptionInfo.getExceptionName()).isEqualTo("TestException"); + assertThat(rootExceptionInfo.getExceptionName()).isEqualTo("TestRootException"); assertThat(rootExceptionInfo.getStacktrace()).contains("Simulated failure"); assertThat(rootExceptionInfo.getConcurrentExceptions()).hasSize(1); JobExceptionsInfoWithHistory.ExceptionInfo concurrent =