Skip to content

Commit

Permalink
refactor: refactor code (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
funky-eyes authored Apr 7, 2024
1 parent 8f3588c commit f7b98c0
Show file tree
Hide file tree
Showing 15 changed files with 787 additions and 296 deletions.
18 changes: 17 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,21 @@
# redis2asp
High-performance Aerospike proxy for the Redis protocol
High-performance Aerospike proxy for the Redis protocold

### support mode

Details can be found here: [redispike-proxy/src/test/java/icu/funkye/redispike/ServerTest.java at main · funky-eyes/redispike-proxy (github.com)](https://github.com/funky-eyes/redispike-proxy/blob/main/src/test/java/icu/funkye/redispike/ServerTest.java)

| feature | note |
| ------- | -------------------------------------------------------- |
| String | Perfect support |
| Hash | HSETNX only supports the key level, not the column level |
| List | Not support |
| pub/sub | Ready for support |
| Set | Ready for support |
| ZSet | |



### Performance Test Report
aerospike 3.x 2c4g redispike-proxy 2c4g:

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package icu.funkye.redispike.handler;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import com.alipay.remoting.CommandCode;
import com.alipay.remoting.CommandHandler;
import com.alipay.remoting.RemotingContext;
import com.alipay.remoting.RemotingProcessor;
import icu.funkye.redispike.handler.process.impl.GetRequestProcessor;
import icu.funkye.redispike.handler.process.impl.HDelRequestProcessor;
import icu.funkye.redispike.handler.process.impl.HSetRequestProcessor;
import icu.funkye.redispike.handler.process.impl.SetRequestProcessor;
import icu.funkye.redispike.handler.process.impl.CommandRequestProcessor;
import icu.funkye.redispike.handler.process.impl.DelRequestProcessor;
import icu.funkye.redispike.protocol.RedisRequest;
import icu.funkye.redispike.protocol.response.BulkResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RedisCommandHandler implements CommandHandler {

private final Logger logger = LoggerFactory.getLogger(getClass());

Map<Short, RemotingProcessor<RedisRequest>> processorMap = new HashMap<>();

public RedisCommandHandler() {
CommandRequestProcessor commandRequestProcessor = new CommandRequestProcessor();
processorMap.put(commandRequestProcessor.getCmdCode().value(), commandRequestProcessor);
DelRequestProcessor delRequestProcessor = new DelRequestProcessor();
processorMap.put(delRequestProcessor.getCmdCode().value(), delRequestProcessor);
GetRequestProcessor getRequestProcessor = new GetRequestProcessor();
processorMap.put(getRequestProcessor.getCmdCode().value(), getRequestProcessor);
HSetRequestProcessor hSetRequestProcessor = new HSetRequestProcessor();
processorMap.put(hSetRequestProcessor.getCmdCode().value(), hSetRequestProcessor);
HDelRequestProcessor hDelRequestProcessor = new HDelRequestProcessor();
processorMap.put(hDelRequestProcessor.getCmdCode().value(), hDelRequestProcessor);
SetRequestProcessor setRequestProcessor = new SetRequestProcessor();
processorMap.put(setRequestProcessor.getCmdCode().value(), setRequestProcessor);
}

@Override
public void handleCommand(RemotingContext ctx, Object msg) {
if (msg instanceof RedisRequest) {
RedisRequest request = (RedisRequest) msg;
try {
processorMap.get(request.getCmdCode().value()).process(ctx, request, getDefaultExecutor());
} catch (Exception e) {
logger.error(e.getMessage(), e);
ctx.writeAndFlush(new BulkResponse());
}
}
}

@Override
public void registerProcessor(CommandCode cmd, RemotingProcessor processor) {
processorMap.put(cmd.value(), processor);
}

@Override
public void registerDefaultExecutor(ExecutorService executor) {
}

@Override
public ExecutorService getDefaultExecutor() {
return ForkJoinPool.commonPool();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package icu.funkye.redispike.handler.process;

import java.util.concurrent.ExecutorService;
import com.aerospike.client.IAerospikeClient;
import com.alipay.remoting.CommandCode;
import com.alipay.remoting.RemotingCommand;
import com.alipay.remoting.RemotingContext;
import icu.funkye.redispike.factory.AeroSpikeClientFactory;
import icu.funkye.redispike.protocol.RedisRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractRedisRequestProcessor<T extends RedisRequest<?>> implements RedisRequestProcessor<T> {

protected final IAerospikeClient client = AeroSpikeClientFactory.getClient();

protected final Logger logger = LoggerFactory.getLogger(getClass());

protected CommandCode cmdCode;

@Override
public void process(RemotingContext ctx, RemotingCommand msg, ExecutorService defaultExecutor) throws Exception {
if (defaultExecutor != null) {
defaultExecutor.submit(() -> this.handle(ctx, (T)msg));
} else {
this.handle(ctx, (T)msg);
}
}

@Override
public CommandCode getCmdCode() {
return this.cmdCode;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package icu.funkye.redispike.handler.process;

import java.util.concurrent.ExecutorService;
import com.alipay.remoting.CommandCode;
import com.alipay.remoting.RemotingContext;
import com.alipay.remoting.RemotingProcessor;
import icu.funkye.redispike.protocol.RedisRequest;

public interface RedisRequestProcessor<T extends RedisRequest<?>> extends RemotingProcessor {

void handle(RemotingContext ctx, T request);

CommandCode getCmdCode();

@Override
default ExecutorService getExecutor() {
return null;
}

@Override
default void setExecutor(ExecutorService executor) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package icu.funkye.redispike.handler.process.impl;

import java.nio.charset.StandardCharsets;
import com.alipay.remoting.RemotingContext;
import icu.funkye.redispike.handler.process.AbstractRedisRequestProcessor;
import icu.funkye.redispike.protocol.RedisRequestCommandCode;
import icu.funkye.redispike.protocol.request.CommandRequest;
import icu.funkye.redispike.util.IntegerUtils;

public class CommandRequestProcessor extends AbstractRedisRequestProcessor<CommandRequest> {

public CommandRequestProcessor() {
this.cmdCode = new RedisRequestCommandCode(IntegerUtils.hashCodeToShort(CommandRequest.class.hashCode()));
}

@Override
public void handle(RemotingContext ctx, CommandRequest request) {
request.setResponse("OK".getBytes(StandardCharsets.UTF_8));
ctx.writeAndFlush(request.getResponse());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package icu.funkye.redispike.handler.process.impl;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.Key;
import com.aerospike.client.listener.DeleteListener;
import com.alipay.remoting.RemotingContext;
import icu.funkye.redispike.factory.AeroSpikeClientFactory;
import icu.funkye.redispike.handler.process.AbstractRedisRequestProcessor;
import icu.funkye.redispike.protocol.RedisRequestCommandCode;
import icu.funkye.redispike.protocol.request.DelRequest;
import icu.funkye.redispike.util.IntegerUtils;

public class DelRequestProcessor extends AbstractRedisRequestProcessor<DelRequest> {

public DelRequestProcessor() {
this.cmdCode = new RedisRequestCommandCode(IntegerUtils.hashCodeToShort(DelRequest.class.hashCode()));
}

@Override public void handle(RemotingContext ctx, DelRequest request) {
List<String> keys = request.getKey();
List<Key> list =
keys.stream().map(key -> new Key(AeroSpikeClientFactory.namespace, AeroSpikeClientFactory.set, key))
.collect(Collectors.toList());
CountDownLatch countDownLatch = new CountDownLatch(list.size());
for (Key key : list) {
client.delete(AeroSpikeClientFactory.eventLoops.next(), new DeleteListener() {
@Override
public void onSuccess(Key key, boolean b) {
request.setResponse(String.valueOf(request.getCount().incrementAndGet())
.getBytes(StandardCharsets.UTF_8));
countDownLatch.countDown();
}

@Override
public void onFailure(AerospikeException e) {
countDownLatch.countDown();
}
}, client.getWritePolicyDefault(), key);
}
CompletableFuture.runAsync(() -> {
try {
countDownLatch.await(10, TimeUnit.SECONDS);
ctx.writeAndFlush(request.getResponse());
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
ctx.writeAndFlush(request.getResponse());
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package icu.funkye.redispike.handler.process.impl;

import java.nio.charset.StandardCharsets;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.Key;
import com.aerospike.client.Record;
import com.aerospike.client.listener.RecordListener;
import com.alipay.remoting.RemotingContext;
import com.alipay.sofa.common.profile.StringUtil;
import icu.funkye.redispike.factory.AeroSpikeClientFactory;
import icu.funkye.redispike.handler.process.AbstractRedisRequestProcessor;
import icu.funkye.redispike.protocol.RedisRequestCommandCode;
import icu.funkye.redispike.protocol.request.GetRequest;
import icu.funkye.redispike.util.IntegerUtils;

public class GetRequestProcessor extends AbstractRedisRequestProcessor<GetRequest> {

public GetRequestProcessor() {
this.cmdCode = new RedisRequestCommandCode(IntegerUtils.hashCodeToShort(GetRequest.class.hashCode()));
}

@Override
public void handle(RemotingContext ctx, GetRequest request) {
Key key = new Key(AeroSpikeClientFactory.namespace, AeroSpikeClientFactory.set, request.getKey());
client.get(AeroSpikeClientFactory.eventLoops.next(), new RecordListener() {
@Override
public void onSuccess(Key key, Record record) {
if (record == null) {
ctx.writeAndFlush(request.getResponse());
return;
}
String value = record.getString(request.getKey());
if (StringUtil.isNotBlank(value)) {
request.setResponse(value.getBytes(StandardCharsets.UTF_8));
}
ctx.writeAndFlush(request.getResponse());
}

@Override
public void onFailure(AerospikeException ae) {
logger.error(ae.getMessage(), ae);
ctx.writeAndFlush(request.getResponse());
}
}, client.getReadPolicyDefault(), key);
}
}
Loading

0 comments on commit f7b98c0

Please sign in to comment.