Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: TaskInfo#await #66

Merged
merged 15 commits into from
Aug 2, 2024
110 changes: 59 additions & 51 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,10 @@ meili-search.api-key=MASTER_KEY

## Usage

#### Reactive(reactor)
### Searching

```java
public static void main(String[] args) {
public static void reactive() {
JsonHandler jsonHandler = new JacksonJsonHandler();
@Cleanup
ReactiveMSearchClient client = ReactiveMSearchClient.create(builder -> builder
Expand All @@ -117,39 +117,8 @@ public static void main(String[] args) {
)
).block();
}
```

##### Search With Details API

```java
ReactiveTypedDetailsSearch<Movie> searcher = client.indexes().searchWithDetails("movies", Movie.class);
searcher.find(builder ->builder
.q("hello world")
.filter("id < 10")
.showRankingScore(true)
)
.map(response -> {
Integer estimatedTotalHits = response.getEstimatedTotalHits();
Long processingTimeMs = response.getProcessingTimeMs();
return response.getHits();
})
.doOnNext(hitDetails -> {
for(HitDetails<Movie> hitDetail :hitDetails){
SearchDetails details = hitDetail.getDetails(); // Search Details
Movie source = hitDetail.getSource(); // Source Document

Map<String, Object> formatted = details.get_formatted();
SearchDetails.Geo geo = details.get_geo();
Double rankingScore = details.get_rankingScore();
}
})
.subscribe()
```

#### Blocking

```java
public static void main(String[] args) {
public static void blocking() {
JsonHandler jsonHandler = new JacksonJsonHandler();
@Cleanup
MSearchClient client = MSearchClient.create(builder -> builder
Expand All @@ -176,24 +145,63 @@ public static void main(String[] args) {
}
```

##### Search With Details API
### Search With Details API

```java
public static void reactive() {
ReactiveTypedDetailsSearch<Movie> searcher = client.indexes().searchWithDetails("movies", Movie.class);
searcher.find(builder ->builder
.q("hello world")
.filter("id < 10")
.showRankingScore(true)
)
.map(response -> {
Integer estimatedTotalHits = response.getEstimatedTotalHits();
Long processingTimeMs = response.getProcessingTimeMs();
return response.getHits();
})
.doOnNext(hitDetails -> {
for(HitDetails<Movie> hitDetail :hitDetails){
SearchDetails details = hitDetail.getDetails(); // Search Details
Movie source = hitDetail.getSource(); // Source Document

Map<String, Object> formatted = details.get_formatted();
SearchDetails.Geo geo = details.get_geo();
Double rankingScore = details.get_rankingScore();
}
})
.subscribe()
}

public static void blocking() {
TypedDetailsSearch<Movie> searcher = client.indexes().searchWithDetails("movies", Movie.class);
SearchDetailsResponse<Movie> response = searcher.find(builder -> builder
.q("hello world")
.filter("id < 10")
.showRankingScore(true)
);
Integer estimatedTotalHits = response.getEstimatedTotalHits();
Long processingTimeMs = response.getProcessingTimeMs();
List<HitDetails<Movie>> hitDetails = response.getHits();
for(HitDetails<Movie> hitDetail :hitDetails){
SearchDetails details = hitDetail.getDetails(); // Search Details
Movie source = hitDetail.getSource(); // Source Document

Map<String, Object> formatted = details.get_formatted();
SearchDetails.Geo geo = details.get_geo();
Double rankingScore = details.get_rankingScore();
}
}
```

### Await On TaskInfo, Only in blocking

```java
TypedDetailsSearch<Movie> searcher = client.indexes().searchWithDetails("movies", Movie.class);
SearchDetailsResponse<Movie> response = searcher.find(builder -> builder
.q("hello world")
.filter("id < 10")
.showRankingScore(true)
);
Integer estimatedTotalHits = response.getEstimatedTotalHits();
Long processingTimeMs = response.getProcessingTimeMs();
List<HitDetails<Movie>> hitDetails = response.getHits();
for(HitDetails<Movie> hitDetail :hitDetails){
SearchDetails details = hitDetail.getDetails(); // Search Details
Movie source = hitDetail.getSource(); // Source Document

Map<String, Object> formatted = details.get_formatted();
SearchDetails.Geo geo = details.get_geo();
Double rankingScore = details.get_rankingScore();
public static void blocking() {
TaskInfo saveTask = client.indexes(indexes -> indexes
.documents("movies").save(jsonQuote("{'id':100}")));
TaskView taskView = saveTask.await();
TaskStatus status = taskView.getStatus();
assert status == TaskStatus.SUCCEEDED || status == TaskStatus.FAILED;
}
```
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,20 @@
import io.github.honhimw.ms.http.ReactiveHttpUtils;
import io.github.honhimw.ms.json.JsonHandler;
import io.github.honhimw.ms.json.TypeRef;
import io.github.honhimw.ms.model.TaskInfo;
import io.github.honhimw.ms.model.TaskStatus;
import io.github.honhimw.ms.model.TaskType;
import io.github.honhimw.ms.model.TaskView;
import io.github.honhimw.ms.support.ReactorUtils;
import io.github.honhimw.ms.support.TypeRefs;
import io.netty.handler.codec.http.HttpResponseStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

import java.nio.charset.Charset;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Objects;
import java.util.function.Consumer;

Expand Down Expand Up @@ -140,7 +148,8 @@
} else {
return false;
}
}, throwable -> Mono.empty());
}, throwable -> Mono.empty())
.map(t -> decorate(t, typeRef));
}

protected void json(ReactiveHttpUtils.Configurer configurer, Object object) {
Expand All @@ -151,4 +160,82 @@
configurer.body(payload -> payload.raw(raw -> raw.json(json)));
}

@SuppressWarnings("unchecked")
protected <T> T decorate(T delegate, TypeRef<T> typeRef) {
if (typeRef == TypeRefs.TaskInfoRef.INSTANCE) {
return (T) delegateTaskInfo((TaskInfo) delegate);
}
return delegate;
}

protected TaskInfo delegateTaskInfo(TaskInfo taskInfo) {
return new DelegateTaskInfo(taskInfo) {
@Override
public TaskView await(Duration duration) {
Mono<TaskView> tasks = _client.tasks(reactiveTasks -> reactiveTasks
.await(this, _client.config.getAwaitAttempts(), _client.config.getAwaitFixedDelay(), duration));
return ReactorUtils.blockNonNull(tasks);
}
};
}

private static abstract class DelegateTaskInfo extends TaskInfo {
private final TaskInfo taskInfo;

public DelegateTaskInfo(TaskInfo taskInfo) {
this.taskInfo = taskInfo;
}

@Override
public Integer getTaskUid() {
return taskInfo.getTaskUid();
}

@Override
public String getIndexUid() {
return taskInfo.getIndexUid();
}

@Override
public TaskStatus getStatus() {
return taskInfo.getStatus();

Check warning on line 201 in src/main/java/io/github/honhimw/ms/internal/reactive/AbstractReactiveImpl.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/github/honhimw/ms/internal/reactive/AbstractReactiveImpl.java#L201

Added line #L201 was not covered by tests
}

@Override
public TaskType getType() {
return taskInfo.getType();

Check warning on line 206 in src/main/java/io/github/honhimw/ms/internal/reactive/AbstractReactiveImpl.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/github/honhimw/ms/internal/reactive/AbstractReactiveImpl.java#L206

Added line #L206 was not covered by tests
}

@Override
public LocalDateTime getEnqueuedAt() {
return taskInfo.getEnqueuedAt();

Check warning on line 211 in src/main/java/io/github/honhimw/ms/internal/reactive/AbstractReactiveImpl.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/github/honhimw/ms/internal/reactive/AbstractReactiveImpl.java#L211

Added line #L211 was not covered by tests
}

@Override
public void setTaskUid(Integer taskUid) {
taskInfo.setTaskUid(taskUid);
}

Check warning on line 217 in src/main/java/io/github/honhimw/ms/internal/reactive/AbstractReactiveImpl.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/github/honhimw/ms/internal/reactive/AbstractReactiveImpl.java#L216-L217

Added lines #L216 - L217 were not covered by tests

@Override
public void setIndexUid(String indexUid) {
taskInfo.setIndexUid(indexUid);
}

Check warning on line 222 in src/main/java/io/github/honhimw/ms/internal/reactive/AbstractReactiveImpl.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/github/honhimw/ms/internal/reactive/AbstractReactiveImpl.java#L221-L222

Added lines #L221 - L222 were not covered by tests

@Override
public void setStatus(TaskStatus status) {
taskInfo.setStatus(status);
}

Check warning on line 227 in src/main/java/io/github/honhimw/ms/internal/reactive/AbstractReactiveImpl.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/github/honhimw/ms/internal/reactive/AbstractReactiveImpl.java#L226-L227

Added lines #L226 - L227 were not covered by tests

@Override
public void setType(TaskType type) {
taskInfo.setType(type);
}

Check warning on line 232 in src/main/java/io/github/honhimw/ms/internal/reactive/AbstractReactiveImpl.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/github/honhimw/ms/internal/reactive/AbstractReactiveImpl.java#L231-L232

Added lines #L231 - L232 were not covered by tests

@Override
public void setEnqueuedAt(LocalDateTime enqueuedAt) {
taskInfo.setEnqueuedAt(enqueuedAt);
}

Check warning on line 237 in src/main/java/io/github/honhimw/ms/internal/reactive/AbstractReactiveImpl.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/github/honhimw/ms/internal/reactive/AbstractReactiveImpl.java#L236-L237

Added lines #L236 - L237 were not covered by tests
}


}
16 changes: 16 additions & 0 deletions src/main/java/io/github/honhimw/ms/model/TaskInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@

package io.github.honhimw.ms.model;

import io.github.honhimw.ms.json.JsonHandler;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;

import java.io.Serializable;
import java.time.Duration;
import java.time.LocalDateTime;

/**
Expand Down Expand Up @@ -68,4 +70,18 @@
@Schema(description = "Represents the date and time in the RFC 3339 format when the task has been enqueued")
private LocalDateTime enqueuedAt;

/**
* Only used in non-reactive context.
*/
public TaskView await() {
return this.await(Duration.ofDays(1));
}

/**
* Only used in non-reactive context.
*/
public TaskView await(Duration duration) {
throw new UnsupportedOperationException("Not Supported");

Check warning on line 84 in src/main/java/io/github/honhimw/ms/model/TaskInfo.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/github/honhimw/ms/model/TaskInfo.java#L84

Added line #L84 was not covered by tests
}

}
10 changes: 10 additions & 0 deletions src/test/java/io/github/honhimw/ms/client/TaskTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.github.honhimw.ms.api.Documents;
import io.github.honhimw.ms.api.Tasks;
import io.github.honhimw.ms.model.TaskInfo;
import io.github.honhimw.ms.model.TaskStatus;
import io.github.honhimw.ms.model.TaskView;
import io.github.honhimw.ms.support.TestSupport;
import org.junit.jupiter.api.MethodOrderer;
Expand Down Expand Up @@ -46,6 +47,15 @@ void taskView() {
assert Objects.nonNull(taskView.getUid());
}


@Order(1)
@Test
void awaitOnTaskInfo() {
TaskInfo saveTask = blockingClient.indexes(indexes -> indexes.documents(INDEX).save(TestSupport.jsonQuote("{'id':100}")));
TaskStatus status = saveTask.await().getStatus();
assert status == TaskStatus.SUCCEEDED || status == TaskStatus.FAILED;
}

@Test
void timeout() {
TestSupport.assertError(() -> getBlockingTasks().await(1, 1, Duration.ofSeconds(1), Duration.ZERO));
Expand Down