Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate to the kestra http client instead of micronaut #176

Merged
merged 1 commit into from
Feb 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 45 additions & 62 deletions src/main/java/io/kestra/plugin/dbt/cloud/AbstractDbtCloud.java
Original file line number Diff line number Diff line change
@@ -1,98 +1,81 @@
package io.kestra.plugin.dbt.cloud;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.MediaType;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.DefaultHttpClientConfiguration;
import io.micronaut.http.client.HttpClient;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.netty.DefaultHttpClient;
import io.micronaut.http.client.netty.NettyHttpClientFactory;
import io.micronaut.http.codec.MediaTypeCodecRegistry;
import io.kestra.core.http.HttpRequest;
import io.kestra.core.http.HttpResponse;
import io.kestra.core.http.client.HttpClient;
import io.kestra.core.http.client.HttpClientException;
import io.kestra.core.http.client.configurations.HttpConfiguration;

import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;

import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.io.IOException;

import jakarta.validation.constraints.NotNull;
import reactor.core.publisher.Mono;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
public abstract class AbstractDbtCloud extends Task {
@Schema(
title = "Base url to select the tenant."
)
private static final ObjectMapper MAPPER = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.registerModule(new JavaTimeModule());

@Schema(title = "Base URL to select the tenant.")
@NotNull
@Builder.Default
Property<String> baseUrl = Property.of("https://cloud.getdbt.com");

@Schema(
title = "Numeric ID of the account."
)
@Schema(title = "Numeric ID of the account.")
@NotNull
Property<String> accountId;

@Schema(
title = "API key."
)
@Schema(title = "API key.")
@NotNull
Property<String> token;

private static final Duration HTTP_READ_TIMEOUT = Duration.ofSeconds(60);
private static final NettyHttpClientFactory FACTORY = new NettyHttpClientFactory();
@Schema(title = "The HTTP client configuration.")
HttpConfiguration options;

protected HttpClient client(RunContext runContext) throws IllegalVariableEvaluationException, MalformedURLException, URISyntaxException {
MediaTypeCodecRegistry mediaTypeCodecRegistry = ((DefaultRunContext)runContext).getApplicationContext().getBean(MediaTypeCodecRegistry.class);
/**
* Perform an HTTP request using Kestra HttpClient.
*
* @param requestBuilder The prepared HTTP request builder.
* @param responseType The expected response type.
* @param <RES> The response class.
* @return HttpResponse of type RES.
*/
protected <RES> HttpResponse<RES> request(RunContext runContext, HttpRequest.HttpRequestBuilder requestBuilder, Class<RES> responseType)
throws HttpClientException, IllegalVariableEvaluationException {

var httpConfig = new DefaultHttpClientConfiguration();
httpConfig.setMaxContentLength(Integer.MAX_VALUE);
httpConfig.setReadTimeout(HTTP_READ_TIMEOUT);
var request = requestBuilder
.addHeader("Authorization", "Bearer " + runContext.render(this.token).as(String.class).orElseThrow())
.addHeader("Content-Type", "application/json")
.build();

DefaultHttpClient client = (DefaultHttpClient) FACTORY.createClient(URI.create(runContext.render(baseUrl).as(String.class).orElseThrow()).toURL(), httpConfig);
client.setMediaTypeCodecRegistry(mediaTypeCodecRegistry);
try (HttpClient client = new HttpClient(runContext, options)) {
HttpResponse<String> response = client.request(request, String.class);

return client;
}

protected <REQ, RES> HttpResponse<RES> request(RunContext runContext,
MutableHttpRequest<REQ> request,
Argument<RES> argument) throws HttpClientResponseException {
return request(runContext, request, argument, null);
}
protected <REQ, RES> HttpResponse<RES> request(RunContext runContext,
MutableHttpRequest<REQ> request,
Argument<RES> argument,
Duration timeout) throws HttpClientResponseException {
try {
request = request
.bearerAuth(runContext.render(this.token).as(String.class).orElseThrow())
.contentType(MediaType.APPLICATION_JSON);
RES parsedResponse = MAPPER.readValue(response.getBody(), responseType);
return HttpResponse.<RES>builder()
.request(request)
.body(parsedResponse)
.headers(response.getHeaders())
.status(response.getStatus())
.build();

try (HttpClient client = this.client(runContext)) {
Mono<HttpResponse<RES>> mono = Mono.from(client.exchange(request, argument));
return timeout != null ? mono.block(timeout) : mono.block();
}
} catch (HttpClientResponseException e) {
throw new HttpClientResponseException(
"Request failed '" + e.getStatus().getCode() + "' and body '" + e.getResponse().getBody(String.class).orElse("null") + "'",
e,
e.getResponse()
);
} catch (IllegalVariableEvaluationException | MalformedURLException | URISyntaxException e) {
throw new RuntimeException(e);
} catch (IOException e) {
throw new RuntimeException("Error executing HTTP request", e);
}
}
}
63 changes: 16 additions & 47 deletions src/main/java/io/kestra/plugin/dbt/cloud/CheckStatus.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.kestra.plugin.dbt.cloud;

import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.http.HttpRequest;
import io.kestra.core.http.client.HttpClientException;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.property.Property;
Expand All @@ -11,10 +13,7 @@
import io.kestra.plugin.dbt.cloud.models.JobStatusHumanizedEnum;
import io.kestra.plugin.dbt.cloud.models.RunResponse;
import io.kestra.plugin.dbt.cloud.models.Step;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpMethod;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.uri.UriTemplate;

import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.*;
Expand Down Expand Up @@ -192,53 +191,23 @@ private void logSteps(Logger logger, RunResponse runResponse) {
}
}

private Optional<RunResponse> fetchRunResponse(RunContext runContext, Long id, Boolean debug) throws IllegalVariableEvaluationException {
return this
.request(
runContext,
HttpRequest
.create(
HttpMethod.GET,
UriTemplate
.of("/api/v2/accounts/{accountId}/runs/{runId}/" +
"?include_related=" + URLEncoder.encode(
"[\"trigger\",\"job\"," + (debug ? "\"debug_logs\"" : "") + ",\"run_steps\", \"environment\"]",
StandardCharsets.UTF_8
)
)
.expand(Map.of(
"accountId", runContext.render(this.accountId).as(String.class).orElseThrow(),
"runId", id
))
),
Argument.of(RunResponse.class),
runContext.render(this.maxDuration).as(Duration.class).orElseThrow()
)
.getBody();
private Optional<RunResponse> fetchRunResponse(RunContext runContext, Long id, Boolean debug) throws IllegalVariableEvaluationException, HttpClientException {
HttpRequest.HttpRequestBuilder requestBuilder = HttpRequest.builder()
.uri(URI.create(runContext.render(this.baseUrl).as(String.class).orElseThrow() + "/api/v2/accounts/" + runContext.render(this.accountId).as(String.class).orElseThrow() + "/runs/" + id +
"/?include_related=" + URLEncoder.encode("[\"trigger\",\"job\"," + (debug ? "\"debug_logs\"" : "") + ",\"run_steps\", \"environment\"]", StandardCharsets.UTF_8)))
.method("GET");

return Optional.ofNullable(this.request(runContext, requestBuilder, RunResponse.class).getBody());
}

private Path downloadArtifacts(RunContext runContext, Long runId, String path) throws IllegalVariableEvaluationException, IOException {
String artifact = this
.request(
runContext,
HttpRequest
.create(
HttpMethod.GET,
UriTemplate
.of("/api/v2/accounts/{accountId}/runs/{runId}/artifacts/{path}")
.expand(Map.of(
"accountId", runContext.render(this.accountId).as(String.class).orElseThrow(),
"runId", runId,
"path", path
))
),
Argument.of(String.class)
)
.getBody()
.orElseThrow();
private Path downloadArtifacts(RunContext runContext, Long runId, String path) throws IllegalVariableEvaluationException, IOException, HttpClientException {
HttpRequest.HttpRequestBuilder requestBuilder = HttpRequest.builder()
.uri(URI.create(runContext.render(this.baseUrl).as(String.class).orElseThrow() + "/api/v2/accounts/" + runContext.render(this.accountId).as(String.class).orElseThrow() + "/runs/" + runId + "/artifacts/" + path))
.method("GET");

Path tempFile = runContext.workingDir().createTempFile(".json");
String artifact = this.request(runContext, requestBuilder, String.class).getBody();

Path tempFile = runContext.workingDir().createTempFile(".json");
Files.writeString(tempFile, artifact, StandardOpenOption.TRUNCATE_EXISTING);

return tempFile;
Expand Down
39 changes: 17 additions & 22 deletions src/main/java/io/kestra/plugin/dbt/cloud/TriggerRun.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
package io.kestra.plugin.dbt.cloud;

import io.kestra.core.http.HttpRequest;
import io.kestra.core.http.HttpResponse;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.dbt.cloud.models.RunResponse;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpMethod;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.uri.UriTemplate;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
Expand Down Expand Up @@ -163,23 +160,21 @@ public TriggerRun.Output run(RunContext runContext) throws Exception {
body.put("steps_override", runContext.render(this.stepsOverride).asList(String.class));
}

HttpResponse<RunResponse> triggerResponse = this.request(
runContext,
HttpRequest
.create(
HttpMethod.POST,
UriTemplate
.of("/api/v2/accounts/{accountId}/jobs/{jobId}/run")
.expand(Map.of(
"accountId", runContext.render(this.accountId).as(String.class).orElseThrow(),
"jobId", runContext.render(this.jobId).as(String.class).orElseThrow()
)) + "/"
)
.body(body),
Argument.of(RunResponse.class)
);

RunResponse triggerRunResponse = triggerResponse.getBody().orElseThrow(() -> new IllegalStateException("Missing body on trigger"));
HttpRequest.HttpRequestBuilder requestBuilder = HttpRequest.builder()
.uri(URI.create(runContext.render(this.baseUrl).as(String.class).orElseThrow() + "/api/v2/accounts/" + runContext.render(this.accountId).as(String.class).orElseThrow() +
"/jobs/" + runContext.render(this.jobId).as(String.class).orElseThrow() + "/run/"))
.method("POST")
.body(HttpRequest.JsonRequestBody.builder()
.content(body)
.build());

HttpResponse<RunResponse> triggerResponse = this.request(runContext, requestBuilder, RunResponse.class);

RunResponse triggerRunResponse = triggerResponse.getBody();
if (triggerRunResponse == null) {
throw new IllegalStateException("Missing body on trigger");
}

logger.info("Job status {} with response: {}", triggerResponse.getStatus(), triggerRunResponse);
Long runId = triggerRunResponse.getData().getId();

Expand Down