-
Notifications
You must be signed in to change notification settings - Fork 145
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
网络多活架构实现 #374
Labels
question
Further information is requested
Comments
定义:
hk的camellia配置:
eu的camellia配置:
备注: |
香港camellia配置 server:
port: 6380
spring:
application:
name: camellia-redis-proxy-server-hk
camellia-redis-proxy:
console-port: 16379
password: passwd1
plugins:
- xxx.xxx.HKCustomPlugin
transpond:
type: local
local:
type: simple
resource: redis://[email protected]:6381 package com.netease.nim.camellia.redis.proxy.bootstrap;
import com.netease.nim.camellia.redis.proxy.command.Command;
import com.netease.nim.camellia.redis.proxy.command.CommandContext;
import com.netease.nim.camellia.redis.proxy.conf.ProxyDynamicConf;
import com.netease.nim.camellia.redis.proxy.enums.RedisCommand;
import com.netease.nim.camellia.redis.proxy.plugin.ProxyPlugin;
import com.netease.nim.camellia.redis.proxy.plugin.ProxyPluginResponse;
import com.netease.nim.camellia.redis.proxy.plugin.ProxyRequest;
import com.netease.nim.camellia.redis.proxy.plugin.converter.ConverterConfig;
import com.netease.nim.camellia.redis.proxy.plugin.converter.Converters;
import com.netease.nim.camellia.redis.proxy.plugin.converter.KeyConverter;
import com.netease.nim.camellia.redis.proxy.upstream.IUpstreamClient;
import com.netease.nim.camellia.redis.proxy.upstream.IUpstreamClientTemplateFactory;
import com.netease.nim.camellia.redis.proxy.upstream.RedisProxyEnv;
import com.netease.nim.camellia.redis.proxy.util.ErrorLogCollector;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Created by caojiajun on 2025/1/21
*/
public class HKCustomPlugin implements ProxyPlugin {
private final Converters converters1;
private final Converters converters2;
private final String multiWriteUrl;
private final ThreadPoolExecutor executor;
public HKCustomPlugin() {
this.multiWriteUrl = ProxyDynamicConf.getString("multi.write.eu.redis", "redis://[email protected]:6381");
{
ConverterConfig converterConfig = new ConverterConfig();
converterConfig.setKeyConverter(new KeyConverter1());
converters1 = new Converters(converterConfig);
}
{
ConverterConfig converterConfig = new ConverterConfig();
converterConfig.setKeyConverter(new KeyConverter2());
converters2 = new Converters(converterConfig);
}
int poolSize = ProxyDynamicConf.getInt("multi.write.executor.pool.size", Runtime.getRuntime().availableProcessors());
int queueSize = ProxyDynamicConf.getInt("multi.write.executor.queue.size", 10000);
this.executor = new ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>(queueSize), new DefaultThreadFactory("multi-write"));
}
private static class KeyConverter1 implements KeyConverter {
private static final byte[] prefix1 = "prefix1".getBytes(StandardCharsets.UTF_8);
@Override
public byte[] convert(CommandContext commandContext, RedisCommand redisCommand, byte[] originalKey) {
//专线1增加前缀prefix1
//自行修改
byte[] key = new byte[prefix1.length + originalKey.length];
System.arraycopy(prefix1, 0, key, 0, prefix1.length);
System.arraycopy(originalKey, 0, key, prefix1.length, originalKey.length);
return key;
}
@Override
public byte[] reverseConvert(CommandContext commandContext, RedisCommand redisCommand, byte[] convertedKey) {
return convertedKey;
}
}
private static class KeyConverter2 implements KeyConverter {
private static final byte[] prefix1 = "prefix2".getBytes(StandardCharsets.UTF_8);
@Override
public byte[] convert(CommandContext commandContext, RedisCommand redisCommand, byte[] originalKey) {
//专线1增加前缀prefix1
//自行修改
byte[] key = new byte[prefix1.length + originalKey.length];
System.arraycopy(prefix1, 0, key, 0, prefix1.length);
System.arraycopy(originalKey, 0, key, prefix1.length, originalKey.length);
return key;
}
@Override
public byte[] reverseConvert(CommandContext commandContext, RedisCommand redisCommand, byte[] convertedKey) {
return convertedKey;
}
}
@Override
public ProxyPluginResponse executeRequest(ProxyRequest request) {
try {
Command command = request.getCommand();
RedisCommand redisCommand = command.getRedisCommand();
if (redisCommand == null) {
return ProxyPluginResponse.SUCCESS;
}
//限制性命令不支持
if (redisCommand.getSupportType() != RedisCommand.CommandSupportType.FULL_SUPPORT && redisCommand != RedisCommand.PUBLISH) {
return ProxyPluginResponse.SUCCESS;
}
//只处理写命令
RedisCommand.Type type = redisCommand.getType();
if (type != RedisCommand.Type.WRITE) {
return ProxyPluginResponse.SUCCESS;
}
//阻塞性命令不支持
if (command.isBlocking()) {
return ProxyPluginResponse.SUCCESS;
}
//发送给专线2
sendCommand2(request);
//专线1修改key前缀
converters1.convertRequest(command);
return ProxyPluginResponse.SUCCESS;
} catch (Exception e) {
ErrorLogCollector.collect(HKCustomPlugin.class, "executeRequest error", e);
return ProxyPluginResponse.SUCCESS;
}
}
private void sendCommand2(ProxyRequest request) {
try {
//copy命令给专线2
byte[][] objects = request.getCommand().getObjects();
byte[][] newCmd = new byte[objects.length][];
for (int i=0; i<objects.length; i++) {
newCmd[i] = Arrays.copyOf(objects[i], objects[i].length);
}
//专线2修改key前缀
Command command = new Command(newCmd);
converters2.convertRequest(command);
//异步双写
IUpstreamClientTemplateFactory factory = request.getClientTemplateFactory();
RedisProxyEnv redisProxyEnv = factory.getEnv();
executor.submit(() -> {
try {
IUpstreamClient client = redisProxyEnv.getClientFactory().get(multiWriteUrl);
client.sendCommand(0, Collections.singletonList(command),
Collections.singletonList(new CompletableFuture<>()));
} catch (Exception e) {
ErrorLogCollector.collect(HKCustomPlugin.class, "multi write error, url = " + multiWriteUrl, e);
}
});
} catch (Exception e) {
ErrorLogCollector.collect(HKCustomPlugin.class, "multi write error, url = " + multiWriteUrl, e);
}
}
}
|
赞 |
欧洲camellia配置: server:
port: 6380
spring:
application:
name: camellia-redis-proxy-server-eu
camellia-redis-proxy:
console-port: 16379
password: pass2
plugins:
- xxx.xxx. EuCustomPlugin
transpond:
type: local
local:
type: simple
resource: redis://[email protected]:6379 package com.netease.nim.camellia.redis.proxy.bootstrap;
import com.netease.nim.camellia.redis.proxy.command.Command;
import com.netease.nim.camellia.redis.proxy.command.CommandContext;
import com.netease.nim.camellia.redis.proxy.enums.RedisCommand;
import com.netease.nim.camellia.redis.proxy.plugin.ProxyPlugin;
import com.netease.nim.camellia.redis.proxy.plugin.ProxyPluginResponse;
import com.netease.nim.camellia.redis.proxy.plugin.ProxyRequest;
import com.netease.nim.camellia.redis.proxy.plugin.converter.ConverterConfig;
import com.netease.nim.camellia.redis.proxy.plugin.converter.Converters;
import com.netease.nim.camellia.redis.proxy.plugin.converter.KeyConverter;
import com.netease.nim.camellia.redis.proxy.util.ErrorLogCollector;
import java.nio.charset.StandardCharsets;
import java.util.List;
/**
* Created by caojiajun on 2025/1/21
*/
public class EuCustomPlugin implements ProxyPlugin {
public static final ProxyPluginResponse DEFAULT_FAIL = new ProxyPluginResponse(false, "ERR duplicate");
private final Converters converters1;
private final Converters converters2;
public EuCustomPlugin() {
{
ConverterConfig converterConfig = new ConverterConfig();
converterConfig.setKeyConverter(new KeyConverter1());
converters1 = new Converters(converterConfig);
}
{
ConverterConfig converterConfig = new ConverterConfig();
converterConfig.setKeyConverter(new KeyConverter2());
converters2 = new Converters(converterConfig);
}
}
private static class KeyConverter1 implements KeyConverter {
private static final byte[] prefix1 = "prefix1".getBytes(StandardCharsets.UTF_8);
@Override
public byte[] convert(CommandContext commandContext, RedisCommand redisCommand, byte[] originalKey) {
//todo 自行修改,反解
return originalKey;
}
@Override
public byte[] reverseConvert(CommandContext commandContext, RedisCommand redisCommand, byte[] convertedKey) {
return convertedKey;
}
}
private static class KeyConverter2 implements KeyConverter {
private static final byte[] prefix1 = "prefix2".getBytes(StandardCharsets.UTF_8);
@Override
public byte[] convert(CommandContext commandContext, RedisCommand redisCommand, byte[] originalKey) {
//todo 自行修改,反解
return originalKey;
}
@Override
public byte[] reverseConvert(CommandContext commandContext, RedisCommand redisCommand, byte[] convertedKey) {
return convertedKey;
}
}
@Override
public ProxyPluginResponse executeRequest(ProxyRequest request) {
try {
Command command = request.getCommand();
if (checkPass(command)) {
return ProxyPluginResponse.SUCCESS;
} else {
return DEFAULT_FAIL;
}
} catch (Exception e) {
ErrorLogCollector.collect(EuCustomPlugin.class, "executeRequest error", e);
return ProxyPluginResponse.SUCCESS;
}
}
private boolean checkPass(Command command) {
List<byte[]> keys = command.getKeys();
//todo 去重
boolean line1 = true;
for (byte[] key : keys) {
//todo 根据前缀判断来自专线1还是专线2
}
if (line1) {//
//如果是专线1,则使用converter1反解
converters1.convertRequest(command);
} else {
//如果是专线2,则使用converter2反解
converters2.convertRequest(command);
}
return true;
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Module: [e.g. camellia-redis-proxy、camellia-delay-queue]
Version: [e.g. v1.1.12]
Content: [e.g. there are some questions about xxxx ]
想用camellia 双写两条网络专线
部署架构如下。
帮忙看下如何实现?
The text was updated successfully, but these errors were encountered: