Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

网络多活架构实现 #374

Open
Memorydoc opened this issue Jan 21, 2025 · 4 comments
Open

网络多活架构实现 #374

Memorydoc opened this issue Jan 21, 2025 · 4 comments
Labels
question Further information is requested

Comments

@Memorydoc
Copy link

Module: [e.g. camellia-redis-proxy、camellia-delay-queue]
Version: [e.g. v1.1.12]
Content: [e.g. there are some questions about xxxx ]

想用camellia 双写两条网络专线
部署架构如下。

Image

帮忙看下如何实现?

@Memorydoc Memorydoc added the question Further information is requested label Jan 21, 2025
@caojiajun
Copy link
Collaborator

caojiajun commented Jan 21, 2025

定义:

  1. eu-redis为redis://[email protected]:6379
  2. eu的camellia有2个vip,分别为10.0.0.1和10.0.0.2,代表了2条不同的专线

hk的camellia配置:

  1. 对外暴露1个端口1个密码,假设为6380和passwd1
  2. 后端配置redis://[email protected]:6381,表示默认走专线1回eu
  3. 配置自定义plugin,该plugin功能为:
  1. 对redis://[email protected]:6381后端增加前缀prefix1
  2. 双写到redis://[email protected]:6381,并且增加前缀prefix2

eu的camellia配置:

  1. 对外暴露1个端口1个密码,假设为6381和passwd2
  2. 增加一个自定义plugin,该plugin功能为:
  1. 根据key的前缀,判断来自专线1还是专线2
  2. 解析命令中的key,做去重逻辑,去重后的重复命令直接返回失败
  3. 反解key前缀,恢复回原始key

备注:
hk业务在调用hk-camellia时,需要忽略error回包

@caojiajun
Copy link
Collaborator

caojiajun commented Jan 21, 2025

香港camellia配置

server:
  port: 6380
spring:
  application:
    name: camellia-redis-proxy-server-hk

camellia-redis-proxy:
  console-port: 16379
  password: passwd1
  plugins:
    - xxx.xxx.HKCustomPlugin
  transpond:
    type: local
    local:
      type: simple
      resource: redis://[email protected]:6381
package com.netease.nim.camellia.redis.proxy.bootstrap;

import com.netease.nim.camellia.redis.proxy.command.Command;
import com.netease.nim.camellia.redis.proxy.command.CommandContext;
import com.netease.nim.camellia.redis.proxy.conf.ProxyDynamicConf;
import com.netease.nim.camellia.redis.proxy.enums.RedisCommand;
import com.netease.nim.camellia.redis.proxy.plugin.ProxyPlugin;
import com.netease.nim.camellia.redis.proxy.plugin.ProxyPluginResponse;
import com.netease.nim.camellia.redis.proxy.plugin.ProxyRequest;
import com.netease.nim.camellia.redis.proxy.plugin.converter.ConverterConfig;
import com.netease.nim.camellia.redis.proxy.plugin.converter.Converters;
import com.netease.nim.camellia.redis.proxy.plugin.converter.KeyConverter;
import com.netease.nim.camellia.redis.proxy.upstream.IUpstreamClient;
import com.netease.nim.camellia.redis.proxy.upstream.IUpstreamClientTemplateFactory;
import com.netease.nim.camellia.redis.proxy.upstream.RedisProxyEnv;
import com.netease.nim.camellia.redis.proxy.util.ErrorLogCollector;
import io.netty.util.concurrent.DefaultThreadFactory;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Created by caojiajun on 2025/1/21
 */
public class HKCustomPlugin implements ProxyPlugin {

    private final Converters converters1;
    private final Converters converters2;
    private final String multiWriteUrl;

    private final ThreadPoolExecutor executor;

    public HKCustomPlugin() {
        this.multiWriteUrl = ProxyDynamicConf.getString("multi.write.eu.redis", "redis://[email protected]:6381");
        {
            ConverterConfig converterConfig = new ConverterConfig();
            converterConfig.setKeyConverter(new KeyConverter1());
            converters1 = new Converters(converterConfig);
        }
        {
            ConverterConfig converterConfig = new ConverterConfig();
            converterConfig.setKeyConverter(new KeyConverter2());
            converters2 = new Converters(converterConfig);
        }
        int poolSize = ProxyDynamicConf.getInt("multi.write.executor.pool.size", Runtime.getRuntime().availableProcessors());
        int queueSize = ProxyDynamicConf.getInt("multi.write.executor.queue.size", 10000);
        this.executor = new ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>(queueSize), new DefaultThreadFactory("multi-write"));
    }

    private static class KeyConverter1 implements KeyConverter {

        private static final byte[] prefix1 = "prefix1".getBytes(StandardCharsets.UTF_8);

        @Override
        public byte[] convert(CommandContext commandContext, RedisCommand redisCommand, byte[] originalKey) {
            //专线1增加前缀prefix1
            //自行修改
            byte[] key = new byte[prefix1.length + originalKey.length];
            System.arraycopy(prefix1, 0, key, 0, prefix1.length);
            System.arraycopy(originalKey, 0, key, prefix1.length, originalKey.length);
            return key;
        }

        @Override
        public byte[] reverseConvert(CommandContext commandContext, RedisCommand redisCommand, byte[] convertedKey) {
            return convertedKey;
        }
    }

    private static class KeyConverter2 implements KeyConverter {

        private static final byte[] prefix1 = "prefix2".getBytes(StandardCharsets.UTF_8);

        @Override
        public byte[] convert(CommandContext commandContext, RedisCommand redisCommand, byte[] originalKey) {
            //专线1增加前缀prefix1
            //自行修改
            byte[] key = new byte[prefix1.length + originalKey.length];
            System.arraycopy(prefix1, 0, key, 0, prefix1.length);
            System.arraycopy(originalKey, 0, key, prefix1.length, originalKey.length);
            return key;
        }

        @Override
        public byte[] reverseConvert(CommandContext commandContext, RedisCommand redisCommand, byte[] convertedKey) {
            return convertedKey;
        }
    }


    @Override
    public ProxyPluginResponse executeRequest(ProxyRequest request) {
        try {
            Command command = request.getCommand();
            RedisCommand redisCommand = command.getRedisCommand();
            if (redisCommand == null) {
                return ProxyPluginResponse.SUCCESS;
            }
            //限制性命令不支持
            if (redisCommand.getSupportType() != RedisCommand.CommandSupportType.FULL_SUPPORT && redisCommand != RedisCommand.PUBLISH) {
                return ProxyPluginResponse.SUCCESS;
            }
            //只处理写命令
            RedisCommand.Type type = redisCommand.getType();
            if (type != RedisCommand.Type.WRITE) {
                return ProxyPluginResponse.SUCCESS;
            }
            //阻塞性命令不支持
            if (command.isBlocking()) {
                return ProxyPluginResponse.SUCCESS;
            }

            //发送给专线2
            sendCommand2(request);
            //专线1修改key前缀
            converters1.convertRequest(command);
            return ProxyPluginResponse.SUCCESS;
        } catch (Exception e) {
            ErrorLogCollector.collect(HKCustomPlugin.class, "executeRequest error", e);
            return ProxyPluginResponse.SUCCESS;
        }
    }

    private void sendCommand2(ProxyRequest request) {
        try {
            //copy命令给专线2
            byte[][] objects = request.getCommand().getObjects();
            byte[][] newCmd = new byte[objects.length][];
            for (int i=0; i<objects.length; i++) {
                newCmd[i] = Arrays.copyOf(objects[i], objects[i].length);
            }
            //专线2修改key前缀
            Command command = new Command(newCmd);
            converters2.convertRequest(command);
            //异步双写
            IUpstreamClientTemplateFactory factory = request.getClientTemplateFactory();
            RedisProxyEnv redisProxyEnv = factory.getEnv();
            executor.submit(() -> {
                try {
                    IUpstreamClient client = redisProxyEnv.getClientFactory().get(multiWriteUrl);
                    client.sendCommand(0, Collections.singletonList(command),
                            Collections.singletonList(new CompletableFuture<>()));
                } catch (Exception e) {
                    ErrorLogCollector.collect(HKCustomPlugin.class, "multi write error, url = " + multiWriteUrl, e);
                }
            });
        } catch (Exception e) {
            ErrorLogCollector.collect(HKCustomPlugin.class, "multi write error, url = " + multiWriteUrl, e);
        }
    }
}

@Memorydoc
Copy link
Author

@caojiajun
Copy link
Collaborator

caojiajun commented Jan 21, 2025

欧洲camellia配置:

server:
  port: 6380
spring:
  application:
    name: camellia-redis-proxy-server-eu

camellia-redis-proxy:
  console-port: 16379
  password: pass2
  plugins:
    - xxx.xxx. EuCustomPlugin
  transpond:
    type: local
    local:
      type: simple
      resource: redis://[email protected]:6379
package com.netease.nim.camellia.redis.proxy.bootstrap;

import com.netease.nim.camellia.redis.proxy.command.Command;
import com.netease.nim.camellia.redis.proxy.command.CommandContext;
import com.netease.nim.camellia.redis.proxy.enums.RedisCommand;
import com.netease.nim.camellia.redis.proxy.plugin.ProxyPlugin;
import com.netease.nim.camellia.redis.proxy.plugin.ProxyPluginResponse;
import com.netease.nim.camellia.redis.proxy.plugin.ProxyRequest;
import com.netease.nim.camellia.redis.proxy.plugin.converter.ConverterConfig;
import com.netease.nim.camellia.redis.proxy.plugin.converter.Converters;
import com.netease.nim.camellia.redis.proxy.plugin.converter.KeyConverter;
import com.netease.nim.camellia.redis.proxy.util.ErrorLogCollector;

import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * Created by caojiajun on 2025/1/21
 */
public class EuCustomPlugin implements ProxyPlugin {

    public static final ProxyPluginResponse DEFAULT_FAIL = new ProxyPluginResponse(false, "ERR duplicate");

    private final Converters converters1;
    private final Converters converters2;

    public EuCustomPlugin() {
        {
            ConverterConfig converterConfig = new ConverterConfig();
            converterConfig.setKeyConverter(new KeyConverter1());
            converters1 = new Converters(converterConfig);
        }
        {
            ConverterConfig converterConfig = new ConverterConfig();
            converterConfig.setKeyConverter(new KeyConverter2());
            converters2 = new Converters(converterConfig);
        }
    }

    private static class KeyConverter1 implements KeyConverter {

        private static final byte[] prefix1 = "prefix1".getBytes(StandardCharsets.UTF_8);

        @Override
        public byte[] convert(CommandContext commandContext, RedisCommand redisCommand, byte[] originalKey) {
            //todo 自行修改,反解
            return originalKey;
        }

        @Override
        public byte[] reverseConvert(CommandContext commandContext, RedisCommand redisCommand, byte[] convertedKey) {
            return convertedKey;
        }
    }

    private static class KeyConverter2 implements KeyConverter {

        private static final byte[] prefix1 = "prefix2".getBytes(StandardCharsets.UTF_8);

        @Override
        public byte[] convert(CommandContext commandContext, RedisCommand redisCommand, byte[] originalKey) {
            //todo 自行修改,反解
            return originalKey;
        }

        @Override
        public byte[] reverseConvert(CommandContext commandContext, RedisCommand redisCommand, byte[] convertedKey) {
            return convertedKey;
        }
    }

    @Override
    public ProxyPluginResponse executeRequest(ProxyRequest request) {
        try {
            Command command = request.getCommand();
            if (checkPass(command)) {
                return ProxyPluginResponse.SUCCESS;
            } else {
                return DEFAULT_FAIL;
            }
        } catch (Exception e) {
            ErrorLogCollector.collect(EuCustomPlugin.class, "executeRequest error", e);
            return ProxyPluginResponse.SUCCESS;
        }
    }

    private boolean checkPass(Command command) {
        List<byte[]> keys = command.getKeys();
        //todo 去重
        boolean line1 = true;
        for (byte[] key : keys) {
            //todo 根据前缀判断来自专线1还是专线2    
        }
        if (line1) {//
            //如果是专线1,则使用converter1反解
            converters1.convertRequest(command);
        } else {
            //如果是专线2,则使用converter2反解
            converters2.convertRequest(command);
        }
        return true;
    }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
question Further information is requested
Projects
None yet
Development

No branches or pull requests

2 participants