From 5628ef55e88263da7cefe38e68437d3d515eca5a Mon Sep 17 00:00:00 2001 From: sai charan Date: Thu, 29 Aug 2024 23:26:24 +0530 Subject: [PATCH 1/2] Schedule Job API --- .../main/java/io/dapr/client/DaprClient.java | 12 +++ .../java/io/dapr/client/DaprClientImpl.java | 48 ++++++++++++ .../main/java/io/dapr/client/domain/Job.java | 74 +++++++++++++++++++ 3 files changed, 134 insertions(+) create mode 100644 sdk/src/main/java/io/dapr/client/domain/Job.java diff --git a/sdk/src/main/java/io/dapr/client/DaprClient.java b/sdk/src/main/java/io/dapr/client/DaprClient.java index 9b713f7c7e..94dc66e28c 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClient.java +++ b/sdk/src/main/java/io/dapr/client/DaprClient.java @@ -25,6 +25,7 @@ import io.dapr.client.domain.HttpExtension; import io.dapr.client.domain.InvokeBindingRequest; import io.dapr.client.domain.InvokeMethodRequest; +import io.dapr.client.domain.Job; import io.dapr.client.domain.PublishEventRequest; import io.dapr.client.domain.SaveStateRequest; import io.dapr.client.domain.State; @@ -44,6 +45,8 @@ import java.util.Map; import java.util.function.Function; +import com.google.protobuf.Message; + /** * Generic Client Adapter to be used regardless of the GRPC or the HTTP Client implementation required. * @@ -662,6 +665,15 @@ Flux subscribeConfiguration(String storeName, Li * @return Mono of {@link UnsubscribeConfigurationResponse} instance. */ Mono unsubscribeConfiguration(UnsubscribeConfigurationRequest request); + + /** + * ScheduleJobAlpha1 creates and schedules a job. + * + * @param The type of the data for the job. + * @param job job to be scheduled + * @return a Mono plan of type Void. + */ + Mono scheduleJobAlpha1(Job job); /** * Returns a newly created gRPC stub with proper interceptors and channel for gRPC proxy invocation. diff --git a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java index aabc07a60f..71762fcb57 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java @@ -14,8 +14,10 @@ package io.dapr.client; import com.google.common.base.Strings; +import com.google.protobuf.Any; import com.google.protobuf.ByteString; import com.google.protobuf.Empty; +import com.google.protobuf.Message; import io.dapr.client.domain.ActorMetadata; import io.dapr.client.domain.AppConnectionPropertiesHealthMetadata; import io.dapr.client.domain.AppConnectionPropertiesMetadata; @@ -37,6 +39,7 @@ import io.dapr.client.domain.HttpExtension; import io.dapr.client.domain.InvokeBindingRequest; import io.dapr.client.domain.InvokeMethodRequest; +import io.dapr.client.domain.Job; import io.dapr.client.domain.LockRequest; import io.dapr.client.domain.PublishEventRequest; import io.dapr.client.domain.QueryStateItem; @@ -1175,6 +1178,51 @@ public Mono unsubscribeConfiguration(Unsubscri } } + @Override + public Mono scheduleJobAlpha1(Job job) { + try { + final String name = job.getName(); + final T data = job.getData(); + + if (name == null || name.trim().isEmpty()) { + throw new IllegalArgumentException("Job name cannot be null or empty"); + } + if (data == null) { + throw new IllegalArgumentException("Job data cannot be empty"); + } + + DaprProtos.Job.Builder jobBuilder = DaprProtos.Job.newBuilder() + .setName(name); + if (data instanceof Message) { + jobBuilder.setData(Any.pack((Message)job.getData())); + } else { + jobBuilder.setData(Any.newBuilder().setValue(ByteString.copyFrom(this.objectSerializer.serialize(data)))); + } + if (job.getSchedule() != null && !job.getSchedule().trim().isEmpty()) { + jobBuilder.setSchedule(job.getSchedule()); + } + if (job.getRepeats() != null) { + jobBuilder.setRepeats(job.getRepeats()); + } + if (job.getDueTime() != null && !job.getDueTime().trim().isEmpty()) { + jobBuilder.setDueTime(job.getDueTime()); + } + if (job.getTtl() != null && !job.getTtl().trim().isEmpty()) { + jobBuilder.setTtl(job.getTtl()); + } + + DaprProtos.ScheduleJobRequest.Builder builder = + DaprProtos.ScheduleJobRequest.newBuilder() + .setJob(jobBuilder.build()); + + return this.createMono( + it -> intercept(null, asyncStub).scheduleJobAlpha1(builder.build(), it)) + .then(); + } catch (Exception ex) { + return DaprException.wrapMono(ex); + } + } + /** * Build a new Configuration Item from provided parameter. * diff --git a/sdk/src/main/java/io/dapr/client/domain/Job.java b/sdk/src/main/java/io/dapr/client/domain/Job.java new file mode 100644 index 0000000000..ab69072bfc --- /dev/null +++ b/sdk/src/main/java/io/dapr/client/domain/Job.java @@ -0,0 +1,74 @@ +package io.dapr.client.domain; + +import com.google.protobuf.Message; + +/** + * A Job to schedule + * + * @param The class type of Job data. + */ +public final class Job { + + private final String name; + + private String schedule; + + private Integer repeats; + + private String dueTime; + + private String ttl; + + private final T data; + + /** + * Constructor for Job + * + * @param name name of the job to create + */ + public Job(String name, T data) { + super(); + this.name = name; + this.data = data; + } + + public String getSchedule() { + return schedule; + } + + public void setSchedule(String schedule) { + this.schedule = schedule; + } + + public Integer getRepeats() { + return repeats; + } + + public void setRepeats(Integer repeats) { + this.repeats = repeats; + } + + public String getDueTime() { + return dueTime; + } + + public void setDueTime(String dueTime) { + this.dueTime = dueTime; + } + + public String getTtl() { + return ttl; + } + + public void setTtl(String ttl) { + this.ttl = ttl; + } + + public T getData() { + return data; + } + + public String getName() { + return name; + } +} From 4c208b45930f1d32019cdb60d0cb2888adfe54d2 Mon Sep 17 00:00:00 2001 From: sai charan Date: Thu, 17 Oct 2024 18:55:05 +0530 Subject: [PATCH 2/2] DeleteJobRequest and GetJobRequest API's --- .../main/java/io/dapr/client/DaprClient.java | 8 --- .../java/io/dapr/client/DaprClientImpl.java | 58 ++++++++++++++++++- .../io/dapr/client/DaprPreviewClient.java | 27 +++++++++ .../main/java/io/dapr/client/domain/Job.java | 20 +++++++ 4 files changed, 102 insertions(+), 11 deletions(-) diff --git a/sdk/src/main/java/io/dapr/client/DaprClient.java b/sdk/src/main/java/io/dapr/client/DaprClient.java index 17c7adaf87..d5f0b39102 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClient.java +++ b/sdk/src/main/java/io/dapr/client/DaprClient.java @@ -666,14 +666,6 @@ Flux subscribeConfiguration(String storeName, Li */ Mono unsubscribeConfiguration(UnsubscribeConfigurationRequest request); - /** - * ScheduleJobAlpha1 creates and schedules a job. - * - * @param The type of the data for the job. - * @param job job to be scheduled - * @return a Mono plan of type Void. - */ - Mono scheduleJobAlpha1(Job job); /** * Returns a newly created gRPC stub with proper interceptors and channel for gRPC proxy invocation. diff --git a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java index efd3f94a39..26a672099e 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java @@ -13,6 +13,7 @@ package io.dapr.client; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Strings; import com.google.protobuf.Any; import com.google.protobuf.ByteString; @@ -59,6 +60,7 @@ import io.dapr.client.domain.UnsubscribeConfigurationRequest; import io.dapr.client.domain.UnsubscribeConfigurationResponse; import io.dapr.client.resiliency.ResiliencyOptions; +import io.dapr.config.Properties; import io.dapr.exceptions.DaprException; import io.dapr.internal.exceptions.DaprHttpException; import io.dapr.internal.grpc.DaprClientGrpcInterceptors; @@ -92,6 +94,7 @@ import reactor.util.retry.Retry; import java.io.IOException; +import java.nio.charset.Charset; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -140,6 +143,10 @@ public class DaprClientImpl extends AbstractDaprClient { private final DaprHttp httpClient; private final DaprClientGrpcInterceptors grpcInterceptors; + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static final Charset CHARSET = Properties.STRING_CHARSET.get(); /** * Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder @@ -1313,10 +1320,15 @@ public Mono scheduleJobAlpha1(Job job) { DaprProtos.Job.Builder jobBuilder = DaprProtos.Job.newBuilder() .setName(name); - if (data instanceof Message) { - jobBuilder.setData(Any.pack((Message)job.getData())); + if (data instanceof String) { + jobBuilder.setData(Any.newBuilder().setValue(ByteString.copyFrom((String) data, CHARSET))); + } else if (data instanceof byte[]) { + String base64 = OBJECT_MAPPER.writeValueAsString(data); + jobBuilder.setData(Any.newBuilder().setValue(ByteString.copyFrom(base64, CHARSET))); } else { - jobBuilder.setData(Any.newBuilder().setValue(ByteString.copyFrom(this.objectSerializer.serialize(data)))); + return Mono.error(() -> { + throw new IllegalArgumentException("Job data value must be String or byte[]"); + }); } if (job.getSchedule() != null && !job.getSchedule().trim().isEmpty()) { jobBuilder.setSchedule(job.getSchedule()); @@ -1342,6 +1354,46 @@ public Mono scheduleJobAlpha1(Job job) { return DaprException.wrapMono(ex); } } + + @SuppressWarnings("unchecked") + @Override + public Mono> getJobAlpha1(String name, Class clazz) { + try { + if (name == null || name.trim().isEmpty()) { + throw new IllegalArgumentException("Job name cannot be null or empty"); + } + return this.createMono( + it -> intercept(null, asyncStub).getJobAlpha1(DaprProtos.GetJobRequest.newBuilder().setName(name).build(), it)) + .map(it -> { + DaprProtos.Job _job = it.getJob(); + T data = null; + if (clazz.isInstance(String.class)) { + data = (T)_job.getData().toByteString().toString(CHARSET); + } else if (clazz.isInstance(byte[].class)) { + data = (T) _job.getData().toByteArray(); + } else { + throw new IllegalArgumentException("Job data type must be String or byte[]"); + } + return new Job<>(_job.getName(), _job.getSchedule(), _job.getRepeats(), _job.getDueTime(), _job.getTtl(), data); + }); + } catch (Exception ex) { + return DaprException.wrapMono(ex); + } + } + + @Override + public Mono deleteJobAlpha1(String name) { + try { + if (name == null || name.trim().isEmpty()) { + throw new IllegalArgumentException("Job name cannot be null or empty"); + } + return this.createMono( + it -> intercept(null, asyncStub).deleteJobAlpha1(DaprProtos.DeleteJobRequest.newBuilder().setName(name).build(), it)) + .then(); + } catch (Exception ex) { + return DaprException.wrapMono(ex); + } + } /** * Build a new Configuration Item from provided parameter. diff --git a/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java b/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java index 95911efc23..8b0e2943e1 100644 --- a/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java +++ b/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java @@ -17,6 +17,7 @@ import io.dapr.client.domain.BulkPublishRequest; import io.dapr.client.domain.BulkPublishResponse; import io.dapr.client.domain.BulkPublishResponseFailedEntry; +import io.dapr.client.domain.Job; import io.dapr.client.domain.LockRequest; import io.dapr.client.domain.QueryStateRequest; import io.dapr.client.domain.QueryStateResponse; @@ -268,4 +269,30 @@ Mono> publishEvents(String pubsubName, String topicNa */ Subscription subscribeToEvents( String pubsubName, String topic, SubscriptionListener listener, TypeRef type); + + /** + * ScheduleJobAlpha1 creates and schedules a job. + * + * @param The type of the data for the job. + * @param job job to be scheduled + * @return a Mono plan of type Void. + */ + Mono scheduleJobAlpha1(Job job); + + /** + * GetJobAlpha1 retrieve Job by name. + * + * @param The type of the data for the job. + * @param name name of the job + * @return a Mono of Job + */ + Mono> getJobAlpha1(String name, Class clazz); + + /** + * Delete a Job. + * + * @param name name of the job + * @return + */ + Mono deleteJobAlpha1(String name); } diff --git a/sdk/src/main/java/io/dapr/client/domain/Job.java b/sdk/src/main/java/io/dapr/client/domain/Job.java index ab69072bfc..c30743cb16 100644 --- a/sdk/src/main/java/io/dapr/client/domain/Job.java +++ b/sdk/src/main/java/io/dapr/client/domain/Job.java @@ -32,6 +32,26 @@ public Job(String name, T data) { this.data = data; } + /** + * Constructor for Job + * + * @param name name of the job to create + * @param schedule schedule for the job + * @param repeats jobs with fixed repeat counts (accounting for Actor Reminders). + * @param dueTime sets time at which or time interval before the callback is invoked for the first time. + * @param ttl Time To Live to allow for auto deletes (accounting for Actor Reminders). + * @param data Job data + */ + public Job(String name, String schedule, Integer repeats, String dueTime, String ttl, T data) { + super(); + this.name = name; + this.schedule = schedule; + this.repeats = repeats; + this.dueTime = dueTime; + this.ttl = ttl; + this.data = data; + } + public String getSchedule() { return schedule; }