Skip to content

Commit

Permalink
Add utility methods for bridging Rest client with CompletableFuture (#…
Browse files Browse the repository at this point in the history
…1013)

Co-authored-by: Karthik Ramgopal <[email protected]>
  • Loading branch information
karthikrg and li-kramgopa authored Aug 14, 2024
1 parent 0d2233a commit c67ff74
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
Copyright (c) 2024 LinkedIn Corp.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.linkedin.common.callback;

import java.util.concurrent.CompletableFuture;


/**
* A {@link Callback} adapter that wraps a {@link CompletableFuture} and propagates callbacks to it.
*/
public class CompletableFutureCallbackAdapter<T> implements Callback<T>
{
private final CompletableFuture<T> _future;

public CompletableFutureCallbackAdapter(CompletableFuture<T> future)
{
_future = future;
}

@Override
public void onError(Throwable e)
{
_future.completeExceptionally(e);
}

@Override
public void onSuccess(T result)
{
_future.complete(result);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
Copyright (c) 2024 LinkedIn Corp.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.linkedin.common.callback;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.testng.annotations.Test;

public class TestCompletableFutureCallbackAdapter
{
@Test
public void testSuccess()
{
CompletableFuture<String> future = new CompletableFuture<>();
CompletableFutureCallbackAdapter<String> adapter = new CompletableFutureCallbackAdapter<>(future);
adapter.onSuccess("haha");
assertTrue(future.isDone());
assertFalse(future.isCompletedExceptionally());
assertFalse(future.isCancelled());
assertEquals(future.join(), "haha");
}

@Test
public void testError()
{
CompletableFuture<String> future = new CompletableFuture<>();
CompletableFutureCallbackAdapter<String> adapter = new CompletableFutureCallbackAdapter<>(future);
Throwable error = new IllegalArgumentException("exception");
adapter.onError(error);
assertTrue(future.isDone());
assertTrue(future.isCompletedExceptionally());
assertFalse(future.isCancelled());

try
{
future.get();
}
catch (ExecutionException | InterruptedException e)
{
assertTrue(e instanceof ExecutionException);
assertEquals(e.getCause(), error);
}
}
}
59 changes: 59 additions & 0 deletions restli-client/src/main/java/com/linkedin/restli/client/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
package com.linkedin.restli.client;

import com.linkedin.common.callback.Callback;
import com.linkedin.common.callback.CompletableFutureCallbackAdapter;
import com.linkedin.common.util.None;
import com.linkedin.r2.message.RequestContext;
import com.linkedin.restli.client.multiplexer.MultiplexedRequest;
import com.linkedin.restli.client.multiplexer.MultiplexedResponse;
import java.util.concurrent.CompletableFuture;


/**
Expand Down Expand Up @@ -92,6 +94,36 @@ <T> ResponseFuture<T> sendRequest(Request<T> request, RequestContext requestCont
<T> ResponseFuture<T> sendRequest(RequestBuilder<? extends Request<T>> requestBuilder, RequestContext requestContext,
ErrorHandlingBehavior errorHandlingBehavior);

/**
* Sends a type-bound REST request, returning a {@link CompletableFuture}
*
* @param request to send
* @param requestContext context for the request
* @return {@link CompletableFuture} wrapping the response
*/
default <T> CompletableFuture<Response<T>> sendRequestAsync(Request<T> request, RequestContext requestContext)
{
CompletableFuture<Response<T>> future = new CompletableFuture<>();
sendRequest(request, requestContext, new CompletableFutureCallbackAdapter<>(future));
return future;
}

/**
* Sends a type-bound REST request using a callback, returning a {@link CompletableFuture}
*
* @param requestBuilder to invoke {@link RequestBuilder#build()} on to obtain the request
* to send.
* @param requestContext context for the request
* @return {@link CompletableFuture} wrapping the response
*/
default <T> CompletableFuture<Response<T>> sendRequestAsync(RequestBuilder<? extends Request<T>> requestBuilder,
RequestContext requestContext)
{
CompletableFuture<Response<T>> future = new CompletableFuture<>();
sendRequest(requestBuilder, requestContext, new CompletableFutureCallbackAdapter<>(future));
return future;
}

/**
* Sends a type-bound REST request using a callback.
*
Expand Down Expand Up @@ -153,6 +185,33 @@ <T> void sendRequest(RequestBuilder<? extends Request<T>> requestBuilder, Reques
<T> ResponseFuture<T> sendRequest(RequestBuilder<? extends Request<T>> requestBuilder,
ErrorHandlingBehavior errorHandlingBehavior);

/**
* Sends a type-bound REST request, returning a {@link CompletableFuture}
*
* @param request to send
* @return {@link CompletableFuture} wrapping the response
*/
default <T> CompletableFuture<Response<T>> sendRequestAsync(Request<T> request)
{
CompletableFuture<Response<T>> future = new CompletableFuture<>();
sendRequest(request, new CompletableFutureCallbackAdapter<>(future));
return future;
}

/**
* Sends a type-bound REST request using a callback, returning a {@link CompletableFuture}
*
* @param requestBuilder to invoke {@link RequestBuilder#build()} on to obtain the request
* to send.
* @return {@link CompletableFuture} wrapping the response
*/
default <T> CompletableFuture<Response<T>> sendRequestAsync(RequestBuilder<? extends Request<T>> requestBuilder)
{
CompletableFuture<Response<T>> future = new CompletableFuture<>();
sendRequest(requestBuilder, new CompletableFutureCallbackAdapter<>(future));
return future;
}

/**
* Sends a type-bound REST request using a callback.
*
Expand Down

0 comments on commit c67ff74

Please sign in to comment.