Skip to content

Commit

Permalink
Adds CPMap Client Support [HZ-3539] (hazelcast#25877)
Browse files Browse the repository at this point in the history
Adds client support for the CPMap API, e.g.

```java
HazelcastInstance client = HazelcastClient.newHazelcastClient();
CPMap<String, String> map = client.getCPSubsystem().getMap("mymap");
assert map.put("k", "v") == null;
assert "v".equals(map.get("k"));
```

EE PR: https://github.com/hazelcast/hazelcast-enterprise/pull/6752
Client Protocol PR:
hazelcast/hazelcast-client-protocol#484
  • Loading branch information
gbarnett-hz authored Nov 9, 2023
1 parent 9301cde commit 1fa9b9c
Show file tree
Hide file tree
Showing 28 changed files with 1,344 additions and 186 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.hazelcast.client.cp.internal;

import com.hazelcast.client.cp.internal.datastructures.proxy.ClientRaftProxyFactory;
import com.hazelcast.client.impl.clientside.HazelcastClientInstanceImpl;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.protocol.codec.CPSubsystemAddGroupAvailabilityListenerCodec;
import com.hazelcast.client.impl.protocol.codec.CPSubsystemAddMembershipListenerCodec;
Expand Down Expand Up @@ -65,11 +64,11 @@
*/
public class CPSubsystemImpl implements CPSubsystem {

private final ClientRaftProxyFactory proxyFactory;
protected final ClientRaftProxyFactory proxyFactory;
private volatile ClientContext context;

public CPSubsystemImpl(HazelcastClientInstanceImpl client) {
this.proxyFactory = new ClientRaftProxyFactory(client);
public CPSubsystemImpl(ClientRaftProxyFactory proxyFactory) {
this.proxyFactory = proxyFactory;
}

public void init(ClientContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@
*/
public class ClientRaftProxyFactory {

protected ClientContext context;
private final HazelcastClientInstanceImpl client;
private final ConcurrentMap<String, FencedLockProxy> lockProxies
= new ConcurrentHashMap<String, FencedLockProxy>();
private ClientContext context;

public ClientRaftProxyFactory(HazelcastClientInstanceImpl client) {
this.client = client;
Expand Down Expand Up @@ -119,7 +119,7 @@ private ISemaphore createSemaphore(RaftGroupId groupId, String proxyName, String
: new SessionAwareSemaphoreProxy(context, groupId, proxyName, objectName);
}

private RaftGroupId getGroupId(String proxyName, String objectName) {
protected RaftGroupId getGroupId(String proxyName, String objectName) {
ClientMessage request = CPGroupCreateCPGroupCodec.encodeRequest(proxyName);
ClientMessage response = new ClientInvocation(client, request, objectName).invoke().joinInternal();
return CPGroupCreateCPGroupCodec.decodeResponse(response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.hazelcast.client.impl.spi.ClientProxyFactory;
import com.hazelcast.config.SSLConfig;
import com.hazelcast.config.SocketInterceptorConfig;
import com.hazelcast.cp.CPSubsystem;
import com.hazelcast.internal.nearcache.NearCacheManager;
import com.hazelcast.internal.networking.ChannelInitializer;
import com.hazelcast.internal.serialization.InternalSerializationService;
Expand Down Expand Up @@ -112,4 +113,9 @@ public interface ClientExtension {
* Returns a JetService.
*/
JetService getJet();

/**
* Creates the relevant CP subsystem implementation.
*/
CPSubsystem createCPSubsystem(HazelcastClientInstanceImpl hazelcastClientInstance);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.client.config.ClientNetworkConfig;
import com.hazelcast.client.config.SocketOptions;
import com.hazelcast.client.cp.internal.CPSubsystemImpl;
import com.hazelcast.client.cp.internal.datastructures.proxy.ClientRaftProxyFactory;
import com.hazelcast.client.impl.ClientExtension;
import com.hazelcast.client.impl.connection.tcp.ClientPlainChannelInitializer;
import com.hazelcast.client.impl.proxy.ClientMapProxy;
Expand All @@ -34,6 +36,7 @@
import com.hazelcast.config.SerializationConfig;
import com.hazelcast.config.SocketInterceptorConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.cp.CPSubsystem;
import com.hazelcast.instance.BuildInfo;
import com.hazelcast.instance.BuildInfoProvider;
import com.hazelcast.internal.memory.DefaultMemoryStats;
Expand Down Expand Up @@ -237,4 +240,9 @@ public NearCacheManager createNearCacheManager() {
public JetService getJet() {
return jetClient;
}

@Override
public CPSubsystem createCPSubsystem(HazelcastClientInstanceImpl hazelcastClientInstance) {
return new CPSubsystemImpl(new ClientRaftProxyFactory(hazelcastClientInstance));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ public HazelcastClientInstanceImpl(String instanceName, ClientConfig clientConfi
clientStatisticsService = new ClientStatisticsService(this);
userCodeDeploymentService = new ClientUserCodeDeploymentService(config.getUserCodeDeploymentConfig(), classLoader);
proxySessionManager = new ClientProxySessionManager(this);
cpSubsystem = new CPSubsystemImpl(this);
cpSubsystem = (CPSubsystemImpl) clientExtension.createCPSubsystem(this);
sqlService = new SqlClientService(this);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright (c) 2008-2023, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.client.impl.protocol.codec;

import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.protocol.Generated;
import com.hazelcast.client.impl.protocol.codec.builtin.*;
import com.hazelcast.client.impl.protocol.codec.custom.*;

import javax.annotation.Nullable;

import static com.hazelcast.client.impl.protocol.ClientMessage.*;
import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.*;

/*
* This file is auto-generated by the Hazelcast Client Protocol Code Generator.
* To change this file, edit the templates or the protocol
* definitions on the https://github.com/hazelcast/hazelcast-client-protocol
* and regenerate it.
*/

/**
* Tests if the value associated with the key is expectedValue and if so associates key with
* newValue.
*/
@SuppressWarnings("unused")
@Generated("f6639933f5e2c5fc315fa4e20d0ca863")
public final class CPMapCompareAndSetCodec {
//hex: 0x230600
public static final int REQUEST_MESSAGE_TYPE = 2295296;
//hex: 0x230601
public static final int RESPONSE_MESSAGE_TYPE = 2295297;
private static final int REQUEST_INITIAL_FRAME_SIZE = PARTITION_ID_FIELD_OFFSET + INT_SIZE_IN_BYTES;
private static final int RESPONSE_RESPONSE_FIELD_OFFSET = RESPONSE_BACKUP_ACKS_FIELD_OFFSET + BYTE_SIZE_IN_BYTES;
private static final int RESPONSE_INITIAL_FRAME_SIZE = RESPONSE_RESPONSE_FIELD_OFFSET + BOOLEAN_SIZE_IN_BYTES;

private CPMapCompareAndSetCodec() {
}

@edu.umd.cs.findbugs.annotations.SuppressFBWarnings({"URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD"})
public static class RequestParameters {

/**
* CP group ID of this CPMap instance.
*/
public com.hazelcast.cp.internal.RaftGroupId groupId;

/**
* Name of this CPMap instance.
*/
public java.lang.String name;

/**
* Key of the data that is subject of the compare and set.
*/
public com.hazelcast.internal.serialization.Data key;

/**
* The expected value associated with key.
*/
public com.hazelcast.internal.serialization.Data expectedValue;

/**
* The new value to associate with key.
*/
public com.hazelcast.internal.serialization.Data newValue;
}

public static ClientMessage encodeRequest(com.hazelcast.cp.internal.RaftGroupId groupId, java.lang.String name, com.hazelcast.internal.serialization.Data key, com.hazelcast.internal.serialization.Data expectedValue, com.hazelcast.internal.serialization.Data newValue) {
ClientMessage clientMessage = ClientMessage.createForEncode();
clientMessage.setContainsSerializedDataInRequest(true);
clientMessage.setRetryable(false);
clientMessage.setOperationName("CPMap.CompareAndSet");
ClientMessage.Frame initialFrame = new ClientMessage.Frame(new byte[REQUEST_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, REQUEST_MESSAGE_TYPE);
encodeInt(initialFrame.content, PARTITION_ID_FIELD_OFFSET, -1);
clientMessage.add(initialFrame);
RaftGroupIdCodec.encode(clientMessage, groupId);
StringCodec.encode(clientMessage, name);
DataCodec.encode(clientMessage, key);
DataCodec.encode(clientMessage, expectedValue);
DataCodec.encode(clientMessage, newValue);
return clientMessage;
}

public static CPMapCompareAndSetCodec.RequestParameters decodeRequest(ClientMessage clientMessage) {
ClientMessage.ForwardFrameIterator iterator = clientMessage.frameIterator();
RequestParameters request = new RequestParameters();
//empty initial frame
iterator.next();
request.groupId = RaftGroupIdCodec.decode(iterator);
request.name = StringCodec.decode(iterator);
request.key = DataCodec.decode(iterator);
request.expectedValue = DataCodec.decode(iterator);
request.newValue = DataCodec.decode(iterator);
return request;
}

public static ClientMessage encodeResponse(boolean response) {
ClientMessage clientMessage = ClientMessage.createForEncode();
ClientMessage.Frame initialFrame = new ClientMessage.Frame(new byte[RESPONSE_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, RESPONSE_MESSAGE_TYPE);
encodeBoolean(initialFrame.content, RESPONSE_RESPONSE_FIELD_OFFSET, response);
clientMessage.add(initialFrame);

return clientMessage;
}

/**
* True if key was associated with newValue, otherwise false.
*/
public static boolean decodeResponse(ClientMessage clientMessage) {
ClientMessage.ForwardFrameIterator iterator = clientMessage.frameIterator();
ClientMessage.Frame initialFrame = iterator.next();
return decodeBoolean(initialFrame.content, RESPONSE_RESPONSE_FIELD_OFFSET);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright (c) 2008-2023, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.client.impl.protocol.codec;

import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.protocol.Generated;
import com.hazelcast.client.impl.protocol.codec.builtin.*;
import com.hazelcast.client.impl.protocol.codec.custom.*;

import javax.annotation.Nullable;

import static com.hazelcast.client.impl.protocol.ClientMessage.*;
import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.*;

/*
* This file is auto-generated by the Hazelcast Client Protocol Code Generator.
* To change this file, edit the templates or the protocol
* definitions on the https://github.com/hazelcast/hazelcast-client-protocol
* and regenerate it.
*/

/**
* Deletes the value associated with the key in the specified map.
*/
@SuppressWarnings("unused")
@Generated("41b2cb5de969f159cc1168a65aecbb2b")
public final class CPMapDeleteCodec {
//hex: 0x230500
public static final int REQUEST_MESSAGE_TYPE = 2295040;
//hex: 0x230501
public static final int RESPONSE_MESSAGE_TYPE = 2295041;
private static final int REQUEST_INITIAL_FRAME_SIZE = PARTITION_ID_FIELD_OFFSET + INT_SIZE_IN_BYTES;
private static final int RESPONSE_INITIAL_FRAME_SIZE = RESPONSE_BACKUP_ACKS_FIELD_OFFSET + BYTE_SIZE_IN_BYTES;

private CPMapDeleteCodec() {
}

@edu.umd.cs.findbugs.annotations.SuppressFBWarnings({"URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD"})
public static class RequestParameters {

/**
* CP group ID of this CPMap instance.
*/
public com.hazelcast.cp.internal.RaftGroupId groupId;

/**
* Name of this CPMap instance.
*/
public java.lang.String name;

/**
* Key of the value to delete.
*/
public com.hazelcast.internal.serialization.Data key;
}

public static ClientMessage encodeRequest(com.hazelcast.cp.internal.RaftGroupId groupId, java.lang.String name, com.hazelcast.internal.serialization.Data key) {
ClientMessage clientMessage = ClientMessage.createForEncode();
clientMessage.setContainsSerializedDataInRequest(true);
clientMessage.setRetryable(false);
clientMessage.setOperationName("CPMap.Delete");
ClientMessage.Frame initialFrame = new ClientMessage.Frame(new byte[REQUEST_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, REQUEST_MESSAGE_TYPE);
encodeInt(initialFrame.content, PARTITION_ID_FIELD_OFFSET, -1);
clientMessage.add(initialFrame);
RaftGroupIdCodec.encode(clientMessage, groupId);
StringCodec.encode(clientMessage, name);
DataCodec.encode(clientMessage, key);
return clientMessage;
}

public static CPMapDeleteCodec.RequestParameters decodeRequest(ClientMessage clientMessage) {
ClientMessage.ForwardFrameIterator iterator = clientMessage.frameIterator();
RequestParameters request = new RequestParameters();
//empty initial frame
iterator.next();
request.groupId = RaftGroupIdCodec.decode(iterator);
request.name = StringCodec.decode(iterator);
request.key = DataCodec.decode(iterator);
return request;
}

public static ClientMessage encodeResponse(@Nullable com.hazelcast.internal.serialization.Data response) {
ClientMessage clientMessage = ClientMessage.createForEncode();
ClientMessage.Frame initialFrame = new ClientMessage.Frame(new byte[RESPONSE_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, RESPONSE_MESSAGE_TYPE);
clientMessage.add(initialFrame);

CodecUtil.encodeNullable(clientMessage, response, DataCodec::encode);
return clientMessage;
}

/**
* Always null, delete does not return any value.
*/
public static com.hazelcast.internal.serialization.Data decodeResponse(ClientMessage clientMessage) {
ClientMessage.ForwardFrameIterator iterator = clientMessage.frameIterator();
//empty initial frame
iterator.next();
return CodecUtil.decodeNullable(iterator, DataCodec::decode);
}
}
Loading

0 comments on commit 1fa9b9c

Please sign in to comment.