Skip to content

Commit

Permalink
Migrate to the kestra http client instead of micronaut
Browse files Browse the repository at this point in the history
  • Loading branch information
aeSouid committed Feb 20, 2025
1 parent 318d5e7 commit 23f11c5
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 131 deletions.
107 changes: 45 additions & 62 deletions src/main/java/io/kestra/plugin/dbt/cloud/AbstractDbtCloud.java
Original file line number Diff line number Diff line change
@@ -1,98 +1,81 @@
package io.kestra.plugin.dbt.cloud;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.MediaType;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.DefaultHttpClientConfiguration;
import io.micronaut.http.client.HttpClient;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.netty.DefaultHttpClient;
import io.micronaut.http.client.netty.NettyHttpClientFactory;
import io.micronaut.http.codec.MediaTypeCodecRegistry;
import io.kestra.core.http.HttpRequest;
import io.kestra.core.http.HttpResponse;
import io.kestra.core.http.client.HttpClient;
import io.kestra.core.http.client.HttpClientException;
import io.kestra.core.http.client.configurations.HttpConfiguration;

import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;

import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.io.IOException;

import jakarta.validation.constraints.NotNull;
import reactor.core.publisher.Mono;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
public abstract class AbstractDbtCloud extends Task {
@Schema(
title = "Base url to select the tenant."
)
private static final ObjectMapper MAPPER = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.registerModule(new JavaTimeModule());

@Schema(title = "Base URL to select the tenant.")
@NotNull
@Builder.Default
Property<String> baseUrl = Property.of("https://cloud.getdbt.com");

@Schema(
title = "Numeric ID of the account."
)
@Schema(title = "Numeric ID of the account.")
@NotNull
Property<String> accountId;

@Schema(
title = "API key."
)
@Schema(title = "API key.")
@NotNull
Property<String> token;

private static final Duration HTTP_READ_TIMEOUT = Duration.ofSeconds(60);
private static final NettyHttpClientFactory FACTORY = new NettyHttpClientFactory();
@Schema(title = "The HTTP client configuration.")
HttpConfiguration options;

protected HttpClient client(RunContext runContext) throws IllegalVariableEvaluationException, MalformedURLException, URISyntaxException {
MediaTypeCodecRegistry mediaTypeCodecRegistry = ((DefaultRunContext)runContext).getApplicationContext().getBean(MediaTypeCodecRegistry.class);
/**
* Perform an HTTP request using Kestra HttpClient.
*
* @param requestBuilder The prepared HTTP request builder.
* @param responseType The expected response type.
* @param <RES> The response class.
* @return HttpResponse of type RES.
*/
protected <RES> HttpResponse<RES> request(RunContext runContext, HttpRequest.HttpRequestBuilder requestBuilder, Class<RES> responseType)
throws HttpClientException, IllegalVariableEvaluationException {

var httpConfig = new DefaultHttpClientConfiguration();
httpConfig.setMaxContentLength(Integer.MAX_VALUE);
httpConfig.setReadTimeout(HTTP_READ_TIMEOUT);
var request = requestBuilder
.addHeader("Authorization", "Bearer " + runContext.render(this.token).as(String.class).orElseThrow())
.addHeader("Content-Type", "application/json")
.build();

DefaultHttpClient client = (DefaultHttpClient) FACTORY.createClient(URI.create(runContext.render(baseUrl).as(String.class).orElseThrow()).toURL(), httpConfig);
client.setMediaTypeCodecRegistry(mediaTypeCodecRegistry);
try (HttpClient client = new HttpClient(runContext, options)) {
HttpResponse<String> response = client.request(request, String.class);

return client;
}

protected <REQ, RES> HttpResponse<RES> request(RunContext runContext,
MutableHttpRequest<REQ> request,
Argument<RES> argument) throws HttpClientResponseException {
return request(runContext, request, argument, null);
}
protected <REQ, RES> HttpResponse<RES> request(RunContext runContext,
MutableHttpRequest<REQ> request,
Argument<RES> argument,
Duration timeout) throws HttpClientResponseException {
try {
request = request
.bearerAuth(runContext.render(this.token).as(String.class).orElseThrow())
.contentType(MediaType.APPLICATION_JSON);
RES parsedResponse = MAPPER.readValue(response.getBody(), responseType);
return HttpResponse.<RES>builder()
.request(request)
.body(parsedResponse)
.headers(response.getHeaders())
.status(response.getStatus())
.build();

try (HttpClient client = this.client(runContext)) {
Mono<HttpResponse<RES>> mono = Mono.from(client.exchange(request, argument));
return timeout != null ? mono.block(timeout) : mono.block();
}
} catch (HttpClientResponseException e) {
throw new HttpClientResponseException(
"Request failed '" + e.getStatus().getCode() + "' and body '" + e.getResponse().getBody(String.class).orElse("null") + "'",
e,
e.getResponse()
);
} catch (IllegalVariableEvaluationException | MalformedURLException | URISyntaxException e) {
throw new RuntimeException(e);
} catch (IOException e) {
throw new RuntimeException("Error executing HTTP request", e);
}
}
}
63 changes: 16 additions & 47 deletions src/main/java/io/kestra/plugin/dbt/cloud/CheckStatus.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.kestra.plugin.dbt.cloud;

import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.http.HttpRequest;
import io.kestra.core.http.client.HttpClientException;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.property.Property;
Expand All @@ -11,10 +13,7 @@
import io.kestra.plugin.dbt.cloud.models.JobStatusHumanizedEnum;
import io.kestra.plugin.dbt.cloud.models.RunResponse;
import io.kestra.plugin.dbt.cloud.models.Step;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpMethod;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.uri.UriTemplate;

import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.*;
Expand Down Expand Up @@ -192,53 +191,23 @@ private void logSteps(Logger logger, RunResponse runResponse) {
}
}

private Optional<RunResponse> fetchRunResponse(RunContext runContext, Long id, Boolean debug) throws IllegalVariableEvaluationException {
return this
.request(
runContext,
HttpRequest
.create(
HttpMethod.GET,
UriTemplate
.of("/api/v2/accounts/{accountId}/runs/{runId}/" +
"?include_related=" + URLEncoder.encode(
"[\"trigger\",\"job\"," + (debug ? "\"debug_logs\"" : "") + ",\"run_steps\", \"environment\"]",
StandardCharsets.UTF_8
)
)
.expand(Map.of(
"accountId", runContext.render(this.accountId).as(String.class).orElseThrow(),
"runId", id
))
),
Argument.of(RunResponse.class),
runContext.render(this.maxDuration).as(Duration.class).orElseThrow()
)
.getBody();
private Optional<RunResponse> fetchRunResponse(RunContext runContext, Long id, Boolean debug) throws IllegalVariableEvaluationException, HttpClientException {
HttpRequest.HttpRequestBuilder requestBuilder = HttpRequest.builder()
.uri(URI.create(runContext.render(this.baseUrl).as(String.class).orElseThrow() + "/api/v2/accounts/" + runContext.render(this.accountId).as(String.class).orElseThrow() + "/runs/" + id +
"/?include_related=" + URLEncoder.encode("[\"trigger\",\"job\"," + (debug ? "\"debug_logs\"" : "") + ",\"run_steps\", \"environment\"]", StandardCharsets.UTF_8)))
.method("GET");

return Optional.ofNullable(this.request(runContext, requestBuilder, RunResponse.class).getBody());
}

private Path downloadArtifacts(RunContext runContext, Long runId, String path) throws IllegalVariableEvaluationException, IOException {
String artifact = this
.request(
runContext,
HttpRequest
.create(
HttpMethod.GET,
UriTemplate
.of("/api/v2/accounts/{accountId}/runs/{runId}/artifacts/{path}")
.expand(Map.of(
"accountId", runContext.render(this.accountId).as(String.class).orElseThrow(),
"runId", runId,
"path", path
))
),
Argument.of(String.class)
)
.getBody()
.orElseThrow();
private Path downloadArtifacts(RunContext runContext, Long runId, String path) throws IllegalVariableEvaluationException, IOException, HttpClientException {
HttpRequest.HttpRequestBuilder requestBuilder = HttpRequest.builder()
.uri(URI.create(runContext.render(this.baseUrl).as(String.class).orElseThrow() + "/api/v2/accounts/" + runContext.render(this.accountId).as(String.class).orElseThrow() + "/runs/" + runId + "/artifacts/" + path))
.method("GET");

Path tempFile = runContext.workingDir().createTempFile(".json");
String artifact = this.request(runContext, requestBuilder, String.class).getBody();

Path tempFile = runContext.workingDir().createTempFile(".json");
Files.writeString(tempFile, artifact, StandardOpenOption.TRUNCATE_EXISTING);

return tempFile;
Expand Down
39 changes: 17 additions & 22 deletions src/main/java/io/kestra/plugin/dbt/cloud/TriggerRun.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
package io.kestra.plugin.dbt.cloud;

import io.kestra.core.http.HttpRequest;
import io.kestra.core.http.HttpResponse;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.dbt.cloud.models.RunResponse;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpMethod;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.uri.UriTemplate;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
Expand Down Expand Up @@ -163,23 +160,21 @@ public TriggerRun.Output run(RunContext runContext) throws Exception {
body.put("steps_override", runContext.render(this.stepsOverride).asList(String.class));
}

HttpResponse<RunResponse> triggerResponse = this.request(
runContext,
HttpRequest
.create(
HttpMethod.POST,
UriTemplate
.of("/api/v2/accounts/{accountId}/jobs/{jobId}/run")
.expand(Map.of(
"accountId", runContext.render(this.accountId).as(String.class).orElseThrow(),
"jobId", runContext.render(this.jobId).as(String.class).orElseThrow()
)) + "/"
)
.body(body),
Argument.of(RunResponse.class)
);

RunResponse triggerRunResponse = triggerResponse.getBody().orElseThrow(() -> new IllegalStateException("Missing body on trigger"));
HttpRequest.HttpRequestBuilder requestBuilder = HttpRequest.builder()
.uri(URI.create(runContext.render(this.baseUrl).as(String.class).orElseThrow() + "/api/v2/accounts/" + runContext.render(this.accountId).as(String.class).orElseThrow() +
"/jobs/" + runContext.render(this.jobId).as(String.class).orElseThrow() + "/run/"))
.method("POST")
.body(HttpRequest.JsonRequestBody.builder()
.content(body)
.build());

HttpResponse<RunResponse> triggerResponse = this.request(runContext, requestBuilder, RunResponse.class);

RunResponse triggerRunResponse = triggerResponse.getBody();
if (triggerRunResponse == null) {
throw new IllegalStateException("Missing body on trigger");
}

logger.info("Job status {} with response: {}", triggerResponse.getStatus(), triggerRunResponse);
Long runId = triggerRunResponse.getData().getId();

Expand Down

0 comments on commit 23f11c5

Please sign in to comment.