Skip to content

Commit

Permalink
feat: move to new http client implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
tchiotludo committed Jan 2, 2025
1 parent 2328f79 commit 0059f0b
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 60 deletions.
74 changes: 21 additions & 53 deletions src/main/java/io/kestra/plugin/airflow/AirflowConnection.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package io.kestra.plugin.airflow;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.http.HttpRequest;
import io.kestra.core.http.HttpResponse;
import io.kestra.core.http.client.HttpClient;
import io.kestra.core.http.client.configurations.HttpConfiguration;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.plugin.airflow.model.DagRunResponse;
import io.kestra.plugin.core.http.HttpInterface;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.EqualsAndHashCode;
Expand All @@ -19,10 +21,6 @@
import lombok.extern.slf4j.Slf4j;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Base64;
import java.util.Map;

@SuperBuilder
Expand Down Expand Up @@ -55,24 +53,25 @@ public abstract class AirflowConnection extends Task {
title = "Request options"
)
@PluginProperty
protected HttpInterface.RequestOptions options;
protected HttpConfiguration options;

protected DagRunResponse triggerDag(RunContext runContext, String dagId, String requestBody) throws Exception {
String baseUrl = runContext.render(this.baseUrl).as(String.class).orElseThrow();
URI triggerUri = URI.create(DAG_RUNS_ENDPOINT_FORMAT.formatted(baseUrl, dagId));

try (HttpClient client = getClientBuilder().build()) {
HttpRequest request = getRequestBuilder(runContext, triggerUri)
.POST(HttpRequest.BodyPublishers.ofString(requestBody))
.method("POST")
.body(HttpRequest.StringRequestBody.builder().content(requestBody).build())
.build();

HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
HttpResponse<DagRunResponse> response = client.request(request, DagRunResponse.class);

if (response.statusCode() != 200) {
throw new IllegalStateException("Failed to trigger DAG: " + response.body());
if (response.getStatus().getCode() != 200) {
throw new IllegalStateException("Failed to trigger DAG: " + response.getBody());
}

return objectMapper.readValue(response.body(), DagRunResponse.class);
return response.getBody();
}
}

Expand All @@ -81,58 +80,27 @@ protected DagRunResponse getDagStatus(RunContext runContext, String dagId, Strin

try (HttpClient client = getClientBuilder().build()) {
HttpRequest statusRequest = getRequestBuilder(runContext, statusUri)
.GET()
.build();

HttpResponse<String> response = client.send(statusRequest, HttpResponse.BodyHandlers.ofString());
HttpResponse<DagRunResponse> response = client.request(statusRequest, DagRunResponse.class);

if (response.statusCode() != 200) {
throw new IllegalStateException("Failed to get DAG run status: " + response.body());
if (response.getStatus().getCode() != 200) {
throw new IllegalStateException("Failed to get DAG run status: " + response.getBody());
}

return objectMapper.readValue(response.body(), DagRunResponse.class);
return response.getBody();
}
}

private HttpClient.Builder getClientBuilder() {
HttpClient.Builder clientBuilder = HttpClient.newBuilder();
private io.kestra.core.http.client.HttpClient.HttpClientBuilder getClientBuilder() {
return io.kestra.core.http.client.HttpClient.builder()
.configuration(this.options);

if (this.options != null && this.options.getConnectTimeout() != null) {
clientBuilder.connectTimeout(options.getConnectTimeout());
}

return clientBuilder;
}

private HttpRequest.Builder getRequestBuilder(RunContext runContext, URI uri) throws IllegalVariableEvaluationException {
HttpRequest.Builder requestBuilder = HttpRequest.newBuilder()
private HttpRequest.HttpRequestBuilder getRequestBuilder(RunContext runContext, URI uri) {
return HttpRequest.builder()
.uri(uri)
.header("Content-Type", JSON_CONTENT_TYPE);

setupCustomHeaders(runContext, requestBuilder);

return requestBuilder;
}

private void setupCustomHeaders(RunContext runContext, HttpRequest.Builder requestBuilder) throws IllegalVariableEvaluationException {
if (this.options != null && this.options.getBasicAuthUser() != null && this.options.getBasicAuthPassword() != null) {
String authorizationString = "%s:%s"
.formatted(
runContext.render(this.options.getBasicAuthUser()),
runContext.render(this.options.getBasicAuthPassword())
);

String auth = Base64
.getEncoder()
.encodeToString(authorizationString.getBytes());

requestBuilder.header("Authorization", "Basic " + auth);
}

var headersValue = runContext.render(this.headers).asMap(String.class, String.class);
if (!headersValue.isEmpty()) {
headersValue.forEach(requestBuilder::header);
}
.addHeader("Content-Type", JSON_CONTENT_TYPE);
}

}
21 changes: 14 additions & 7 deletions src/test/java/io/kestra/plugin/airflow/dags/TriggerDagRunTest.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package io.kestra.plugin.airflow.dags;

import io.kestra.core.http.client.configurations.BasicAuthConfiguration;
import io.kestra.core.http.client.configurations.HttpConfiguration;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.plugin.core.http.HttpInterface;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
Expand All @@ -30,9 +31,12 @@ void run() throws Exception {
.baseUrl(Property.of(getBaseUrl()))
.dagId(Property.of("tutorial_dag"))
.options(
HttpInterface.RequestOptions.builder()
.basicAuthUser(getUser())
.basicAuthPassword(getPassword())
HttpConfiguration.builder()
.auth(BasicAuthConfiguration.builder()
.username(getUser())
.password(getPassword())
.build()
)
.build()
)
.body(Property.of(
Expand Down Expand Up @@ -65,9 +69,12 @@ void waitForComplete() throws Exception {
.dagId(Property.of("tutorial_dag"))
.wait(Property.of(true))
.options(
HttpInterface.RequestOptions.builder()
.basicAuthUser(getUser())
.basicAuthPassword(getPassword())
HttpConfiguration.builder()
.auth(BasicAuthConfiguration.builder()
.username(getUser())
.password(getPassword())
.build()
)
.build()
)
.body(Property.of(
Expand Down

0 comments on commit 0059f0b

Please sign in to comment.