Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Internal] Update Jobs GetJob API to support paginated responses #403

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,45 @@ public Run getRun(GetRunRequest request) {

return run;
}

/**
* Wrap the {@code JobsApi.get} operation to retrieve paginated content without breaking the
* response contract.
*
* <p>Depending on the Jobs API version used under the hood, tasks or job_clusters retrieved by
* the initial request may be truncated due to high cardinalities. Truncation can happen for jobs
* with over 100 tasks, as well as job_clusters with over 100 elements. To avoid returning an
* incomplete {@code Job} object to the user, this method performs all the requests required to
* collect all tasks and job_clusters into a single {@code Job} object.
*/
public Job get(GetJobRequest request) {
Job job = super.get(request);

// jobs/get response includes next_page_token as long as there are more pages to fetch.
while (job.getNextPageToken() != null) {
request.setPageToken(job.getNextPageToken());
Job currJob = super.get(request);
// Each new page of jobs/get response includes the next page of the tasks, job_clusters,
// job_parameters, and environments.
Collection<Task> newTasks = currJob.getSettings().getTasks();
if (newTasks != null) {
job.getSettings().getTasks().addAll(newTasks);
}
Collection<JobCluster> newClusters = currJob.getSettings().getJobClusters();
if (newClusters != null) {
job.getSettings().getJobClusters().addAll(newClusters);
}
Collection<JobParameterDefinition> newParameters = currJob.getSettings().getParameters();
if (newParameters != null) {
job.getSettings().getParameters().addAll(newParameters);
}
Collection<JobEnvironment> newEnvironments = currJob.getSettings().getEnvironments();
if (newEnvironments != null) {
job.getSettings().getEnvironments().addAll(newEnvironments);
}
job.setNextPageToken(currJob.getNextPageToken());
}

return job;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;

import com.databricks.sdk.service.jobs.GetRunRequest;
import com.databricks.sdk.service.jobs.JobsService;
import com.databricks.sdk.service.jobs.Run;
import com.databricks.sdk.service.jobs.RunTask;
import com.databricks.sdk.service.jobs.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

Expand Down Expand Up @@ -82,4 +80,113 @@ private void addIterations(Run run, long... iterationRunIds) {
}
run.setIterations(iterations);
}

private void addJobClusters(Run run, String... clusterKeys) {
Collection<JobCluster> clusters = new ArrayList<>();
for (String clusterKey : clusterKeys) {
clusters.add(new JobCluster().setJobClusterKey(clusterKey));
}
run.setJobClusters(clusters);
}

private void addJobParameters(Run run, String... parameterKeys) {
Collection<JobParameter> parameters = new ArrayList<>();
for (String parameterKey : parameterKeys) {
parameters.add(new JobParameter().setName(parameterKey).setValue(parameterKey));
}
run.setJobParameters(parameters);
}

@Test
public void testGetJobPaginationWithTasks() {
JobsService service = Mockito.mock(JobsService.class);
Task taskKey1 = new Task().setTaskKey("taskKey1");
Task taskKey2 = new Task().setTaskKey("taskKey2");
List<Task> tasks = new ArrayList<>();
tasks.add(taskKey1);
tasks.add(taskKey2);

JobCluster jobClusterKey1 = new JobCluster().setJobClusterKey("jobClusterKey1");
List<JobCluster> jobClusters = new ArrayList<>();
jobClusters.add(jobClusterKey1);
Job firstPage =
new Job().setSettings(new JobSettings().setTasks(tasks).setJobClusters(jobClusters));

when(service.get(any())).thenReturn(firstPage);
JobsExt jobsExt = new JobsExt(service);

GetJobRequest request = new GetJobRequest();
Job job = jobsExt.get(request);

Job expectedJob =
new Job().setSettings(new JobSettings().setTasks(tasks).setJobClusters(jobClusters));
assertEquals(expectedJob, job);
verify(service, times(1)).get(any());
}

@Test
public void testGetJobPaginationWithJobClusters() {
JobsService service = Mockito.mock(JobsService.class);
Job firstPage = new Job().setSettings(new JobSettings()).setNextPageToken("tokenToSecondPage");
addTasks(firstPage, "taskKey1", "taskKey2");
addJobClusters(firstPage, "clusterKey1", "clusterKey2");
addJobParameters(firstPage, "parameterKey1");
addJobEnvironments(firstPage, "environmentKey1");

Job secondPage = new Job().setSettings(new JobSettings()).setNextPageToken("tokenToThirdPage");
addTasks(secondPage, "taskKey3", "taskKey4");
addJobClusters(secondPage, "clusterKey3");
addJobParameters(secondPage, "parameterKey2");
addJobEnvironments(secondPage, "environmentKey2");

Job thirdPage = new Job().setSettings(new JobSettings());
addTasks(thirdPage, "taskKey5");
addJobParameters(thirdPage, "parameterKey3");

when(service.get(any())).thenReturn(firstPage).thenReturn(secondPage).thenReturn(thirdPage);
JobsExt jobsExt = new JobsExt(service);

GetJobRequest request = new GetJobRequest();
Job job = jobsExt.get(request);

Job expectedJob = new Job().setSettings(new JobSettings());
addTasks(expectedJob, "taskKey1", "taskKey2", "taskKey3", "taskKey4", "taskKey5");
addJobClusters(expectedJob, "clusterKey1", "clusterKey2", "clusterKey3");
addJobParameters(expectedJob, "parameterKey1", "parameterKey2", "parameterKey3");
addJobEnvironments(expectedJob, "environmentKey1", "environmentKey2");
assertEquals(expectedJob, job);
verify(service, times(3)).get(any());
}

private void addTasks(Job job, String... taskKeys) {
Collection<Task> tasks = new ArrayList<>();
for (String taskKey : taskKeys) {
tasks.add(new Task().setTaskKey(taskKey));
}
job.getSettings().setTasks(tasks);
}

private void addJobClusters(Job job, String... clusterKeys) {
Collection<JobCluster> jobClusters = new ArrayList<>();
for (String clusterKey : clusterKeys) {
jobClusters.add(new JobCluster().setJobClusterKey(clusterKey));
}
job.getSettings().setJobClusters(jobClusters);
}

private void addJobParameters(Job job, String... parameterKeys) {
Collection<JobParameterDefinition> parameters = new ArrayList<>();
for (String parameterKey : parameterKeys) {
parameters.add(new JobParameterDefinition().setName(parameterKey));
}
job.getSettings().setParameters(parameters);
}

private void addJobEnvironments(Job job, String... environmentKeys) {
Collection<JobEnvironment> environments = new ArrayList<>();
for (String environmentKey : environmentKeys) {
environments.add(new JobEnvironment().setEnvironmentKey(environmentKey));
}
job.getSettings().setEnvironments(environments);
}
}
Loading