Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Add Jobs API #1110

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions sdk/src/main/java/io/dapr/client/DaprClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.dapr.client.domain.HttpExtension;
import io.dapr.client.domain.InvokeBindingRequest;
import io.dapr.client.domain.InvokeMethodRequest;
import io.dapr.client.domain.Job;
import io.dapr.client.domain.PublishEventRequest;
import io.dapr.client.domain.SaveStateRequest;
import io.dapr.client.domain.State;
Expand All @@ -44,6 +45,8 @@
import java.util.Map;
import java.util.function.Function;

import com.google.protobuf.Message;

/**
* Generic Client Adapter to be used regardless of the GRPC or the HTTP Client implementation required.
*
Expand Down Expand Up @@ -670,6 +673,7 @@ Flux<SubscribeConfigurationResponse> subscribeConfiguration(String storeName, Li
* @return Mono of {@link UnsubscribeConfigurationResponse} instance.
*/
Mono<UnsubscribeConfigurationResponse> unsubscribeConfiguration(UnsubscribeConfigurationRequest request);


/**
* Returns a newly created gRPC stub with proper interceptors and channel for gRPC proxy invocation.
Expand Down
100 changes: 100 additions & 0 deletions sdk/src/main/java/io/dapr/client/DaprClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@

package io.dapr.client;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import com.google.protobuf.Message;
import io.dapr.client.domain.ActorMetadata;
import io.dapr.client.domain.AppConnectionPropertiesHealthMetadata;
import io.dapr.client.domain.AppConnectionPropertiesMetadata;
Expand All @@ -38,6 +41,7 @@
import io.dapr.client.domain.HttpExtension;
import io.dapr.client.domain.InvokeBindingRequest;
import io.dapr.client.domain.InvokeMethodRequest;
import io.dapr.client.domain.Job;
import io.dapr.client.domain.LockRequest;
import io.dapr.client.domain.PublishEventRequest;
import io.dapr.client.domain.QueryStateItem;
Expand All @@ -56,6 +60,7 @@
import io.dapr.client.domain.UnsubscribeConfigurationRequest;
import io.dapr.client.domain.UnsubscribeConfigurationResponse;
import io.dapr.client.resiliency.ResiliencyOptions;
import io.dapr.config.Properties;
import io.dapr.exceptions.DaprException;
import io.dapr.internal.exceptions.DaprHttpException;
import io.dapr.internal.grpc.DaprClientGrpcInterceptors;
Expand Down Expand Up @@ -91,6 +96,7 @@
import reactor.util.retry.Retry;

import java.io.IOException;
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -140,6 +146,10 @@ public class DaprClientImpl extends AbstractDaprClient {
private final DaprHttp httpClient;

private final DaprClientGrpcInterceptors grpcInterceptors;

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

private static final Charset CHARSET = Properties.STRING_CHARSET.get();

/**
* Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder
Expand Down Expand Up @@ -1289,6 +1299,96 @@ public Mono<UnsubscribeConfigurationResponse> unsubscribeConfiguration(Unsubscri
}
}

@Override
public <T> Mono<Void> scheduleJobAlpha1(Job<T> job) {
try {
final String name = job.getName();
final T data = job.getData();

if (name == null || name.trim().isEmpty()) {
throw new IllegalArgumentException("Job name cannot be null or empty");
}
if (data == null) {
throw new IllegalArgumentException("Job data cannot be empty");
}

DaprProtos.Job.Builder jobBuilder = DaprProtos.Job.newBuilder()
.setName(name);
if (data instanceof String) {
jobBuilder.setData(Any.newBuilder().setValue(ByteString.copyFrom((String) data, CHARSET)));
} else if (data instanceof byte[]) {
String base64 = OBJECT_MAPPER.writeValueAsString(data);
jobBuilder.setData(Any.newBuilder().setValue(ByteString.copyFrom(base64, CHARSET)));
} else {
return Mono.error(() -> {
throw new IllegalArgumentException("Job data value must be String or byte[]");
});
}
Copy link
Author

@charan2628 charan2628 Aug 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@artursouza @salaboy is this implementation fine?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@artursouza the data of the Job can be something different than Message?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@charan2628 @salaboy and @artursouza I see that Job data is modeled as google.protobuf.Any. In other places where we use google.protobuf.Any like TransactionalActorStateOperation, we check that data is either String or byte[]. I think we should stick to this approach.

We don't want to expose com.google.protobuf.Message to DaprClient users, otherwise they will have too many options to create a Job, while having data as either String or byte[] covers most of the use cases.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@salaboy @artur-ciocanu is this implementation fine?

if (job.getSchedule() != null && !job.getSchedule().trim().isEmpty()) {
jobBuilder.setSchedule(job.getSchedule());
}
if (job.getRepeats() != null) {
jobBuilder.setRepeats(job.getRepeats());
}
if (job.getDueTime() != null && !job.getDueTime().trim().isEmpty()) {
jobBuilder.setDueTime(job.getDueTime());
}
if (job.getTtl() != null && !job.getTtl().trim().isEmpty()) {
jobBuilder.setTtl(job.getTtl());
}

DaprProtos.ScheduleJobRequest.Builder builder =
DaprProtos.ScheduleJobRequest.newBuilder()
.setJob(jobBuilder.build());

return this.<DaprProtos.ScheduleJobResponse>createMono(
it -> intercept(null, asyncStub).scheduleJobAlpha1(builder.build(), it))
.then();
} catch (Exception ex) {
return DaprException.wrapMono(ex);
}
}

@SuppressWarnings("unchecked")
@Override
public <T> Mono<Job<T>> getJobAlpha1(String name, Class<T> clazz) {
try {
if (name == null || name.trim().isEmpty()) {
throw new IllegalArgumentException("Job name cannot be null or empty");
}
return this.<DaprProtos.GetJobResponse>createMono(
it -> intercept(null, asyncStub).getJobAlpha1(DaprProtos.GetJobRequest.newBuilder().setName(name).build(), it))
.map(it -> {
DaprProtos.Job _job = it.getJob();
T data = null;
if (clazz.isInstance(String.class)) {
data = (T)_job.getData().toByteString().toString(CHARSET);
} else if (clazz.isInstance(byte[].class)) {
data = (T) _job.getData().toByteArray();
} else {
throw new IllegalArgumentException("Job data type must be String or byte[]");
}
return new Job<>(_job.getName(), _job.getSchedule(), _job.getRepeats(), _job.getDueTime(), _job.getTtl(), data);
});
} catch (Exception ex) {
return DaprException.wrapMono(ex);
}
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@salaboy @artur-ciocanu this getJobAplha1 implementation is fine? I taking in the type of the data.

@Override
public Mono<Void> deleteJobAlpha1(String name) {
try {
if (name == null || name.trim().isEmpty()) {
throw new IllegalArgumentException("Job name cannot be null or empty");
}
return this.<DaprProtos.DeleteJobResponse>createMono(
it -> intercept(null, asyncStub).deleteJobAlpha1(DaprProtos.DeleteJobRequest.newBuilder().setName(name).build(), it))
.then();
} catch (Exception ex) {
return DaprException.wrapMono(ex);
}
}

/**
* Build a new Configuration Item from provided parameter.
*
Expand Down
27 changes: 27 additions & 0 deletions sdk/src/main/java/io/dapr/client/DaprPreviewClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.dapr.client.domain.BulkPublishRequest;
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.BulkPublishResponseFailedEntry;
import io.dapr.client.domain.Job;
import io.dapr.client.domain.LockRequest;
import io.dapr.client.domain.QueryStateRequest;
import io.dapr.client.domain.QueryStateResponse;
Expand Down Expand Up @@ -268,4 +269,30 @@ <T> Mono<BulkPublishResponse<T>> publishEvents(String pubsubName, String topicNa
*/
<T> Subscription subscribeToEvents(
String pubsubName, String topic, SubscriptionListener<T> listener, TypeRef<T> type);

/**
* ScheduleJobAlpha1 creates and schedules a job.
*
* @param <T> The type of the data for the job.
* @param job job to be scheduled
* @return a Mono plan of type Void.
*/
<T> Mono<Void> scheduleJobAlpha1(Job<T> job);

/**
* GetJobAlpha1 retrieve Job by name.
*
* @param <T> The type of the data for the job.
* @param name name of the job
* @return a Mono of Job
*/
<T> Mono<Job<T>> getJobAlpha1(String name, Class<T> clazz);

/**
* Delete a Job.
*
* @param name name of the job
* @return
*/
Mono<Void> deleteJobAlpha1(String name);
}
94 changes: 94 additions & 0 deletions sdk/src/main/java/io/dapr/client/domain/Job.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package io.dapr.client.domain;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This import is not used, so you should probably remove it

import com.google.protobuf.Message;

/**
* A Job to schedule
*
* @param <T> The class type of Job data.
*/
public final class Job<T> {

private final String name;

private String schedule;

private Integer repeats;

private String dueTime;

private String ttl;

private final T data;

/**
* Constructor for Job
*
* @param name name of the job to create
*/
public Job(String name, T data) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The constructor should include all required fields.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@charan2628 you are still missing this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@charan2628 ideally you should put all the data in constructor and have just "getters" without setters.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@artur-ciocanu as per the spec only name and data are required, remaining are optional that's why I included only these two in constructor, can I two constructors one with mandatory fields and another with all fields?

super();
this.name = name;
this.data = data;
}

/**
* Constructor for Job
*
* @param name name of the job to create
* @param schedule schedule for the job
* @param repeats jobs with fixed repeat counts (accounting for Actor Reminders).
* @param dueTime sets time at which or time interval before the callback is invoked for the first time.
* @param ttl Time To Live to allow for auto deletes (accounting for Actor Reminders).
* @param data Job data
*/
public Job(String name, String schedule, Integer repeats, String dueTime, String ttl, T data) {
super();
this.name = name;
this.schedule = schedule;
this.repeats = repeats;
this.dueTime = dueTime;
this.ttl = ttl;
this.data = data;
}

public String getSchedule() {
return schedule;
}

public void setSchedule(String schedule) {
this.schedule = schedule;
}

public Integer getRepeats() {
return repeats;
}

public void setRepeats(Integer repeats) {
this.repeats = repeats;
}

public String getDueTime() {
return dueTime;
}

public void setDueTime(String dueTime) {
this.dueTime = dueTime;
}

public String getTtl() {
return ttl;
}

public void setTtl(String ttl) {
this.ttl = ttl;
}

public T getData() {
return data;
}

public String getName() {
return name;
}
}