diff --git a/src/main/java/io/kestra/plugin/dbt/cli/AbstractDbt.java b/src/main/java/io/kestra/plugin/dbt/cli/AbstractDbt.java index 6896bfc..3110d71 100644 --- a/src/main/java/io/kestra/plugin/dbt/cli/AbstractDbt.java +++ b/src/main/java/io/kestra/plugin/dbt/cli/AbstractDbt.java @@ -27,10 +27,8 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; -import java.util.ArrayList; -import java.util.HashMap; +import java.util.*; import java.util.List; -import java.util.Map; @SuperBuilder @ToString @@ -143,13 +141,13 @@ public void setDockerOptions(Property dockerOptions) { @Override public ScriptOutput run(RunContext runContext) throws Exception { CommandsWrapper commandsWrapper = new CommandsWrapper(runContext) - .withEnv(this.getEnv() != null ? this.getEnv().asMap(runContext, String.class, String.class) : new HashMap<>()) + .withEnv(runContext.render(this.getEnv()).asMap(String.class, String.class)) .withNamespaceFiles(namespaceFiles) .withInputFiles(inputFiles) .withOutputFiles(outputFiles) - .withRunnerType(this.getRunner() != null ? this.getRunner().as(runContext, RunnerType.class) : null) - .withDockerOptions(this.getDocker() != null ? this.getDocker().as(runContext, DockerOptions.class) : null) - .withContainerImage(this.containerImage.as(runContext, String.class)) + .withRunnerType(runContext.render(this.getRunner()).as(RunnerType.class).orElse(null)) + .withDockerOptions(runContext.render(this.getDocker()).as(DockerOptions.class).orElse(null)) + .withContainerImage(runContext.render(this.getContainerImage()).as(String.class).orElseThrow()) .withTaskRunner(this.taskRunner) .withLogConsumer(new AbstractLogConsumer() { @Override @@ -160,15 +158,15 @@ public void accept(String line, Boolean isStdErr) { .withEnableOutputDirectory(true); //force output files on task runners Path workingDirectory = commandsWrapper.getWorkingDirectory(); - String profileString = profiles != null ? profiles.as(runContext, String.class) : null; - if (profileString != null && !profileString.isEmpty()) { + Optional profileString = runContext.render(profiles).as(String.class); + if (profileString.isPresent() && !profileString.get().isEmpty()) { if (Files.exists(Path.of(".profiles/profiles.yml"))) { runContext.logger().warn("A 'profiles.yml' file already exist in the task working directory, it will be overridden."); } FileUtils.writeStringToFile( new File(workingDirectory.resolve(".profile").toString(), "profiles.yml"), - profileString, + profileString.get(), StandardCharsets.UTF_8 ); } @@ -194,26 +192,26 @@ public void accept(String line, Boolean isStdErr) { private String createDbtCommand(RunContext runContext) throws IllegalVariableEvaluationException { List commands = new ArrayList<>(List.of( - dbtPath.as(runContext, String.class), + runContext.render(this.dbtPath).as(String.class).orElseThrow(), "--log-format json" )); - if (Boolean.TRUE.equals(this.debug.as(runContext, Boolean.class))) { + if (Boolean.TRUE.equals(runContext.render(this.debug).as(Boolean.class).orElse(false))) { commands.add("--debug"); } - if (Boolean.TRUE.equals(this.failFast.as(runContext, Boolean.class))) { + if (Boolean.TRUE.equals(runContext.render(this.failFast).as(Boolean.class).orElse(false))) { commands.add("--fail-fast"); } - if (Boolean.TRUE.equals(this.warnError.as(runContext, Boolean.class))) { + if (Boolean.TRUE.equals(runContext.render(this.warnError).as(Boolean.class).orElse(false))) { commands.add("--warn-error"); } commands.addAll(dbtCommands(runContext)); - if (this.projectDir != null) { - commands.add("--project-dir {{" + ScriptService.VAR_WORKING_DIR + "}}" + this.projectDir.as(runContext, String.class)); + if (runContext.render(this.projectDir).as(String.class).isPresent()) { + commands.add("--project-dir {{" + ScriptService.VAR_WORKING_DIR + "}}" + runContext.render(this.projectDir).as(String.class).get()); } else { commands.add("--project-dir {{" + ScriptService.VAR_WORKING_DIR + "}}"); } @@ -222,11 +220,11 @@ private String createDbtCommand(RunContext runContext) throws IllegalVariableEva } protected void parseResults(RunContext runContext, Path workingDirectory, ScriptOutput scriptOutput) throws IllegalVariableEvaluationException, IOException { - String baseDir = this.projectDir != null ? this.projectDir.as(runContext, String.class) : ""; + String baseDir = runContext.render(this.projectDir).as(String.class).orElse(""); File runResults = workingDirectory.resolve(baseDir + "target/run_results.json").toFile(); - if (this.parseRunResults.as(runContext, Boolean.class) && runResults.exists()) { + if (runContext.render(this.parseRunResults).as(Boolean.class).orElse(true) && runResults.exists()) { URI results = ResultParser.parseRunResult(runContext, runResults); scriptOutput.getOutputFiles().put("run_results.json", results); } diff --git a/src/main/java/io/kestra/plugin/dbt/cli/AbstractRun.java b/src/main/java/io/kestra/plugin/dbt/cli/AbstractRun.java index 11d09e4..6768485 100644 --- a/src/main/java/io/kestra/plugin/dbt/cli/AbstractRun.java +++ b/src/main/java/io/kestra/plugin/dbt/cli/AbstractRun.java @@ -57,28 +57,28 @@ protected java.util.List dbtCommands(RunContext runContext) throws Illeg this.dbtCommand(), "--profiles-dir {{" + ScriptService.VAR_WORKING_DIR + "}}/.profile")); - if (this.thread != null) { - commands.add("--threads " + this.thread); + if (runContext.render(this.thread).as(Integer.class).isPresent()) { + commands.add("--threads " + runContext.render(this.thread).as(Integer.class).get()); } - if (this.fullRefresh.as(runContext, Boolean.class)) { + if (runContext.render(this.fullRefresh).as(Boolean.class).orElse(false)) { commands.add("--full-refresh"); } - if (this.target != null) { - commands.add("--target " + this.target.as(runContext, String.class)); + if (runContext.render(this.target).as(String.class).isPresent()) { + commands.add("--target " + runContext.render(this.target).as(String.class).get()); } - if (this.selector != null) { - commands.add("--selector " + this.selector.as(runContext, String.class)); + if (runContext.render(this.selector).as(String.class).isPresent()) { + commands.add("--selector " + runContext.render(this.target).as(String.class).get()); } - if (this.select != null) { - commands.add("--select " + String.join(" ", this.select.asList(runContext, String.class))); + if (!runContext.render(this.select).asList(String.class).isEmpty()) { + commands.add("--select " + String.join(" ", runContext.render(this.select).asList(String.class))); } - if (this.exclude != null) { - commands.add("--exclude " + String.join(" ", this.exclude.asList(runContext, String.class))); + if (!runContext.render(this.exclude).asList(String.class).isEmpty()) { + commands.add("--exclude " + String.join(" ", runContext.render(this.exclude).asList(String.class))); } return commands; diff --git a/src/main/java/io/kestra/plugin/dbt/cli/DbtCLI.java b/src/main/java/io/kestra/plugin/dbt/cli/DbtCLI.java index e7376b3..eec6e40 100644 --- a/src/main/java/io/kestra/plugin/dbt/cli/DbtCLI.java +++ b/src/main/java/io/kestra/plugin/dbt/cli/DbtCLI.java @@ -146,7 +146,7 @@ taskRunner: type: io.kestra.plugin.scripts.runner.docker.Docker memory: - memory: 1GB + memory: 1GB containerImage: python:3.11-slim beforeCommands: - pip install uv @@ -173,7 +173,7 @@ code = """ id: dwh_and_analytics namespace: company.team - + tasks: - id: dbt type: io.kestra.plugin.core.flow.WorkingDirectory @@ -182,7 +182,7 @@ type: io.kestra.plugin.git.Clone url: https://github.com/kestra-io/dbt-example branch: master - + - id: dbt_build type: io.kestra.plugin.dbt.cli.DbtCLI taskRunner: @@ -236,7 +236,7 @@ type: io.kestra.plugin.git.Clone url: https://github.com/kestra-io/dbt-example branch: master - + - id: dbt_build type: io.kestra.plugin.dbt.cli.DbtCLI taskRunner: @@ -360,7 +360,7 @@ public ScriptOutput run(RunContext runContext) throws Exception { //Check/fail if a KV store exists with given namespace if(this.getStoreManifest() != null) { - storeManifestKvStore = runContext.namespaceKv(this.getStoreManifest().getNamespace().as(runContext, String.class)); + storeManifestKvStore = runContext.namespaceKv(runContext.render(this.getStoreManifest().getNamespace()).as(String.class).orElseThrow()); } CommandsWrapper commands = this.commands(runContext) @@ -372,16 +372,17 @@ public void accept(String line, Boolean isStdErr) { } }); - Path projectWorkingDirectory = projectDir == null ? commands.getWorkingDirectory() : commands.getWorkingDirectory().resolve(projectDir.as(runContext, String.class)); + var renderedProjectDir = runContext.render(projectDir).as(String.class); + Path projectWorkingDirectory = renderedProjectDir.map(s -> commands.getWorkingDirectory().resolve(s)).orElseGet(commands::getWorkingDirectory); //Load manifest from KV store if(this.getLoadManifest() != null) { - KVStore loadManifestKvStore = runContext.namespaceKv(this.getLoadManifest().getNamespace().as(runContext, String.class)); + KVStore loadManifestKvStore = runContext.namespaceKv(runContext.render(this.getLoadManifest().getNamespace()).as(String.class).orElseThrow()); fetchAndStoreManifestIfExists(runContext, loadManifestKvStore, projectWorkingDirectory); } //Create profiles.yml - String profilesString = profiles == null ? null : profiles.as(runContext, String.class); + String profilesString = runContext.render(profiles).as(String.class).orElse(null); if (profilesString != null && !profilesString.isEmpty()) { var profileFile = new File(commands.getWorkingDirectory().toString(), "profiles.yml"); if (profileFile.exists()) { @@ -416,15 +417,15 @@ public void accept(String line, Boolean isStdErr) { .run(); //Parse run results - if (this.parseRunResults.as(runContext, Boolean.class) && projectWorkingDirectory.resolve("target/run_results.json").toFile().exists()) { + if (runContext.render(this.parseRunResults).as(Boolean.class).orElse(Boolean.TRUE) && projectWorkingDirectory.resolve("target/run_results.json").toFile().exists()) { URI results = ResultParser.parseRunResult(runContext, projectWorkingDirectory.resolve("target/run_results.json").toFile()); run.getOutputFiles().put("run_results.json", results); } File manifestFile = projectWorkingDirectory.resolve("target/manifest.json").toFile(); if (manifestFile.exists()) { - if(this.getStoreManifest() != null) { - final String key = this.getStoreManifest().getKey().as(runContext, String.class); + if(runContext.render(this.getStoreManifest().getKey()).as(String.class).isPresent()) { + final String key = runContext.render(this.getStoreManifest().getKey()).as(String.class).get(); storeManifestKvStore.put(key, new KVValueAndMetadata(null, JacksonMapper.toObject(Files.readString(manifestFile.toPath())))); } @@ -437,7 +438,7 @@ public void accept(String line, Boolean isStdErr) { } private void fetchAndStoreManifestIfExists(RunContext runContext, KVStore loadManifestKvStore, Path projectWorkingDirectory) throws IOException, ResourceExpiredException, IllegalVariableEvaluationException { - Optional manifestValue = loadManifestKvStore.getValue(this.getLoadManifest().getKey().as(runContext, String.class)); + Optional manifestValue = loadManifestKvStore.getValue(runContext.render(this.getLoadManifest().getKey()).as(String.class).get()); if(manifestValue.isEmpty() || manifestValue.get().value() == null || StringUtils.isBlank(manifestValue.get().value().toString())) { runContext.logger().warn("Property `loadManifest` has been used but no manifest has been found in the KV Store."); diff --git a/src/main/java/io/kestra/plugin/dbt/cli/Setup.java b/src/main/java/io/kestra/plugin/dbt/cli/Setup.java index bd819d1..08f0d5c 100644 --- a/src/main/java/io/kestra/plugin/dbt/cli/Setup.java +++ b/src/main/java/io/kestra/plugin/dbt/cli/Setup.java @@ -69,7 +69,7 @@ type: io.kestra.plugin.git.Clone url: https://github.com/kestra-io/dbt-demo branch: main - + - id: dbt_setup type: io.kestra.plugin.dbt.cli.Setup requirements: @@ -83,7 +83,7 @@ extensions: - parquet target: dev - + - id: dbt_build type: io.kestra.plugin.dbt.cli.Build """ @@ -174,14 +174,14 @@ public ScriptOutput run(RunContext runContext) throws Exception { CommandsWrapper commandsWrapper = this.commands(runContext); Path workingDirectory = commandsWrapper.getWorkingDirectory(); - List commands = this.virtualEnvCommand(runContext, workingDirectory, this.requirements.asList(runContext, String.class)); + List commands = this.virtualEnvCommand(runContext, workingDirectory, runContext.render(this.requirements).asList(String.class)); // write profile File profileDir = workingDirectory.resolve(".profile").toFile(); // noinspection ResultOfMethodCallIgnored profileDir.mkdirs(); - String profilesContent = profilesContent(runContext, profiles.as(runContext, Object.class)); + String profilesContent = profilesContent(runContext, runContext.render(this.profiles).as(Object.class).orElseThrow()); FileUtils.writeStringToFile( new File(profileDir, "profiles.yml"), profilesContent, @@ -235,6 +235,8 @@ private String profilesContent(RunContext runContext, Object profiles) throws Il } private Map finalInputFiles(RunContext runContext) throws IOException, IllegalVariableEvaluationException { - return this.inputFiles != null ? new HashMap<>(PluginUtilsService.transformInputFiles(runContext, this.inputFiles.as(runContext, Object.class))) : new HashMap<>(); + return runContext.render(this.inputFiles).as(Object.class).isPresent() ? + new HashMap<>(PluginUtilsService.transformInputFiles(runContext, runContext.render(this.inputFiles).as(Object.class).orElseThrow())) : + new HashMap<>(); } } diff --git a/src/main/java/io/kestra/plugin/dbt/cloud/AbstractDbtCloud.java b/src/main/java/io/kestra/plugin/dbt/cloud/AbstractDbtCloud.java index d182238..a030c52 100644 --- a/src/main/java/io/kestra/plugin/dbt/cloud/AbstractDbtCloud.java +++ b/src/main/java/io/kestra/plugin/dbt/cloud/AbstractDbtCloud.java @@ -61,7 +61,7 @@ protected HttpClient client(RunContext runContext) throws IllegalVariableEvaluat httpConfig.setMaxContentLength(Integer.MAX_VALUE); httpConfig.setReadTimeout(HTTP_READ_TIMEOUT); - DefaultHttpClient client = (DefaultHttpClient) FACTORY.createClient(URI.create(baseUrl.as(runContext, String.class)).toURL(), httpConfig); + DefaultHttpClient client = (DefaultHttpClient) FACTORY.createClient(URI.create(runContext.render(baseUrl).as(String.class).orElseThrow()).toURL(), httpConfig); client.setMediaTypeCodecRegistry(mediaTypeCodecRegistry); return client; @@ -78,7 +78,7 @@ protected HttpResponse request(RunContext runContext, Duration timeout) throws HttpClientResponseException { try { request = request - .bearerAuth(this.token.as(runContext, String.class)) + .bearerAuth(runContext.render(this.token).as(String.class).orElseThrow()) .contentType(MediaType.APPLICATION_JSON); try (HttpClient client = this.client(runContext)) { diff --git a/src/main/java/io/kestra/plugin/dbt/cloud/CheckStatus.java b/src/main/java/io/kestra/plugin/dbt/cloud/CheckStatus.java index 50e4dbe..6cc868f 100644 --- a/src/main/java/io/kestra/plugin/dbt/cloud/CheckStatus.java +++ b/src/main/java/io/kestra/plugin/dbt/cloud/CheckStatus.java @@ -106,7 +106,7 @@ public CheckStatus.Output run(RunContext runContext) throws Exception { Logger logger = runContext.logger(); // Check rendered runId provided is an Integer - Long runIdRendered = Long.parseLong(this.runId.as(runContext, String.class)); + Long runIdRendered = Long.parseLong(runContext.render(this.runId).as(String.class).orElseThrow()); // wait for end RunResponse finalRunResponse = Await.until( @@ -139,8 +139,8 @@ public CheckStatus.Output run(RunContext runContext) throws Exception { return null; }), - this.pollFrequency.as(runContext, Duration.class), - this.maxDuration.as(runContext, Duration.class) + runContext.render(this.pollFrequency).as(Duration.class).orElseThrow(), + runContext.render(this.maxDuration).as(Duration.class).orElseThrow() ); // final response @@ -155,7 +155,7 @@ public CheckStatus.Output run(RunContext runContext) throws Exception { Path runResultsArtifact = downloadArtifacts(runContext, runIdRendered, "run_results.json"); Path manifestArtifact = downloadArtifacts(runContext, runIdRendered, "manifest.json"); - if (this.parseRunResults.as(runContext, Boolean.class)) { + if (runContext.render(this.parseRunResults).as(Boolean.class).orElseThrow()) { ResultParser.parseRunResult(runContext, runResultsArtifact.toFile()); } @@ -207,12 +207,12 @@ private Optional fetchRunResponse(RunContext runContext, Long id, B ) ) .expand(Map.of( - "accountId", this.accountId.as(runContext, String.class), + "accountId", runContext.render(this.accountId).as(String.class).orElseThrow(), "runId", id )) ), Argument.of(RunResponse.class), - maxDuration.as(runContext, Duration.class) + runContext.render(this.maxDuration).as(Duration.class).orElseThrow() ) .getBody(); } @@ -227,7 +227,7 @@ private Path downloadArtifacts(RunContext runContext, Long runId, String path) t UriTemplate .of("/api/v2/accounts/{accountId}/runs/{runId}/artifacts/{path}") .expand(Map.of( - "accountId", this.accountId.as(runContext, String.class), + "accountId", runContext.render(this.accountId).as(String.class).orElseThrow(), "runId", runId, "path", path )) diff --git a/src/main/java/io/kestra/plugin/dbt/cloud/TriggerRun.java b/src/main/java/io/kestra/plugin/dbt/cloud/TriggerRun.java index 727d949..3d4735a 100644 --- a/src/main/java/io/kestra/plugin/dbt/cloud/TriggerRun.java +++ b/src/main/java/io/kestra/plugin/dbt/cloud/TriggerRun.java @@ -148,46 +148,19 @@ public TriggerRun.Output run(RunContext runContext) throws Exception { // trigger Map body = new HashMap<>(); - body.put("cause", this.cause.as(runContext, String.class)); - - if (this.gitSha != null) { - body.put("git_sha", this.gitSha.as(runContext, String.class)); - } - - if (this.gitBranch != null) { - body.put("git_branch", this.gitBranch.as(runContext, String.class)); - } - - if (this.schemaOverride != null) { - body.put("schema_override", this.schemaOverride.as(runContext, String.class)); - } - - if (this.dbtVersionOverride != null) { - body.put("dbt_version_override", this.dbtVersionOverride.as(runContext, String.class)); - } - - if (this.threadsOverride != null) { - body.put("threads_override", this.threadsOverride.as(runContext, String.class)); - } - - if (this.targetNameOverride != null) { - body.put("target_name_override", this.targetNameOverride.as(runContext, String.class)); - } - - if (this.targetNameOverride != null) { - body.put("target_name_override", this.targetNameOverride.as(runContext, String.class)); - } - - if (this.generateDocsOverride != null) { - body.put("generate_docs_override", this.generateDocsOverride.as(runContext, Boolean.class)); - } - - if (this.timeoutSecondsOverride != null) { - body.put("timeout_seconds_override", this.timeoutSecondsOverride.as(runContext, Integer.class)); - } - - if (this.stepsOverride != null) { - body.put("steps_override", this.stepsOverride.asList(runContext, String.class)); + body.put("cause", runContext.render(this.cause).as(String.class).orElseThrow()); + + runContext.render(this.gitSha).as(String.class).ifPresent(sha -> body.put("git_sha", sha)); + runContext.render(this.gitBranch).as(String.class).ifPresent(branch -> body.put("git_branch", branch)); + runContext.render(this.schemaOverride).as(String.class).ifPresent(schema -> body.put("schema_override", schema)); + runContext.render(this.dbtVersionOverride).as(String.class).ifPresent(version -> body.put("dbt_version_override", version)); + runContext.render(this.threadsOverride).as(String.class).ifPresent(thread -> body.put("threads_override", thread)); + runContext.render(this.targetNameOverride).as(String.class).ifPresent(target -> body.put("target_name_override", target)); + runContext.render(this.generateDocsOverride).as(Boolean.class).ifPresent(doc -> body.put("generate_docs_override", doc)); + runContext.render(this.timeoutSecondsOverride).as(Integer.class).ifPresent(timeout -> body.put("timeout_seconds_override", timeout)); + + if (!runContext.render(this.stepsOverride).asList(String.class).isEmpty()) { + body.put("steps_override", runContext.render(this.stepsOverride).asList(String.class)); } HttpResponse triggerResponse = this.request( @@ -198,8 +171,8 @@ public TriggerRun.Output run(RunContext runContext) throws Exception { UriTemplate .of("/api/v2/accounts/{accountId}/jobs/{jobId}/run") .expand(Map.of( - "accountId", this.accountId.as(runContext, String.class), - "jobId", this.jobId.as(runContext, String.class) + "accountId", runContext.render(this.accountId).as(String.class).orElseThrow(), + "jobId", runContext.render(this.jobId).as(String.class).orElseThrow() )) + "/" ) .body(body), @@ -210,7 +183,7 @@ public TriggerRun.Output run(RunContext runContext) throws Exception { logger.info("Job status {} with response: {}", triggerResponse.getStatus(), triggerRunResponse); Long runId = triggerRunResponse.getData().getId(); - if (!this.wait.as(runContext, Boolean.class)) { + if (Boolean.FALSE.equals(runContext.render(this.wait).as(Boolean.class).orElse(Boolean.TRUE))) { return Output.builder() .runId(runId) .build();