diff --git a/src/main/java/io/kestra/plugin/dbt/cloud/AbstractDbtCloud.java b/src/main/java/io/kestra/plugin/dbt/cloud/AbstractDbtCloud.java index a030c52..e0e8a74 100644 --- a/src/main/java/io/kestra/plugin/dbt/cloud/AbstractDbtCloud.java +++ b/src/main/java/io/kestra/plugin/dbt/cloud/AbstractDbtCloud.java @@ -1,30 +1,25 @@ package io.kestra.plugin.dbt.cloud; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import io.kestra.core.exceptions.IllegalVariableEvaluationException; import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.Task; -import io.kestra.core.runners.DefaultRunContext; import io.kestra.core.runners.RunContext; -import io.micronaut.core.type.Argument; -import io.micronaut.http.HttpResponse; -import io.micronaut.http.MediaType; -import io.micronaut.http.MutableHttpRequest; -import io.micronaut.http.client.DefaultHttpClientConfiguration; -import io.micronaut.http.client.HttpClient; -import io.micronaut.http.client.exceptions.HttpClientResponseException; -import io.micronaut.http.client.netty.DefaultHttpClient; -import io.micronaut.http.client.netty.NettyHttpClientFactory; -import io.micronaut.http.codec.MediaTypeCodecRegistry; +import io.kestra.core.http.HttpRequest; +import io.kestra.core.http.HttpResponse; +import io.kestra.core.http.client.HttpClient; +import io.kestra.core.http.client.HttpClientException; +import io.kestra.core.http.client.configurations.HttpConfiguration; + import io.swagger.v3.oas.annotations.media.Schema; import lombok.*; import lombok.experimental.SuperBuilder; -import java.net.MalformedURLException; -import java.net.URI; -import java.net.URISyntaxException; -import java.time.Duration; +import java.io.IOException; + import jakarta.validation.constraints.NotNull; -import reactor.core.publisher.Mono; @SuperBuilder @ToString @@ -32,67 +27,55 @@ @Getter @NoArgsConstructor public abstract class AbstractDbtCloud extends Task { - @Schema( - title = "Base url to select the tenant." - ) + private static final ObjectMapper MAPPER = new ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + .registerModule(new JavaTimeModule()); + + @Schema(title = "Base URL to select the tenant.") @NotNull @Builder.Default Property baseUrl = Property.of("https://cloud.getdbt.com"); - @Schema( - title = "Numeric ID of the account." - ) + @Schema(title = "Numeric ID of the account.") @NotNull Property accountId; - @Schema( - title = "API key." - ) + @Schema(title = "API key.") @NotNull Property token; - private static final Duration HTTP_READ_TIMEOUT = Duration.ofSeconds(60); - private static final NettyHttpClientFactory FACTORY = new NettyHttpClientFactory(); + @Schema(title = "The HTTP client configuration.") + HttpConfiguration options; - protected HttpClient client(RunContext runContext) throws IllegalVariableEvaluationException, MalformedURLException, URISyntaxException { - MediaTypeCodecRegistry mediaTypeCodecRegistry = ((DefaultRunContext)runContext).getApplicationContext().getBean(MediaTypeCodecRegistry.class); + /** + * Perform an HTTP request using Kestra HttpClient. + * + * @param requestBuilder The prepared HTTP request builder. + * @param responseType The expected response type. + * @param The response class. + * @return HttpResponse of type RES. + */ + protected HttpResponse request(RunContext runContext, HttpRequest.HttpRequestBuilder requestBuilder, Class responseType) + throws HttpClientException, IllegalVariableEvaluationException { - var httpConfig = new DefaultHttpClientConfiguration(); - httpConfig.setMaxContentLength(Integer.MAX_VALUE); - httpConfig.setReadTimeout(HTTP_READ_TIMEOUT); + var request = requestBuilder + .addHeader("Authorization", "Bearer " + runContext.render(this.token).as(String.class).orElseThrow()) + .addHeader("Content-Type", "application/json") + .build(); - DefaultHttpClient client = (DefaultHttpClient) FACTORY.createClient(URI.create(runContext.render(baseUrl).as(String.class).orElseThrow()).toURL(), httpConfig); - client.setMediaTypeCodecRegistry(mediaTypeCodecRegistry); + try (HttpClient client = new HttpClient(runContext, options)) { + HttpResponse response = client.request(request, String.class); - return client; - } - - protected HttpResponse request(RunContext runContext, - MutableHttpRequest request, - Argument argument) throws HttpClientResponseException { - return request(runContext, request, argument, null); - } - protected HttpResponse request(RunContext runContext, - MutableHttpRequest request, - Argument argument, - Duration timeout) throws HttpClientResponseException { - try { - request = request - .bearerAuth(runContext.render(this.token).as(String.class).orElseThrow()) - .contentType(MediaType.APPLICATION_JSON); + RES parsedResponse = MAPPER.readValue(response.getBody(), responseType); + return HttpResponse.builder() + .request(request) + .body(parsedResponse) + .headers(response.getHeaders()) + .status(response.getStatus()) + .build(); - try (HttpClient client = this.client(runContext)) { - Mono> mono = Mono.from(client.exchange(request, argument)); - return timeout != null ? mono.block(timeout) : mono.block(); - } - } catch (HttpClientResponseException e) { - throw new HttpClientResponseException( - "Request failed '" + e.getStatus().getCode() + "' and body '" + e.getResponse().getBody(String.class).orElse("null") + "'", - e, - e.getResponse() - ); - } catch (IllegalVariableEvaluationException | MalformedURLException | URISyntaxException e) { - throw new RuntimeException(e); + } catch (IOException e) { + throw new RuntimeException("Error executing HTTP request", e); } } } diff --git a/src/main/java/io/kestra/plugin/dbt/cloud/CheckStatus.java b/src/main/java/io/kestra/plugin/dbt/cloud/CheckStatus.java index 6cc868f..3a0c478 100644 --- a/src/main/java/io/kestra/plugin/dbt/cloud/CheckStatus.java +++ b/src/main/java/io/kestra/plugin/dbt/cloud/CheckStatus.java @@ -1,6 +1,8 @@ package io.kestra.plugin.dbt.cloud; import io.kestra.core.exceptions.IllegalVariableEvaluationException; +import io.kestra.core.http.HttpRequest; +import io.kestra.core.http.client.HttpClientException; import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.property.Property; @@ -11,10 +13,7 @@ import io.kestra.plugin.dbt.cloud.models.JobStatusHumanizedEnum; import io.kestra.plugin.dbt.cloud.models.RunResponse; import io.kestra.plugin.dbt.cloud.models.Step; -import io.micronaut.core.type.Argument; -import io.micronaut.http.HttpMethod; -import io.micronaut.http.HttpRequest; -import io.micronaut.http.uri.UriTemplate; + import io.swagger.v3.oas.annotations.media.Schema; import jakarta.validation.constraints.NotNull; import lombok.*; @@ -192,53 +191,23 @@ private void logSteps(Logger logger, RunResponse runResponse) { } } - private Optional fetchRunResponse(RunContext runContext, Long id, Boolean debug) throws IllegalVariableEvaluationException { - return this - .request( - runContext, - HttpRequest - .create( - HttpMethod.GET, - UriTemplate - .of("/api/v2/accounts/{accountId}/runs/{runId}/" + - "?include_related=" + URLEncoder.encode( - "[\"trigger\",\"job\"," + (debug ? "\"debug_logs\"" : "") + ",\"run_steps\", \"environment\"]", - StandardCharsets.UTF_8 - ) - ) - .expand(Map.of( - "accountId", runContext.render(this.accountId).as(String.class).orElseThrow(), - "runId", id - )) - ), - Argument.of(RunResponse.class), - runContext.render(this.maxDuration).as(Duration.class).orElseThrow() - ) - .getBody(); + private Optional fetchRunResponse(RunContext runContext, Long id, Boolean debug) throws IllegalVariableEvaluationException, HttpClientException { + HttpRequest.HttpRequestBuilder requestBuilder = HttpRequest.builder() + .uri(URI.create(runContext.render(this.baseUrl).as(String.class).orElseThrow() + "/api/v2/accounts/" + runContext.render(this.accountId).as(String.class).orElseThrow() + "/runs/" + id + + "/?include_related=" + URLEncoder.encode("[\"trigger\",\"job\"," + (debug ? "\"debug_logs\"" : "") + ",\"run_steps\", \"environment\"]", StandardCharsets.UTF_8))) + .method("GET"); + + return Optional.ofNullable(this.request(runContext, requestBuilder, RunResponse.class).getBody()); } - private Path downloadArtifacts(RunContext runContext, Long runId, String path) throws IllegalVariableEvaluationException, IOException { - String artifact = this - .request( - runContext, - HttpRequest - .create( - HttpMethod.GET, - UriTemplate - .of("/api/v2/accounts/{accountId}/runs/{runId}/artifacts/{path}") - .expand(Map.of( - "accountId", runContext.render(this.accountId).as(String.class).orElseThrow(), - "runId", runId, - "path", path - )) - ), - Argument.of(String.class) - ) - .getBody() - .orElseThrow(); + private Path downloadArtifacts(RunContext runContext, Long runId, String path) throws IllegalVariableEvaluationException, IOException, HttpClientException { + HttpRequest.HttpRequestBuilder requestBuilder = HttpRequest.builder() + .uri(URI.create(runContext.render(this.baseUrl).as(String.class).orElseThrow() + "/api/v2/accounts/" + runContext.render(this.accountId).as(String.class).orElseThrow() + "/runs/" + runId + "/artifacts/" + path)) + .method("GET"); - Path tempFile = runContext.workingDir().createTempFile(".json"); + String artifact = this.request(runContext, requestBuilder, String.class).getBody(); + Path tempFile = runContext.workingDir().createTempFile(".json"); Files.writeString(tempFile, artifact, StandardOpenOption.TRUNCATE_EXISTING); return tempFile; diff --git a/src/main/java/io/kestra/plugin/dbt/cloud/TriggerRun.java b/src/main/java/io/kestra/plugin/dbt/cloud/TriggerRun.java index 3d4735a..cfda4fa 100644 --- a/src/main/java/io/kestra/plugin/dbt/cloud/TriggerRun.java +++ b/src/main/java/io/kestra/plugin/dbt/cloud/TriggerRun.java @@ -1,16 +1,13 @@ package io.kestra.plugin.dbt.cloud; +import io.kestra.core.http.HttpRequest; +import io.kestra.core.http.HttpResponse; import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.runners.RunContext; import io.kestra.plugin.dbt.cloud.models.RunResponse; -import io.micronaut.core.type.Argument; -import io.micronaut.http.HttpMethod; -import io.micronaut.http.HttpRequest; -import io.micronaut.http.HttpResponse; -import io.micronaut.http.uri.UriTemplate; import io.swagger.v3.oas.annotations.media.Schema; import jakarta.validation.constraints.NotNull; import lombok.Builder; @@ -163,23 +160,21 @@ public TriggerRun.Output run(RunContext runContext) throws Exception { body.put("steps_override", runContext.render(this.stepsOverride).asList(String.class)); } - HttpResponse triggerResponse = this.request( - runContext, - HttpRequest - .create( - HttpMethod.POST, - UriTemplate - .of("/api/v2/accounts/{accountId}/jobs/{jobId}/run") - .expand(Map.of( - "accountId", runContext.render(this.accountId).as(String.class).orElseThrow(), - "jobId", runContext.render(this.jobId).as(String.class).orElseThrow() - )) + "/" - ) - .body(body), - Argument.of(RunResponse.class) - ); - - RunResponse triggerRunResponse = triggerResponse.getBody().orElseThrow(() -> new IllegalStateException("Missing body on trigger")); + HttpRequest.HttpRequestBuilder requestBuilder = HttpRequest.builder() + .uri(URI.create(runContext.render(this.baseUrl).as(String.class).orElseThrow() + "/api/v2/accounts/" + runContext.render(this.accountId).as(String.class).orElseThrow() + + "/jobs/" + runContext.render(this.jobId).as(String.class).orElseThrow() + "/run/")) + .method("POST") + .body(HttpRequest.JsonRequestBody.builder() + .content(body) + .build()); + + HttpResponse triggerResponse = this.request(runContext, requestBuilder, RunResponse.class); + + RunResponse triggerRunResponse = triggerResponse.getBody(); + if (triggerRunResponse == null) { + throw new IllegalStateException("Missing body on trigger"); + } + logger.info("Job status {} with response: {}", triggerResponse.getStatus(), triggerRunResponse); Long runId = triggerRunResponse.getData().getId();