Skip to content

Commit

Permalink
Pubsub subscription compression fix (#1666)
Browse files Browse the repository at this point in the history
* Pubsub subscription compression fix

* Pubsub subscription compression- CR fix

---------

Co-authored-by: Piotr Rżysko <[email protected]>
  • Loading branch information
michalferlinski and piotrrzysko authored May 9, 2023
1 parent fc5484b commit f50c7be
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import com.google.pubsub.v1.PubsubMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.consumers.consumer.Message;
import pl.allegro.tech.hermes.consumers.consumer.sender.MessageSendingResult;

import java.io.IOException;
Expand All @@ -20,16 +19,13 @@ class GooglePubSubClient {
private static final Logger logger = LoggerFactory.getLogger(GooglePubSubClient.class);

private final Publisher publisher;
private final GooglePubSubMessageTransformer messageTransformer;

GooglePubSubClient(Publisher publisher, GooglePubSubMessageTransformer messageTransformer) {
GooglePubSubClient(Publisher publisher) {
this.publisher = publisher;
this.messageTransformer = messageTransformer;
}

void publish(Message message, CompletableFuture<MessageSendingResult> resultFuture)
void publish(PubsubMessage pubsubMessage, CompletableFuture<MessageSendingResult> resultFuture)
throws IOException, ExecutionException, InterruptedException {
PubsubMessage pubsubMessage = messageTransformer.fromHermesMessage(message);
ApiFuture<String> future = publisher.publish(pubsubMessage);
ApiFutures.addCallback(future, new GooglePubSubMessageSentCallback(resultFuture), MoreExecutors.directExecutor());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ class GooglePubSubClientsPool {
private final ExecutorProvider publishingExecutorProvider;
private final RetrySettings retrySettings;
private final BatchingSettings batchingSettings;
private final GooglePubSubMessageTransformerCreator messageTransformerCreator;
private final Map<GooglePubSubSenderTarget, GooglePubSubClient> clients = new HashMap<>();
private final Map<GooglePubSubSenderTarget, Integer> counters = new HashMap<>();

Expand All @@ -30,13 +29,11 @@ class GooglePubSubClientsPool {
ExecutorProvider publishingExecutorProvider,
RetrySettings retrySettings,
BatchingSettings batchingSettings,
GooglePubSubMessageTransformerCreator messageTransformerCreator,
TransportChannelProvider transportChannelProvider) {
this.credentialsProvider = credentialsProvider;
this.publishingExecutorProvider = publishingExecutorProvider;
this.retrySettings = retrySettings;
this.batchingSettings = batchingSettings;
this.messageTransformerCreator = messageTransformerCreator;
this.transportChannelProvider = transportChannelProvider;
}

Expand Down Expand Up @@ -84,6 +81,6 @@ protected GooglePubSubClient createClient(GooglePubSubSenderTarget resolvedTarge
} else {
publisher = builder.setChannelProvider(transportChannelProvider).build();
}
return new GooglePubSubClient(publisher, messageTransformerCreator.getTransformerForTargetEndpoint(resolvedTarget));
return new GooglePubSubClient(publisher);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package pl.allegro.tech.hermes.consumers.consumer.sender.googlepubsub;

import com.google.pubsub.v1.PubsubMessage;
import pl.allegro.tech.hermes.consumers.consumer.Message;
import pl.allegro.tech.hermes.consumers.consumer.sender.CompletableFutureAwareMessageSender;
import pl.allegro.tech.hermes.consumers.consumer.sender.MessageSendingResult;
Expand All @@ -15,18 +16,22 @@ class GooglePubSubMessageSender implements CompletableFutureAwareMessageSender {
private final GooglePubSubClient googlePubSubClient;
private final GooglePubSubSenderTarget resolvedTarget;
private final GooglePubSubClientsPool clientsPool;
private final GooglePubSubMessageTransformer messageTransformer;

GooglePubSubMessageSender(GooglePubSubSenderTarget resolvedTarget,
GooglePubSubClientsPool clientsPool) throws IOException {
GooglePubSubClientsPool clientsPool,
GooglePubSubMessageTransformer messageTransformer) throws IOException {
this.googlePubSubClient = clientsPool.acquire(resolvedTarget);
this.resolvedTarget = resolvedTarget;
this.clientsPool = clientsPool;
this.messageTransformer = messageTransformer;
}

@Override
public void send(Message message, CompletableFuture<MessageSendingResult> resultFuture) {
try {
googlePubSubClient.publish(message, resultFuture);
PubsubMessage pubsubMessage = messageTransformer.fromHermesMessage(message);
googlePubSubClient.publish(pubsubMessage, resultFuture);
} catch (IOException | ExecutionException | InterruptedException exception) {
resultFuture.complete(failedResult(exception));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public class GooglePubSubMessageSenderProvider implements ProtocolMessageSenderP
public static final String SUPPORTED_PROTOCOL = "googlepubsub";

private final GooglePubSubSenderTargetResolver resolver;
private final GooglePubSubMessageTransformerCreator messageTransformerCreator;
private final GooglePubSubClientsPool clientsPool;

public GooglePubSubMessageSenderProvider(GooglePubSubSenderTargetResolver resolver,
Expand All @@ -31,12 +32,12 @@ public GooglePubSubMessageSenderProvider(GooglePubSubSenderTargetResolver resolv
GooglePubSubMessageTransformerCreator messageTransformerCreator) {

this.resolver = resolver;
this.messageTransformerCreator = messageTransformerCreator;
this.clientsPool = new GooglePubSubClientsPool(
credentialsProvider,
executorProvider,
retrySettings,
batchingSettings,
messageTransformerCreator,
transportChannelProvider
);
}
Expand All @@ -45,7 +46,8 @@ public GooglePubSubMessageSenderProvider(GooglePubSubSenderTargetResolver resolv
public MessageSender create(final Subscription subscription, ResilientMessageSender resilientMessageSender) {
final GooglePubSubSenderTarget resolvedTarget = resolver.resolve(subscription.getEndpoint());
try {
GooglePubSubMessageSender sender = new GooglePubSubMessageSender(resolvedTarget, clientsPool);
GooglePubSubMessageTransformer messageTransformer = messageTransformerCreator.getTransformerForTargetEndpoint(resolvedTarget);
GooglePubSubMessageSender sender = new GooglePubSubMessageSender(resolvedTarget, clientsPool, messageTransformer);
return new SingleRecipientMessageSenderAdapter(sender, resilientMessageSender);
} catch (IOException e) {
throw new RuntimeException("Cannot create Google PubSub publishers cache", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,20 @@ class GooglePubSubClientsPoolTest extends Specification {
class GooglePubSubClientsPoolUnderTest extends GooglePubSubClientsPool {

private Publisher publisher
private GooglePubSubMessageTransformer messageTransformer

GooglePubSubClientsPoolUnderTest(CredentialsProvider credentialsProvider,
ExecutorProvider publishingExecutorProvider,
RetrySettings retrySettings,
BatchingSettings batchingSettings,
GooglePubSubMessageTransformerCreator messageTransformerCreator,
TransportChannelProvider transportChannelProvider,
Publisher publisher,
GooglePubSubMessageTransformer messageTransformer) {
super(credentialsProvider, publishingExecutorProvider, retrySettings, batchingSettings,
messageTransformerCreator, transportChannelProvider)
Publisher publisher) {
super(credentialsProvider, publishingExecutorProvider, retrySettings, batchingSettings, transportChannelProvider)
this.publisher = publisher
this.messageTransformer = messageTransformer
}

@Override
protected GooglePubSubClient createClient(GooglePubSubSenderTarget resolvedTarget) throws IOException {
return new GooglePubSubClient(this.publisher, this.messageTransformer)
return new GooglePubSubClient(this.publisher)
}
}

Expand All @@ -42,8 +37,7 @@ class GooglePubSubClientsPoolTest extends Specification {
def pubSubEndpoint = "https://pubsub.endpoint"

def poolUnderTest = new GooglePubSubClientsPoolUnderTest(Stub(CredentialsProvider), Stub(ExecutorProvider),
Stub(RetrySettings), Stub(BatchingSettings), Stub(GooglePubSubMessageTransformerCreator),
Stub(TransportChannelProvider), Stub(Publisher), Stub(GooglePubSubMessageTransformer))
Stub(RetrySettings), Stub(BatchingSettings), Stub(TransportChannelProvider), Stub(Publisher))

def targetWithCodec = GooglePubSubSenderTarget.builder()
.withTopicName(topic)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ class GooglePubSubMessageSenderTest extends Specification {

GooglePubSubClientsPool clientsPool = Mock(GooglePubSubClientsPool)

GooglePubSubClient client = new GooglePubSubClient(publisher, new GooglePubSubMessageTransformerRaw(
new GooglePubSubMetadataAppender()))
GooglePubSubClient client = new GooglePubSubClient(publisher)

@Subject
GooglePubSubMessageSender sender

void setup() {
clientsPool.acquire(senderTarget) >> client
sender = new GooglePubSubMessageSender(senderTarget, clientsPool)
sender = new GooglePubSubMessageSender(senderTarget, clientsPool,
new GooglePubSubMessageTransformerRaw(new GooglePubSubMetadataAppender()))
}

def 'should return result on a happy path'() {
Expand Down

0 comments on commit f50c7be

Please sign in to comment.