Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Add Jobs API #1110

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions sdk/src/main/java/io/dapr/client/DaprClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.dapr.client.domain.HttpExtension;
import io.dapr.client.domain.InvokeBindingRequest;
import io.dapr.client.domain.InvokeMethodRequest;
import io.dapr.client.domain.Job;
import io.dapr.client.domain.PublishEventRequest;
import io.dapr.client.domain.SaveStateRequest;
import io.dapr.client.domain.State;
Expand All @@ -44,6 +45,8 @@
import java.util.Map;
import java.util.function.Function;

import com.google.protobuf.Message;

/**
* Generic Client Adapter to be used regardless of the GRPC or the HTTP Client implementation required.
*
Expand Down Expand Up @@ -662,6 +665,15 @@ Flux<SubscribeConfigurationResponse> subscribeConfiguration(String storeName, Li
* @return Mono of {@link UnsubscribeConfigurationResponse} instance.
*/
Mono<UnsubscribeConfigurationResponse> unsubscribeConfiguration(UnsubscribeConfigurationRequest request);

/**
* ScheduleJobAlpha1 creates and schedules a job.
*
* @param <T> The type of the data for the job.
* @param job job to be scheduled
* @return a Mono plan of type Void.
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@charan2628 since this is still early days, we should move it to DaprPreviewClient where we host all the "experimental" APIs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for catching this @artur-ciocanu

<T> Mono<Void> scheduleJobAlpha1(Job<T> job);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks good based on this docs: https://docs.dapr.io/reference/api/jobs_api/


/**
* Returns a newly created gRPC stub with proper interceptors and channel for gRPC proxy invocation.
Expand Down
48 changes: 48 additions & 0 deletions sdk/src/main/java/io/dapr/client/DaprClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
package io.dapr.client;

import com.google.common.base.Strings;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import com.google.protobuf.Message;
import io.dapr.client.domain.ActorMetadata;
import io.dapr.client.domain.AppConnectionPropertiesHealthMetadata;
import io.dapr.client.domain.AppConnectionPropertiesMetadata;
Expand All @@ -37,6 +39,7 @@
import io.dapr.client.domain.HttpExtension;
import io.dapr.client.domain.InvokeBindingRequest;
import io.dapr.client.domain.InvokeMethodRequest;
import io.dapr.client.domain.Job;
import io.dapr.client.domain.LockRequest;
import io.dapr.client.domain.PublishEventRequest;
import io.dapr.client.domain.QueryStateItem;
Expand Down Expand Up @@ -1175,6 +1178,51 @@ public Mono<UnsubscribeConfigurationResponse> unsubscribeConfiguration(Unsubscri
}
}

@Override
public <T> Mono<Void> scheduleJobAlpha1(Job<T> job) {
try {
final String name = job.getName();
final T data = job.getData();

if (name == null || name.trim().isEmpty()) {
throw new IllegalArgumentException("Job name cannot be null or empty");
}
if (data == null) {
throw new IllegalArgumentException("Job data cannot be empty");
}

DaprProtos.Job.Builder jobBuilder = DaprProtos.Job.newBuilder()
.setName(name);
if (data instanceof Message) {
jobBuilder.setData(Any.pack((Message)job.getData()));
} else {
jobBuilder.setData(Any.newBuilder().setValue(ByteString.copyFrom(this.objectSerializer.serialize(data))));
}
Copy link
Author

@charan2628 charan2628 Aug 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@artursouza @salaboy is this implementation fine?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@artursouza the data of the Job can be something different than Message?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@charan2628 @salaboy and @artursouza I see that Job data is modeled as google.protobuf.Any. In other places where we use google.protobuf.Any like TransactionalActorStateOperation, we check that data is either String or byte[]. I think we should stick to this approach.

We don't want to expose com.google.protobuf.Message to DaprClient users, otherwise they will have too many options to create a Job, while having data as either String or byte[] covers most of the use cases.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@salaboy @artur-ciocanu is this implementation fine?

if (job.getSchedule() != null && !job.getSchedule().trim().isEmpty()) {
jobBuilder.setSchedule(job.getSchedule());
}
if (job.getRepeats() != null) {
jobBuilder.setRepeats(job.getRepeats());
}
if (job.getDueTime() != null && !job.getDueTime().trim().isEmpty()) {
jobBuilder.setDueTime(job.getDueTime());
}
if (job.getTtl() != null && !job.getTtl().trim().isEmpty()) {
jobBuilder.setTtl(job.getTtl());
}

DaprProtos.ScheduleJobRequest.Builder builder =
DaprProtos.ScheduleJobRequest.newBuilder()
.setJob(jobBuilder.build());

return this.<DaprProtos.ScheduleJobResponse>createMono(
it -> intercept(null, asyncStub).scheduleJobAlpha1(builder.build(), it))
.then();
} catch (Exception ex) {
return DaprException.wrapMono(ex);
}
}

/**
* Build a new Configuration Item from provided parameter.
*
Expand Down
74 changes: 74 additions & 0 deletions sdk/src/main/java/io/dapr/client/domain/Job.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package io.dapr.client.domain;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This import is not used, so you should probably remove it

import com.google.protobuf.Message;

/**
* A Job to schedule
*
* @param <T> The class type of Job data.
*/
public final class Job<T> {

private final String name;

private String schedule;

private Integer repeats;

private String dueTime;

private String ttl;

private final T data;

/**
* Constructor for Job
*
* @param name name of the job to create
*/
public Job(String name, T data) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The constructor should include all required fields.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@charan2628 you are still missing this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@charan2628 ideally you should put all the data in constructor and have just "getters" without setters.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@artur-ciocanu as per the spec only name and data are required, remaining are optional that's why I included only these two in constructor, can I two constructors one with mandatory fields and another with all fields?

super();
this.name = name;
this.data = data;
}

public String getSchedule() {
return schedule;
}

public void setSchedule(String schedule) {
this.schedule = schedule;
}

public Integer getRepeats() {
return repeats;
}

public void setRepeats(Integer repeats) {
this.repeats = repeats;
}

public String getDueTime() {
return dueTime;
}

public void setDueTime(String dueTime) {
this.dueTime = dueTime;
}

public String getTtl() {
return ttl;
}

public void setTtl(String ttl) {
this.ttl = ttl;
}

public T getData() {
return data;
}

public String getName() {
return name;
}
}