-
Notifications
You must be signed in to change notification settings - Fork 209
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WIP: Add Jobs API #1110
base: master
Are you sure you want to change the base?
WIP: Add Jobs API #1110
Changes from all commits
5628ef5
c4541e0
d8fb6aa
591e58d
4c208b4
a966fbd
9747273
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,9 +13,12 @@ | |
|
||
package io.dapr.client; | ||
|
||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
import com.google.common.base.Strings; | ||
import com.google.protobuf.Any; | ||
import com.google.protobuf.ByteString; | ||
import com.google.protobuf.Empty; | ||
import com.google.protobuf.Message; | ||
import io.dapr.client.domain.ActorMetadata; | ||
import io.dapr.client.domain.AppConnectionPropertiesHealthMetadata; | ||
import io.dapr.client.domain.AppConnectionPropertiesMetadata; | ||
|
@@ -38,6 +41,7 @@ | |
import io.dapr.client.domain.HttpExtension; | ||
import io.dapr.client.domain.InvokeBindingRequest; | ||
import io.dapr.client.domain.InvokeMethodRequest; | ||
import io.dapr.client.domain.Job; | ||
import io.dapr.client.domain.LockRequest; | ||
import io.dapr.client.domain.PublishEventRequest; | ||
import io.dapr.client.domain.QueryStateItem; | ||
|
@@ -56,6 +60,7 @@ | |
import io.dapr.client.domain.UnsubscribeConfigurationRequest; | ||
import io.dapr.client.domain.UnsubscribeConfigurationResponse; | ||
import io.dapr.client.resiliency.ResiliencyOptions; | ||
import io.dapr.config.Properties; | ||
import io.dapr.exceptions.DaprException; | ||
import io.dapr.internal.exceptions.DaprHttpException; | ||
import io.dapr.internal.grpc.DaprClientGrpcInterceptors; | ||
|
@@ -91,6 +96,7 @@ | |
import reactor.util.retry.Retry; | ||
|
||
import java.io.IOException; | ||
import java.nio.charset.Charset; | ||
import java.time.Duration; | ||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
|
@@ -140,6 +146,10 @@ public class DaprClientImpl extends AbstractDaprClient { | |
private final DaprHttp httpClient; | ||
|
||
private final DaprClientGrpcInterceptors grpcInterceptors; | ||
|
||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); | ||
|
||
private static final Charset CHARSET = Properties.STRING_CHARSET.get(); | ||
|
||
/** | ||
* Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder | ||
|
@@ -1289,6 +1299,96 @@ public Mono<UnsubscribeConfigurationResponse> unsubscribeConfiguration(Unsubscri | |
} | ||
} | ||
|
||
@Override | ||
public <T> Mono<Void> scheduleJobAlpha1(Job<T> job) { | ||
try { | ||
final String name = job.getName(); | ||
final T data = job.getData(); | ||
|
||
if (name == null || name.trim().isEmpty()) { | ||
throw new IllegalArgumentException("Job name cannot be null or empty"); | ||
} | ||
if (data == null) { | ||
throw new IllegalArgumentException("Job data cannot be empty"); | ||
} | ||
|
||
DaprProtos.Job.Builder jobBuilder = DaprProtos.Job.newBuilder() | ||
.setName(name); | ||
if (data instanceof String) { | ||
jobBuilder.setData(Any.newBuilder().setValue(ByteString.copyFrom((String) data, CHARSET))); | ||
} else if (data instanceof byte[]) { | ||
String base64 = OBJECT_MAPPER.writeValueAsString(data); | ||
jobBuilder.setData(Any.newBuilder().setValue(ByteString.copyFrom(base64, CHARSET))); | ||
} else { | ||
return Mono.error(() -> { | ||
throw new IllegalArgumentException("Job data value must be String or byte[]"); | ||
}); | ||
} | ||
if (job.getSchedule() != null && !job.getSchedule().trim().isEmpty()) { | ||
jobBuilder.setSchedule(job.getSchedule()); | ||
} | ||
if (job.getRepeats() != null) { | ||
jobBuilder.setRepeats(job.getRepeats()); | ||
} | ||
if (job.getDueTime() != null && !job.getDueTime().trim().isEmpty()) { | ||
jobBuilder.setDueTime(job.getDueTime()); | ||
} | ||
if (job.getTtl() != null && !job.getTtl().trim().isEmpty()) { | ||
jobBuilder.setTtl(job.getTtl()); | ||
} | ||
|
||
DaprProtos.ScheduleJobRequest.Builder builder = | ||
DaprProtos.ScheduleJobRequest.newBuilder() | ||
.setJob(jobBuilder.build()); | ||
|
||
return this.<DaprProtos.ScheduleJobResponse>createMono( | ||
it -> intercept(null, asyncStub).scheduleJobAlpha1(builder.build(), it)) | ||
.then(); | ||
} catch (Exception ex) { | ||
return DaprException.wrapMono(ex); | ||
} | ||
} | ||
|
||
@SuppressWarnings("unchecked") | ||
@Override | ||
public <T> Mono<Job<T>> getJobAlpha1(String name, Class<T> clazz) { | ||
try { | ||
if (name == null || name.trim().isEmpty()) { | ||
throw new IllegalArgumentException("Job name cannot be null or empty"); | ||
} | ||
return this.<DaprProtos.GetJobResponse>createMono( | ||
it -> intercept(null, asyncStub).getJobAlpha1(DaprProtos.GetJobRequest.newBuilder().setName(name).build(), it)) | ||
.map(it -> { | ||
DaprProtos.Job _job = it.getJob(); | ||
T data = null; | ||
if (clazz.isInstance(String.class)) { | ||
data = (T)_job.getData().toByteString().toString(CHARSET); | ||
} else if (clazz.isInstance(byte[].class)) { | ||
data = (T) _job.getData().toByteArray(); | ||
} else { | ||
throw new IllegalArgumentException("Job data type must be String or byte[]"); | ||
} | ||
return new Job<>(_job.getName(), _job.getSchedule(), _job.getRepeats(), _job.getDueTime(), _job.getTtl(), data); | ||
}); | ||
} catch (Exception ex) { | ||
return DaprException.wrapMono(ex); | ||
} | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @salaboy @artur-ciocanu this getJobAplha1 implementation is fine? I taking in the type of the data. |
||
@Override | ||
public Mono<Void> deleteJobAlpha1(String name) { | ||
try { | ||
if (name == null || name.trim().isEmpty()) { | ||
throw new IllegalArgumentException("Job name cannot be null or empty"); | ||
} | ||
return this.<DaprProtos.DeleteJobResponse>createMono( | ||
it -> intercept(null, asyncStub).deleteJobAlpha1(DaprProtos.DeleteJobRequest.newBuilder().setName(name).build(), it)) | ||
.then(); | ||
} catch (Exception ex) { | ||
return DaprException.wrapMono(ex); | ||
} | ||
} | ||
|
||
/** | ||
* Build a new Configuration Item from provided parameter. | ||
* | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
package io.dapr.client.domain; | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This import is not used, so you should probably remove it |
||
import com.google.protobuf.Message; | ||
|
||
/** | ||
* A Job to schedule | ||
* | ||
* @param <T> The class type of Job data. | ||
*/ | ||
public final class Job<T> { | ||
|
||
private final String name; | ||
|
||
private String schedule; | ||
|
||
private Integer repeats; | ||
|
||
private String dueTime; | ||
|
||
private String ttl; | ||
|
||
private final T data; | ||
|
||
/** | ||
* Constructor for Job | ||
* | ||
* @param name name of the job to create | ||
*/ | ||
public Job(String name, T data) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The constructor should include all required fields. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @charan2628 you are still missing this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @charan2628 ideally you should put all the data in constructor and have just "getters" without setters. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @artur-ciocanu as per the spec only name and data are required, remaining are optional that's why I included only these two in constructor, can I two constructors one with mandatory fields and another with all fields? |
||
super(); | ||
this.name = name; | ||
this.data = data; | ||
} | ||
|
||
/** | ||
* Constructor for Job | ||
* | ||
* @param name name of the job to create | ||
* @param schedule schedule for the job | ||
* @param repeats jobs with fixed repeat counts (accounting for Actor Reminders). | ||
* @param dueTime sets time at which or time interval before the callback is invoked for the first time. | ||
* @param ttl Time To Live to allow for auto deletes (accounting for Actor Reminders). | ||
* @param data Job data | ||
*/ | ||
public Job(String name, String schedule, Integer repeats, String dueTime, String ttl, T data) { | ||
super(); | ||
this.name = name; | ||
this.schedule = schedule; | ||
this.repeats = repeats; | ||
this.dueTime = dueTime; | ||
this.ttl = ttl; | ||
this.data = data; | ||
} | ||
|
||
public String getSchedule() { | ||
return schedule; | ||
} | ||
|
||
public void setSchedule(String schedule) { | ||
this.schedule = schedule; | ||
} | ||
|
||
public Integer getRepeats() { | ||
return repeats; | ||
} | ||
|
||
public void setRepeats(Integer repeats) { | ||
this.repeats = repeats; | ||
} | ||
|
||
public String getDueTime() { | ||
return dueTime; | ||
} | ||
|
||
public void setDueTime(String dueTime) { | ||
this.dueTime = dueTime; | ||
} | ||
|
||
public String getTtl() { | ||
return ttl; | ||
} | ||
|
||
public void setTtl(String ttl) { | ||
this.ttl = ttl; | ||
} | ||
|
||
public T getData() { | ||
return data; | ||
} | ||
|
||
public String getName() { | ||
return name; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@artursouza @salaboy is this implementation fine?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@artursouza the data of the Job can be something different than
Message
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@charan2628 @salaboy and @artursouza I see that
Job
data
is modeled asgoogle.protobuf.Any
. In other places where we usegoogle.protobuf.Any
likeTransactionalActorStateOperation
, we check thatdata
is eitherString
orbyte[]
. I think we should stick to this approach.We don't want to expose
com.google.protobuf.Message
toDaprClient
users, otherwise they will have too many options to create aJob
, while havingdata
as eitherString
orbyte[]
covers most of the use cases.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@salaboy @artur-ciocanu is this implementation fine?