diff --git a/pegasus-common/src/main/java/com/linkedin/common/callback/CompletableFutureCallbackAdapter.java b/pegasus-common/src/main/java/com/linkedin/common/callback/CompletableFutureCallbackAdapter.java new file mode 100644 index 0000000000..69af1bcf35 --- /dev/null +++ b/pegasus-common/src/main/java/com/linkedin/common/callback/CompletableFutureCallbackAdapter.java @@ -0,0 +1,45 @@ +/* + Copyright (c) 2024 LinkedIn Corp. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package com.linkedin.common.callback; + +import java.util.concurrent.CompletableFuture; + + +/** + * A {@link Callback} adapter that wraps a {@link CompletableFuture} and propagates callbacks to it. + */ +public class CompletableFutureCallbackAdapter implements Callback +{ + private final CompletableFuture _future; + + public CompletableFutureCallbackAdapter(CompletableFuture future) + { + _future = future; + } + + @Override + public void onError(Throwable e) + { + _future.completeExceptionally(e); + } + + @Override + public void onSuccess(T result) + { + _future.complete(result); + } +} diff --git a/pegasus-common/src/test/java/com/linkedin/common/callback/TestCompletableFutureCallbackAdapter.java b/pegasus-common/src/test/java/com/linkedin/common/callback/TestCompletableFutureCallbackAdapter.java new file mode 100644 index 0000000000..09c8eba62a --- /dev/null +++ b/pegasus-common/src/test/java/com/linkedin/common/callback/TestCompletableFutureCallbackAdapter.java @@ -0,0 +1,62 @@ +/* + Copyright (c) 2024 LinkedIn Corp. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package com.linkedin.common.callback; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import org.testng.annotations.Test; + +public class TestCompletableFutureCallbackAdapter +{ + @Test + public void testSuccess() + { + CompletableFuture future = new CompletableFuture<>(); + CompletableFutureCallbackAdapter adapter = new CompletableFutureCallbackAdapter<>(future); + adapter.onSuccess("haha"); + assertTrue(future.isDone()); + assertFalse(future.isCompletedExceptionally()); + assertFalse(future.isCancelled()); + assertEquals(future.join(), "haha"); + } + + @Test + public void testError() + { + CompletableFuture future = new CompletableFuture<>(); + CompletableFutureCallbackAdapter adapter = new CompletableFutureCallbackAdapter<>(future); + Throwable error = new IllegalArgumentException("exception"); + adapter.onError(error); + assertTrue(future.isDone()); + assertTrue(future.isCompletedExceptionally()); + assertFalse(future.isCancelled()); + + try + { + future.get(); + } + catch (ExecutionException | InterruptedException e) + { + assertTrue(e instanceof ExecutionException); + assertEquals(e.getCause(), error); + } + } +} diff --git a/restli-client/src/main/java/com/linkedin/restli/client/Client.java b/restli-client/src/main/java/com/linkedin/restli/client/Client.java index 2ba276a3f1..b97a1b0887 100644 --- a/restli-client/src/main/java/com/linkedin/restli/client/Client.java +++ b/restli-client/src/main/java/com/linkedin/restli/client/Client.java @@ -17,10 +17,12 @@ package com.linkedin.restli.client; import com.linkedin.common.callback.Callback; +import com.linkedin.common.callback.CompletableFutureCallbackAdapter; import com.linkedin.common.util.None; import com.linkedin.r2.message.RequestContext; import com.linkedin.restli.client.multiplexer.MultiplexedRequest; import com.linkedin.restli.client.multiplexer.MultiplexedResponse; +import java.util.concurrent.CompletableFuture; /** @@ -92,6 +94,36 @@ ResponseFuture sendRequest(Request request, RequestContext requestCont ResponseFuture sendRequest(RequestBuilder> requestBuilder, RequestContext requestContext, ErrorHandlingBehavior errorHandlingBehavior); + /** + * Sends a type-bound REST request, returning a {@link CompletableFuture} + * + * @param request to send + * @param requestContext context for the request + * @return {@link CompletableFuture} wrapping the response + */ + default CompletableFuture> sendRequestAsync(Request request, RequestContext requestContext) + { + CompletableFuture> future = new CompletableFuture<>(); + sendRequest(request, requestContext, new CompletableFutureCallbackAdapter<>(future)); + return future; + } + + /** + * Sends a type-bound REST request using a callback, returning a {@link CompletableFuture} + * + * @param requestBuilder to invoke {@link RequestBuilder#build()} on to obtain the request + * to send. + * @param requestContext context for the request + * @return {@link CompletableFuture} wrapping the response + */ + default CompletableFuture> sendRequestAsync(RequestBuilder> requestBuilder, + RequestContext requestContext) + { + CompletableFuture> future = new CompletableFuture<>(); + sendRequest(requestBuilder, requestContext, new CompletableFutureCallbackAdapter<>(future)); + return future; + } + /** * Sends a type-bound REST request using a callback. * @@ -153,6 +185,33 @@ void sendRequest(RequestBuilder> requestBuilder, Reques ResponseFuture sendRequest(RequestBuilder> requestBuilder, ErrorHandlingBehavior errorHandlingBehavior); + /** + * Sends a type-bound REST request, returning a {@link CompletableFuture} + * + * @param request to send + * @return {@link CompletableFuture} wrapping the response + */ + default CompletableFuture> sendRequestAsync(Request request) + { + CompletableFuture> future = new CompletableFuture<>(); + sendRequest(request, new CompletableFutureCallbackAdapter<>(future)); + return future; + } + + /** + * Sends a type-bound REST request using a callback, returning a {@link CompletableFuture} + * + * @param requestBuilder to invoke {@link RequestBuilder#build()} on to obtain the request + * to send. + * @return {@link CompletableFuture} wrapping the response + */ + default CompletableFuture> sendRequestAsync(RequestBuilder> requestBuilder) + { + CompletableFuture> future = new CompletableFuture<>(); + sendRequest(requestBuilder, new CompletableFutureCallbackAdapter<>(future)); + return future; + } + /** * Sends a type-bound REST request using a callback. *