Skip to content

KAFKA-19239 Rewrite IntegrationTestUtils by java #19776

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: trunk
Choose a base branch
from

Conversation

jim0987795064
Copy link

@jim0987795064 jim0987795064 commented May 20, 2025

This PR rewrites the IntegrationTestUtils.java from Scala to Java.

Changes:

  • Converted all the existing Scala code in IntegrationTestUtils.scala
    into Java in IntegrationTestUtils.java.
  • Preserved the original logic and functionality to ensure backward
    compatibility.
  • Updated relevant imports and dependencies accordingly.

Motivation:

The rewrite aims to standardize the codebase in Java, which aligns
better with the rest of the project and facilitates easier maintenance
by the Java-centric team.

@github-actions github-actions bot added triage PRs from the community core Kafka Broker clients labels May 20, 2025
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server;
Copy link
Collaborator

@TaiJuWu TaiJuWu May 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this package be put into server-common module?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree

@github-actions github-actions bot removed the triage PRs from the community label May 21, 2025
@jim0987795064 jim0987795064 changed the title (WIP)KAFKA-19239 Rewrite IntegrationTestUtils by java KAFKA-19239 Rewrite IntegrationTestUtils by java May 21, 2025
@chia7712
Copy link
Member

@jim0987795064 please fix the conflicts

@jim0987795064
Copy link
Author

@jim0987795064 please fix the conflicts
Hello @chia7712
Thank you, I have addressed this comment.

Copy link
Collaborator

@m1a2st m1a2st left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @jim0987795064 for this patch, left some comments

ApiError apiError = new ApiError(error, response.data().errorMessage());
handleFailure(apiError.exception());
handleFailure(error.exception(response.data().errorMessage()));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wondering, how is this change related to the main scope of this PR?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @m1a2st,

Thanks for pointing this out! You're absolutely right — this change is unrelated to the main scope of this PR. I'll move it to a separate pull request to keep things clean and focused.

Thanks for your feedback!

Comment on lines 114 to 119
Socket socket = connect(destination, listenerName);
try {
return sendAndReceive(request, socket);
} finally {
socket.close();
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Socket socket = connect(destination, listenerName);
try {
return sendAndReceive(request, socket);
} finally {
socket.close();
}
try (Socket socket = connect(destination, listenerName)) {
return sendAndReceive(request, socket);
}

Integer correlationId
) throws IOException {
send(request, socket, clientId, correlationId);
return (T) receive(socket, request.apiKey(), request.version());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return (T) receive(socket, request.apiKey(), request.version());
return receive(socket, request.apiKey(), request.version());

sendWithHeader(request, header, socket);
}

public static <T extends AbstractResponse> T receive(Socket socket, ApiKeys apiKey, short version) throws IOException, ClassCastException {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add the @SuppressWarnings("unchecked")

Comment on lines 67 to 70
public static void send(AbstractRequest request, Socket socket) throws IOException {
RequestHeader header = nextRequestHeader(request.apiKey(), request.version(), "client-id", 0);
sendWithHeader(request, header, socket);
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is unused in project, I think we should remove it.

}

public static RequestHeader nextRequestHeader(ApiKeys apiKey, short apiVersion) {

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra blank line.

@github-actions github-actions bot added the tests Test fixes (including flaky tests) label May 27, 2025
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please move this file to the kafka/server-common/src/test/java/org/apache/kafka/server

AbstractRequest request,
int port
) throws IOException {
Socket socket = connect(port);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

        try (Socket socket = connect(port)) {
            return sendAndReceive(request, socket);
        }

}

def sendUnsupportedApiVersionRequest(request: ApiVersionsRequest): ApiVersionsResponse = {
val overrideHeader = IntegrationTestUtils.nextRequestHeader(ApiKeys.API_VERSIONS, Short.MaxValue)
val socket = IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head, cluster.clientListener())
val socketServer = cluster.brokerSocketServers().asScala.head
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add following helper to ClusterInstance?

    default Collection<Integer> boundPorts() {
        return brokers().values().stream()
                .map(KafkaBroker::socketServer)
                .map(s -> s.boundPort(clientListener()))
                .collect(Collectors.toList());
    }

this helper can streamline this PR I believe

Copy link
Author

@jim0987795064 jim0987795064 May 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello, @chia7712 I've addressed these 3 issues mentioned above.
Could you please review these?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with that helper, we can use val socket = IntegrationTestUtils.connect(cluster.boundPorts().asScala.head) to streamline code, right?

@@ -205,11 +205,12 @@ class ProducerIntegrationTest {
.setProducerId(RecordBatch.NO_PRODUCER_ID)
.setTransactionalId(null)
.setTransactionTimeoutMs(10)
val request = new InitProducerIdRequest.Builder(data).build()
val request = new InitProducerIdRequest.Builder(data).build()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

      val request = new InitProducerIdRequest.Builder(data).build()
      val port = broker.boundPort(listener)
      response = IntegrationTestUtils.connectAndReceive[InitProducerIdResponse](request, port)
      shouldRetry = response.data.errorCode == Errors.COORDINATOR_LOAD_IN_PROGRESS.code

)
listenerName: ListenerName): DescribeClusterResponse = {

val port = destination.boundPort(listenerName)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

      connectAndReceive[DescribeClusterResponse](new DescribeClusterRequest.Builder(new DescribeClusterRequestData()).build(),
        destination.boundPort(listenerName))

@@ -41,12 +42,18 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
} else {
cluster.brokerSocketServers().asScala.head
}
IntegrationTestUtils.connectAndReceive[ApiVersionsResponse](request, socket, listenerName)

val port = socket.boundPort(listenerName)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IntegrationTestUtils.connectAndReceive[ApiVersionsResponse](request, socket.boundPort(listenerName))

}

def sendUnsupportedApiVersionRequest(request: ApiVersionsRequest): ApiVersionsResponse = {
val overrideHeader = IntegrationTestUtils.nextRequestHeader(ApiKeys.API_VERSIONS, Short.MaxValue)
val socket = IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head, cluster.clientListener())
val socketServer = cluster.brokerSocketServers().asScala.head
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with that helper, we can use val socket = IntegrationTestUtils.connect(cluster.boundPorts().asScala.head) to streamline code, right?


val listenerName = cluster.controllerListenerName
val port = controllerSocketServer.boundPort(listenerName)

IntegrationTestUtils.connectAndReceive[AllocateProducerIdsResponse](
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    IntegrationTestUtils.connectAndReceive[AllocateProducerIdsResponse](
      request,
      controllerSocketServer.boundPort(cluster.controllerListenerName)
    )

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
ci-approved clients core Kafka Broker tests Test fixes (including flaky tests)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants