Skip to content

Commit

Permalink
Enh 37496193 - [37496162->24.09.2] General refactoring and hardening …
Browse files Browse the repository at this point in the history
…of the gRPC APIs

(merge ce/main -> ce/24.09 113641)

[git-p4: depot-paths = "//dev/coherence-ce/release/coherence-ce-v24.09/": change = 113642]
  • Loading branch information
thegridman committed Jan 20, 2025
1 parent e856cf5 commit e15767a
Show file tree
Hide file tree
Showing 69 changed files with 2,802 additions and 365 deletions.
25 changes: 21 additions & 4 deletions prj/coherence-core-components/pom.xml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright (c) 2000, 2025, Oracle and/or its affiliates.
~
~ Licensed under the Universal Permissive License v 1.0 as shown at
~ https://oss.oracle.com/licenses/upl.
Copyright (c) 2000, 2025, Oracle and/or its affiliates.
Licensed under the Universal Permissive License v 1.0 as shown at
https://oss.oracle.com/licenses/upl.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
Expand Down Expand Up @@ -70,6 +70,23 @@
<optional>true</optional>
</dependency>

<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<scope>provided</scope>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@

/*
* Copyright (c) 2000, 2023, Oracle and/or its affiliates.
* Copyright (c) 2000, 2025, Oracle and/or its affiliates.
*
* Licensed under the Universal Permissive License v 1.0 as shown at
* https://oss.oracle.com/licenses/upl.
Expand Down Expand Up @@ -297,7 +297,19 @@ private com.tangosol.coherence.Component get_Module()
{
return this;
}


/**
* Create a new instance of a {@link Channel}.
*
* @return a new instance of a {@link Channel}
*/
protected Channel createChannel()
{
Channel channel = new Channel();
channel.setConnection(this);
return channel;
}

// From interface: com.tangosol.net.messaging.Connection
public com.tangosol.net.messaging.Channel acceptChannel(java.net.URI uri, ClassLoader loader, com.tangosol.net.messaging.Channel.Receiver receiver, javax.security.auth.Subject subject)
{
Expand Down Expand Up @@ -440,9 +452,8 @@ public com.tangosol.net.messaging.Channel acceptChannelResponse(int nId, com.tan
throw new IllegalArgumentException("serializer cannot be null");
}

Channel channel = new Channel();
Channel channel = createChannel();
channel.setId(nId);
channel.setConnection(this);
channel.setMessageFactory(factory);
channel.setReceiver(receiver);
channel.setSerializer(serializer);
Expand Down Expand Up @@ -684,9 +695,8 @@ public java.net.URI createChannelInternal(com.tangosol.net.messaging.Protocol pr
int nId = generateChannelId();

// create a new Channel
Channel channel = new Channel();
Channel channel = createChannel();
channel.setId(nId);
channel.setConnection(this);
channel.setReceiver(receiver);
channel.setMessageFactory(factory);
channel.setSerializer(serializer);
Expand Down Expand Up @@ -1170,8 +1180,7 @@ public void onInit()
// import com.tangosol.util.ThreadGate;

// create and register "Channel0"
Channel channel0 = new Channel();
channel0.setConnection(this);
Channel channel0 = createChannel();
registerChannel(channel0);

setThreadGate(new ThreadGate());
Expand Down Expand Up @@ -1306,8 +1315,7 @@ public int openChannelRequest(String sProtocol, com.tangosol.io.Serializer seria

int nId = generateChannelId();

Channel channel = new Channel();
channel.setConnection(this);
Channel channel = createChannel();
channel.setId(nId);
channel.setMessageFactory(factory);
channel.setReceiver(receiver);
Expand Down Expand Up @@ -1345,9 +1353,8 @@ public com.tangosol.net.messaging.Channel openChannelResponse(int nId, com.tango
throw new IllegalArgumentException("serializer cannot be null");
}

Channel channel = new Channel();
Channel channel = createChannel();
channel.setId(nId);
channel.setConnection(this);
channel.setMessageFactory(factory);
channel.setReceiver(receiver);
channel.setSerializer(serializer);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2000, 2025, Oracle and/or its affiliates.
*
* Licensed under the Universal Permissive License v 1.0 as shown at
* https://oss.oracle.com/licenses/upl.
*/

package com.tangosol.coherence.component.net.extend.message;

import com.google.protobuf.Any;

import com.tangosol.coherence.component.net.extend.message.response.GrpcResponse;
import com.tangosol.io.Serializer;

import com.tangosol.net.messaging.Message;

/**
* A {@link Message} implementation that wraps a protobuf message.
*
* @author Jonathan Knight 2025.01.25
*/
public interface GrpcMessageWrapper
extends Message
{
/**
* Set the wrapped protobuf message.
*
* @param any the wrapped protobuf message
* @param serializer the serializer to deserialize binary payloads
*/
void setProtoMessage(Any any, Serializer serializer);

/**
* Return the message response.
*
* @return the message response
*/
GrpcResponse getResponse();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Copyright (c) 2000, 2025, Oracle and/or its affiliates.
*
* Licensed under the Universal Permissive License v 1.0 as shown at
* https://oss.oracle.com/licenses/upl.
*/

package com.tangosol.coherence.component.net.extend.message.response;

import com.google.protobuf.Message;

import com.tangosol.coherence.component.net.extend.message.Response;

import com.tangosol.io.Serializer;

import io.grpc.stub.StreamObserver;

/**
* A gRPC response message.
*
* @author Jonathan Knight 2025.01.25
*/
public abstract class GrpcResponse
extends Response
{
/**
* Default constructor.
*/
protected GrpcResponse()
{
super(null, null, true);
}

/**
* Set the proxy identifier for this message.
*
* @param proxyId the proxy identifier for this message
*/
public void setProxyId(int proxyId)
{
m_nProxyId = proxyId;
}

/**
* Return the proxy identifier for this message.
*
* @return the proxy identifier for this message
*/
public int getProxyId()
{
return m_nProxyId;
}

/**
* Set the {@link StreamObserver} to send responses to.
*
* @param observer the {@link StreamObserver} to send responses to
*/
public void setStreamObserver(StreamObserver<? extends Message> observer)
{
m_observer = observer;
}

/**
* Return the {@link StreamObserver} to send responses to.
*
* @return the {@link StreamObserver} to send responses to
*/
public StreamObserver<? extends Message> getStreamObserver()
{
return m_observer;
}

/**
* Set the {@link Serializer}.
*
* @param serializer the {@link Serializer}
*/
public void setSerializer(Serializer serializer)
{
m_serializer = serializer;
}

/**
* Return the {@link Serializer}.
*
* @return the {@link Serializer}
*/
public Serializer getSerializer()
{
return m_serializer;
}

/**
* Return {@code true} if the {@link StreamObserver} should be
* completed after returning the response.
*
* @return {@code true} if the {@link StreamObserver} should be
* completed after returning the response
*/
public boolean completeStream()
{
return true;
}

/**
* Create a protobuf response message.
*
* @return a protobuf response message
*/
public abstract Message getProtoResponse();

// ----- data members ---------------------------------------------------

/**
* The proxy identifier.
*/
protected int m_nProxyId;

/**
* The {@link Serializer}.
*/
protected Serializer m_serializer;

/**
* The {@link StreamObserver} to send responses to.
*/
protected StreamObserver<? extends Message> m_observer;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (c) 2000, 2025, Oracle and/or its affiliates.
*
* Licensed under the Universal Permissive License v 1.0 as shown at
* https://oss.oracle.com/licenses/upl.
*/

package com.tangosol.coherence.component.net.extend.messageFactory;

import com.google.protobuf.Message;

import com.tangosol.coherence.component.net.extend.message.GrpcMessageWrapper;

import com.tangosol.coherence.component.net.extend.message.response.GrpcResponse;

import com.tangosol.io.Serializer;

import com.tangosol.net.messaging.Protocol;

/**
* A {@link Protocol.MessageFactory} that can also produce messages
* from Protobuf requests.
*
* @author Jonathan Knight 2025.01.25
*/
public interface GrpcMessageFactory<Req extends Message, Resp extends Message>
extends Protocol.MessageFactory
{
/**
* Create a {@link GrpcMessageWrapper} from a Protobuf {@link Message}.
*
* @param request the {@link Message} to create the {@link GrpcMessageWrapper} from
* @param serializer the {@link Serializer} to use to deserialize binary payloads
* @param <M> the expected type of the message to return
*
* @return a {@link GrpcMessageWrapper} created from a Protobuf {@link Message}
*/
<M extends GrpcMessageWrapper> M createRequestMessage(Req request, Serializer serializer);

/**
* Create a response {@link Message} that will be wrapped in a proxy response
* before being sent to the response stream observer.
*
* @param response the {@link GrpcResponse} to convert to a {@link Message}
*
* @return the response {@link Message}
*/
Resp createResponse(GrpcResponse response);

/**
* Convert a Coherence {@link com.tangosol.net.messaging.Message}
* into a corresponding Protobuf {@link Message}.
*
* @param message the Coherence message to convert
* @param nProxyId the proxy identifier for the message
*
* @return the corresponding Protobuf {@link Message}
*/
Resp toProtoMessage(com.tangosol.net.messaging.Message message, int nProxyId);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (c) 2000, 2025, Oracle and/or its affiliates.
*
* Licensed under the Universal Permissive License v 1.0 as shown at
* https://oss.oracle.com/licenses/upl.
*/

package com.tangosol.coherence.component.net.extend.proxy;

import com.google.protobuf.Message;

import com.tangosol.coherence.component.util.daemon.queueProcessor.service.peer.acceptor.grpcAcceptor.GrpcChannel;

import com.tangosol.net.messaging.Channel;

/**
* A gRPC extend proxy.
*
* @author Jonathan Knight 2025.01.25
*/
public interface GrpcExtendProxy<Resp extends Message>
extends Channel.Receiver
{
/**
* Return the {@link GrpcChannel} used by this proxy.
*
* @return the {@link GrpcChannel} used by this proxy
*/
@SuppressWarnings("unchecked")
default GrpcChannel<Resp> getGrpcChannel()
{
return (GrpcChannel<Resp>) getChannel();
}

/**
* Return the {@link Channel} used by this proxy.
*
* @return the {@link Channel} used by this proxy
*/
Channel getChannel();
}
Loading

0 comments on commit e15767a

Please sign in to comment.