Skip to content

Commit

Permalink
feature: support keys protocol (#14)
Browse files Browse the repository at this point in the history
  • Loading branch information
funky-eyes authored Apr 8, 2024
1 parent bfdcf7a commit a2974b0
Show file tree
Hide file tree
Showing 23 changed files with 301 additions and 99 deletions.
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
# redis2asp
High-performance Aerospike proxy for the Redis protocold

### compatibility

Aerospike: 3.x - 7.x (8.x version has not been tested yet as there is no Docker image available currently)
Redis 3.x - 7.x

### support mode

Details can be found here: [redispike-proxy/src/test/java/icu/funkye/redispike/ServerTest.java at main · funky-eyes/redispike-proxy (github.com)](https://github.com/funky-eyes/redispike-proxy/blob/main/src/test/java/icu/funkye/redispike/ServerTest.java)
Expand All @@ -13,7 +18,7 @@ Details can be found here: [redispike-proxy/src/test/java/icu/funkye/redispike/S
| List | | |
| Set | | |
| ZSet | | |
| keys | | |
| keys | done | |



Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<log.version>1.2.9</log.version>
<p3c-pmd.version>1.3.6</p3c-pmd.version>
<maven-pmd-plugin.version>3.8</maven-pmd-plugin.version>
<asp-client.version>4.1.2</asp-client.version>
<asp-client.version>6.3.0</asp-client.version>
</properties>
<dependencies>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import icu.funkye.redispike.handler.process.impl.GetRequestProcessor;
import icu.funkye.redispike.handler.process.impl.HDelRequestProcessor;
import icu.funkye.redispike.handler.process.impl.HSetRequestProcessor;
import icu.funkye.redispike.handler.process.impl.KeysRequestProcessor;
import icu.funkye.redispike.handler.process.impl.SetRequestProcessor;
import icu.funkye.redispike.handler.process.impl.CommandRequestProcessor;
import icu.funkye.redispike.handler.process.impl.DelRequestProcessor;
Expand Down Expand Up @@ -54,6 +55,8 @@ public RedisCommandHandler() {
processorMap.put(hDelRequestProcessor.getCmdCode().value(), hDelRequestProcessor);
SetRequestProcessor setRequestProcessor = new SetRequestProcessor();
processorMap.put(setRequestProcessor.getCmdCode().value(), setRequestProcessor);
KeysRequestProcessor keysRequestProcessor = new KeysRequestProcessor();
processorMap.put(keysRequestProcessor.getCmdCode().value(), keysRequestProcessor);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,13 @@ public abstract class AbstractRedisRequestProcessor<T extends RedisRequest<?>> i
@Override
public void process(RemotingContext ctx, RemotingCommand msg, ExecutorService defaultExecutor) throws Exception {
if (defaultExecutor != null) {
defaultExecutor.submit(() -> this.handle(ctx, (T)msg));
defaultExecutor.submit(() -> {
try {
this.handle(ctx, (T)msg);
} catch (Exception e) {
logger.error("process error: {}",e.getMessage(), e);
}
});
} else {
this.handle(ctx, (T)msg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package icu.funkye.redispike.handler.process.impl;

import java.nio.charset.StandardCharsets;
import com.alipay.remoting.RemotingContext;
import icu.funkye.redispike.handler.process.AbstractRedisRequestProcessor;
import icu.funkye.redispike.protocol.RedisRequestCommandCode;
Expand All @@ -31,7 +30,7 @@ public CommandRequestProcessor() {

@Override
public void handle(RemotingContext ctx, CommandRequest request) {
request.setResponse("OK".getBytes(StandardCharsets.UTF_8));
request.setResponse("OK");
ctx.writeAndFlush(request.getResponse());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package icu.funkye.redispike.handler.process.impl;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
Expand All @@ -38,18 +37,18 @@ public DelRequestProcessor() {
this.cmdCode = new RedisRequestCommandCode(IntegerUtils.hashCodeToShort(DelRequest.class.hashCode()));
}

@Override public void handle(RemotingContext ctx, DelRequest request) {
@Override
public void handle(RemotingContext ctx, DelRequest request) {
List<String> keys = request.getKey();
List<Key> list =
keys.stream().map(key -> new Key(AeroSpikeClientFactory.namespace, AeroSpikeClientFactory.set, key))
.collect(Collectors.toList());
keys.stream().map(key -> new Key(AeroSpikeClientFactory.namespace, AeroSpikeClientFactory.set, key))
.collect(Collectors.toList());
CountDownLatch countDownLatch = new CountDownLatch(list.size());
for (Key key : list) {
client.delete(AeroSpikeClientFactory.eventLoops.next(), new DeleteListener() {
@Override
public void onSuccess(Key key, boolean b) {
request.setResponse(String.valueOf(request.getCount().incrementAndGet())
.getBytes(StandardCharsets.UTF_8));
request.setResponse(String.valueOf(request.getCount().incrementAndGet()));
countDownLatch.countDown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package icu.funkye.redispike.handler.process.impl;

import java.nio.charset.StandardCharsets;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.Key;
import com.aerospike.client.Record;
Expand Down Expand Up @@ -47,7 +46,7 @@ public void onSuccess(Key key, Record record) {
}
String value = record.getString(request.getKey());
if (StringUtil.isNotBlank(value)) {
request.setResponse(value.getBytes(StandardCharsets.UTF_8));
request.setResponse(value);
}
ctx.writeAndFlush(request.getResponse());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package icu.funkye.redispike.handler.process.impl;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -54,7 +53,7 @@ public void onSuccess(Key key, Record record) {
@Override
public void onSuccess(Key key, boolean b) {
request.setResponse(
String.valueOf(request.getFields().size()).getBytes(StandardCharsets.UTF_8));
String.valueOf(request.getFields().size()));
ctx.writeAndFlush(request.getResponse());
}

Expand All @@ -71,7 +70,7 @@ public void onFailure(AerospikeException exception) {
@Override
public void onSuccess(Key key) {
request.setResponse(
String.valueOf(request.getFields().size()).getBytes(StandardCharsets.UTF_8));
String.valueOf(request.getFields().size()));
ctx.writeAndFlush(request.getResponse());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package icu.funkye.redispike.handler.process.impl;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import com.aerospike.client.AerospikeException;
Expand Down Expand Up @@ -53,7 +52,7 @@ public void handle(RemotingContext ctx, HSetRequest request) {
client.put(AeroSpikeClientFactory.eventLoops.next(), new WriteListener() {
@Override
public void onSuccess(Key key) {
request.setResponse(String.valueOf(request.getKv().size()).getBytes(StandardCharsets.UTF_8));
request.setResponse(String.valueOf(request.getKv().size()));
ctx.writeAndFlush(request.getResponse());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package icu.funkye.redispike.handler.process.impl;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.Key;
import com.aerospike.client.Record;
import com.aerospike.client.listener.RecordSequenceListener;
import com.aerospike.client.policy.ScanPolicy;
import com.alipay.remoting.RemotingContext;
import com.alipay.remoting.util.StringUtils;

import icu.funkye.redispike.factory.AeroSpikeClientFactory;
import icu.funkye.redispike.handler.process.AbstractRedisRequestProcessor;
import icu.funkye.redispike.protocol.RedisRequestCommandCode;
import icu.funkye.redispike.protocol.request.KeysRequest;
import icu.funkye.redispike.util.IntegerUtils;

public class KeysRequestProcessor extends AbstractRedisRequestProcessor<KeysRequest> {
ScanPolicy scanPolicy = new ScanPolicy(client.getScanPolicyDefault());

public KeysRequestProcessor() {
this.cmdCode = new RedisRequestCommandCode(IntegerUtils.hashCodeToShort(KeysRequest.class.hashCode()));
this.scanPolicy.includeBinData = false;
}

@Override
public void handle(RemotingContext ctx, KeysRequest request) {

boolean all = StringUtils.equals(request.getPattern(), "*");
boolean left = request.getPattern().startsWith("*");
if (left) {
request.setPattern(request.getPattern().substring(1));
}
boolean right = request.getPattern().endsWith("*");
if (right) {
request.setPattern(request.getPattern().substring(0, request.getPattern().length() - 1));
}
client.scanAll(AeroSpikeClientFactory.eventLoops.next(), new RecordSequenceListener() {
@Override
public void onRecord(Key key, Record record) throws AerospikeException {
if (key != null) {
if (key.userKey != null) {
String userKey = key.userKey.toString();
if (all) {
request.setResponse(userKey);
} else if (left) {
if (right) {
if (userKey.contains(request.getPattern())) {
request.setResponse(userKey);
}
} else if (userKey.endsWith(request.getPattern())) {
request.setResponse(userKey);
}
} else if (right) {
if (userKey.startsWith(request.getPattern())) {
request.setResponse(userKey);
}
} else {
if (StringUtils.equals(userKey, request.getPattern())) {
request.setResponse(userKey);
}
}
}
}
}

@Override
public void onSuccess() {
ctx.writeAndFlush(request.getResponse());
}

@Override
public void onFailure(AerospikeException exception) {
logger.error(exception.getMessage(), exception);
ctx.writeAndFlush(request.getResponse());
}
}, scanPolicy, AeroSpikeClientFactory.namespace, AeroSpikeClientFactory.set);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package icu.funkye.redispike.handler.process.impl;

import java.nio.charset.StandardCharsets;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
Expand All @@ -35,29 +34,30 @@
import icu.funkye.redispike.util.IntegerUtils;

public class SetRequestProcessor extends AbstractRedisRequestProcessor<SetRequest> {
WritePolicy defaultWritePolicy;

public SetRequestProcessor() {
this.cmdCode = new RedisRequestCommandCode(IntegerUtils.hashCodeToShort(SetRequest.class.hashCode()));
this.defaultWritePolicy = new WritePolicy(client.getWritePolicyDefault());
this.defaultWritePolicy.sendKey = true;
}

@Override
public void handle(RemotingContext ctx, SetRequest request) {
Bin bin = new Bin(request.getKey(), request.getValue());
Key key = new Key(AeroSpikeClientFactory.namespace, AeroSpikeClientFactory.set, request.getKey());
WritePolicy writePolicy = null;
WritePolicy writePolicy = this.defaultWritePolicy;
if (request.getTtl() != null) {
writePolicy = new WritePolicy(client.getWritePolicyDefault());
writePolicy = new WritePolicy(writePolicy);
if (request.getTtlType() == TtlType.EX) {
writePolicy.expiration = request.getTtl().intValue();
} else {
writePolicy.expiration = Integer.max((int) (request.getTtl() / 1000), 1);
}
}
if (request.getOperate() != null) {
if (writePolicy == null) {
writePolicy = new WritePolicy(client.getWritePolicyDefault());
}
if (request.getOperate() == Operate.NX) {
writePolicy = new WritePolicy(writePolicy);
writePolicy.recordExistsAction = RecordExistsAction.CREATE_ONLY;
} else if (request.getOperate() == Operate.XX) {
client.get(AeroSpikeClientFactory.eventLoops.next(), new RecordListener() {
Expand All @@ -69,7 +69,7 @@ public void onSuccess(Key key, Record record) {
client.put(AeroSpikeClientFactory.eventLoops.next(), new WriteListener() {
@Override
public void onSuccess(Key key) {
request.setResponse("OK".getBytes(StandardCharsets.UTF_8));
request.setResponse("OK");
ctx.writeAndFlush(request.getResponse());
}

Expand All @@ -91,16 +91,13 @@ public void onFailure(AerospikeException ae) {
return;
}
}
if (writePolicy == null) {
writePolicy = client.getWritePolicyDefault();
}
client.put(AeroSpikeClientFactory.eventLoops.next(), new WriteListener() {
@Override
public void onSuccess(Key key) {
if (request.getOriginalCommand().contains("nx")) {
request.setResponse("1".getBytes(StandardCharsets.UTF_8));
request.setResponse("1");
} else {
request.setResponse("OK".getBytes(StandardCharsets.UTF_8));
request.setResponse("OK");
}
ctx.writeAndFlush(request.getResponse());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.alipay.remoting.CommandDecoder;
import icu.funkye.redispike.protocol.request.HDelRequest;
import icu.funkye.redispike.protocol.request.HSetRequest;
import icu.funkye.redispike.protocol.request.KeysRequest;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.DecoderException;
Expand Down Expand Up @@ -73,6 +74,8 @@ private RedisRequest<?> convert2RedisRequest(List<String> params) {
return new SetRequest(params);
case "set":
return new SetRequest(params);
case "keys":
return new KeysRequest(params);
case "del":
params.remove(0);
return new DelRequest(params);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ public interface RedisResponse<T> extends RemotingCommand {

byte[] CRLF = new byte[] {'\r', '\n'};

T data();
void setData(byte[] data);
void setData(T data);
void write(ByteBuf out) throws IOException;

default ProtocolCode getProtocolCode() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@
import icu.funkye.redispike.protocol.RedisResponse;
import icu.funkye.redispike.protocol.response.BulkResponse;

public class CommandRequest implements RedisRequest<byte[]> {
public class CommandRequest implements RedisRequest<String> {

private BulkResponse response = new BulkResponse();
private final BulkResponse response = new BulkResponse();

@Override
public RedisResponse<byte[]> getResponse() {
public RedisResponse<String> getResponse() {
return response;
}

@Override
public void setResponse(byte[] data) {
public void setResponse(String data) {
response.setData(data);
}

Expand Down
Loading

0 comments on commit a2974b0

Please sign in to comment.