diff --git a/cmd/statshouse/statshouse.go b/cmd/statshouse/statshouse.go
index 6928412c7..194f32014 100644
--- a/cmd/statshouse/statshouse.go
+++ b/cmd/statshouse/statshouse.go
@@ -613,6 +613,7 @@ func mainIngressProxy(aesPwd string) {
 	config.Cluster = argv.cluster
 	config.ExternalAddresses = strings.Split(argv.ingressExtAddr, ",")
 	config.ExternalAddressesIPv6 = strings.Split(argv.ingressExtAddrIPv6, ",")
+	config.Version = argv.ingressVersion
 	// Ensure agent configuration is valid
 	if err := argv.configAgent.ValidateConfigSource(); err != nil {
@@ -629,45 +630,24 @@ func mainIngressProxy(aesPwd string) {
 		logErr.Fatalf("error creating Agent instance: %v", err)
 	}
 	sh2.Run(0, 0, 0)
-	if argv.ingressVersion == "2" {
-		ctx, cancel := context.WithCancel(context.Background())
-		exit := make(chan error, 1)
-		go func() {
-			exit <- aggregator.RunIngressProxy2(ctx, sh2, config, aesPwd)
-		}()
-		sigint := make(chan os.Signal, 1)
-		signal.Notify(sigint, syscall.SIGINT)
+	ctx, cancel := context.WithCancel(context.Background())
+	exit := make(chan error, 1)
+	go func() {
+		exit <- aggregator.RunIngressProxy2(ctx, sh2, config, aesPwd)
+	}()
+	sigint := make(chan os.Signal, 1)
+	signal.Notify(sigint, syscall.SIGINT)
+	select {
+	case <-sigint:
+		cancel()
 		select {
-		case <-sigint:
-			cancel()
-			select {
-			case <-exit:
-			case <-time.After(5 * time.Second):
-			}
-			logOk.Println("Buy")
-		case err := <-exit:
-			logErr.Println(err)
-			cancel()
+		case <-exit:
+		case <-time.After(5 * time.Second):
 		}
-		return
-	}
-
-	// Ensure proxy configuration is valid
-	if len(config.ExternalAddresses) != 3 {
-		logErr.Fatalf("--ingress-external-addr must contain exactly 3 comma-separated addresses of ingress proxies, contains '%q'", strings.Join(config.ExternalAddresses, ","))
-	}
-	if len(config.IngressKeys) == 0 {
-		logErr.Fatalf("ingress proxy must have non-empty list of ingress crypto keys")
-	}
-
-	// Run ingress proxy
-	ln, err := rpc.Listen(config.Network, config.ListenAddr, false)
-	if err != nil {
-		logErr.Fatalf("Failed to listen on %s %s: %v", config.Network, config.ListenAddr, err)
-	}
-	err = aggregator.RunIngressProxy(ln, sh2, aesPwd, config)
-	if err != nil {
-		logErr.Fatalf("error running ingress proxy: %v", err)
+		logOk.Println("Bye")
+	case err := <-exit:
+		logErr.Println(err)
+		cancel()
 	}
 }
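With the v1 branch gone, mainIngressProxy has a single control flow: start the v2 proxy in a goroutine, then block until either SIGINT arrives (cancel the context and give the proxy up to five seconds to drain) or the proxy itself returns an error. A minimal, self-contained sketch of this shutdown pattern — runProxy is a hypothetical stand-in for aggregator.RunIngressProxy2, not statshouse code:

```go
package main

import (
	"context"
	"errors"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// runProxy is a hypothetical stand-in for aggregator.RunIngressProxy2:
// it blocks until its context is cancelled or a fatal error occurs.
func runProxy(ctx context.Context) error {
	<-ctx.Done()
	return errors.New("proxy stopped: " + ctx.Err().Error())
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	exit := make(chan error, 1) // buffered: sender never blocks, even if we time out below
	go func() { exit <- runProxy(ctx) }()

	sigint := make(chan os.Signal, 1)
	signal.Notify(sigint, syscall.SIGINT)
	select {
	case <-sigint: // operator asked us to stop
		cancel()
		select {
		case <-exit: // proxy drained cleanly
		case <-time.After(5 * time.Second): // or stop waiting after a grace period
		}
		log.Println("Bye")
	case err := <-exit: // proxy died on its own
		log.Println(err)
		cancel()
	}
}
```

Buffering exit with capacity 1 matters: if the grace period expires and nobody reads the channel anymore, the proxy goroutine can still send its result and terminate instead of leaking.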
diff --git a/internal/aggregator/ingress_proxy.go b/internal/aggregator/ingress_proxy.go
index a29906d9b..f5cd83976 100644
--- a/internal/aggregator/ingress_proxy.go
+++ b/internal/aggregator/ingress_proxy.go
@@ -7,61 +7,15 @@
 package aggregator
 
 import (
-	"context"
-	"encoding/binary"
 	"encoding/hex"
 	"fmt"
 	"log"
-	"net"
 	"os"
 	"path/filepath"
-	"sync"
-	"time"
 
-	"go.uber.org/atomic"
-
-	"github.com/vkcom/statshouse/internal/agent"
-	"github.com/vkcom/statshouse/internal/data_model"
-	"github.com/vkcom/statshouse/internal/data_model/gen2/constants"
-	"github.com/vkcom/statshouse/internal/data_model/gen2/tlstatshouse"
-	"github.com/vkcom/statshouse/internal/format"
-	"github.com/vkcom/statshouse/internal/util"
-	"github.com/vkcom/statshouse/internal/vkgo/basictl"
-	"github.com/vkcom/statshouse/internal/vkgo/build"
 	"github.com/vkcom/statshouse/internal/vkgo/rpc"
 )
 
-type clientPool struct {
-	aesPwd string
-	mu     sync.RWMutex
-	// shardReplica -> free clients
-	clients map[string]*rpc.Client
-}
-
-type longpollClient struct {
-	queryID    int64
-	requestLen int
-}
-
-type longpollShard struct {
-	proxy      *IngressProxy
-	mu         sync.Mutex
-	clientList map[*rpc.HandlerContext]longpollClient
-}
-
-const longPollShardsCount = 256 // we want to shard lock to reduce contention
-
-type IngressProxy struct {
-	nextShardLock atomic.Uint64 // round-robin for lock shards must be good
-
-	sh2    *agent.Agent
-	pool   *clientPool
-	server *rpc.Server
-	config ConfigIngressProxy
-
-	longpollShards [longPollShardsCount]*longpollShard
-}
-
 type ConfigIngressProxy struct {
 	Cluster               string
 	Network               string
@@ -71,14 +25,7 @@ type ConfigIngressProxy struct {
 	ExternalAddressesIPv6 []string
 	IngressKeys           []string
 	ResponseMemoryLimit   int
-}
-
-func newClientPool(aesPwd string) *clientPool {
-	cl := &clientPool{
-		aesPwd:  aesPwd,
-		clients: map[string]*rpc.Client{},
-	}
-	return cl
+	Version               string
 }
 
 func (config *ConfigIngressProxy) ReadIngressKeys(ingressPwdDir string) error {
@@ -102,253 +49,3 @@ func (config *ConfigIngressProxy) ReadIngressKeys(ingressPwdDir string) error {
 	log.Printf("Successfully read %d ingress keys from ingress-pwd-dir %q", len(config.IngressKeys), ingressPwdDir)
 	return nil
 }
-
-func RunIngressProxy(ln net.Listener, sh2 *agent.Agent, aesPwd string, config ConfigIngressProxy) error {
-	// Now we configure our clients using repetition of 3 ingress proxy addresses per shard
-	extAddr := config.ExternalAddresses
-	for i := 1; i < len(sh2.GetConfigResult.Addresses)/3; i++ { // GetConfig returns only non-empty list divisible by 3
-		config.ExternalAddresses = append(config.ExternalAddresses, extAddr...)
-	}
-	proxy := &IngressProxy{
-		sh2:  sh2,
-		pool: newClientPool(aesPwd),
-		// TODO - server settings must be tuned
-		config: config,
-	}
-	for i := 0; i < longPollShardsCount; i++ {
-		proxy.longpollShards[i] = &longpollShard{
-			proxy:      proxy,
-			clientList: map[*rpc.HandlerContext]longpollClient{},
-		}
-	}
-	metrics := util.NewRPCServerMetrics("statshouse_proxy")
-	options := []rpc.ServerOptionsFunc{
-		rpc.ServerWithCryptoKeys(config.IngressKeys),
-		rpc.ServerWithHandler(proxy.handler),
-		rpc.ServerWithSyncHandler(proxy.syncHandler),
-		rpc.ServerWithForceEncryption(true),
-		rpc.ServerWithLogf(log.Printf),
-		rpc.ServerWithDisableContextTimeout(true),
-		rpc.ServerWithTrustedSubnetGroups(build.TrustedSubnetGroups()),
-		rpc.ServerWithVersion(build.Info()),
-		rpc.ServerWithDefaultResponseTimeout(data_model.MaxConveyorDelay * time.Second),
-		rpc.ServerWithMaxInflightPackets(aggregatorMaxInflightPackets * 100), // enough for up to 100 shards
-		rpc.ServerWithResponseBufSize(1024),
-		rpc.ServerWithResponseMemEstimate(1024),
-		rpc.ServerWithRequestMemoryLimit(8 << 30), // see server settings in aggregator. We do not multiply here
-		rpc.ServerWithResponseMemoryLimit(config.ResponseMemoryLimit),
-		metrics.ServerWithMetrics,
-	}
-	proxy.server = rpc.NewServer(options...)
-	defer metrics.Run(proxy.server)()
-	log.Printf("Running ingress proxy listening %s with %d crypto keys", ln.Addr(), len(config.IngressKeys))
-	return proxy.server.Serve(ln)
-}
-
-func keyFromHctx(hctx *rpc.HandlerContext, resultTag int32) (data_model.Key, *format.MetricMetaValue) {
-	keyID := hctx.KeyID()
-	keyIDTag := int32(binary.BigEndian.Uint32(keyID[:4]))
-	protocol := int32(hctx.ProtocolVersion())
-	return data_model.Key{
-		Metric: format.BuiltinMetricIDRPCRequests,
-		Tags:   [16]int32{0, format.TagValueIDComponentIngressProxy, int32(hctx.RequestTag()), resultTag, 0, 0, keyIDTag, 0, protocol},
-	}, format.BuiltinMetricMetaRPCRequests
-}
-
-func (ls *longpollShard) callback(client *rpc.Client, resp *rpc.Response, err error) {
-	defer client.PutResponse(resp)
-	userData := resp.UserData()
-	hctx := userData.(*rpc.HandlerContext)
-	ls.mu.Lock()
-	defer ls.mu.Unlock()
-	lpc, ok := ls.clientList[hctx]
-	queryID := resp.QueryID()
-	if !ok || lpc.queryID != queryID {
-		// server already cancelled longpoll call
-		// or hctx was cancelled and reused by server before client response arrived
-		// since we have no client cancellation, we rely on fact that client queryId does not repeat often
-		return
-	}
-	delete(ls.clientList, hctx)
-	var key data_model.Key
-	var meta *format.MetricMetaValue
-	if err != nil {
-		key, meta = keyFromHctx(hctx, format.TagValueIDRPCRequestsStatusErrUpstream)
-	} else {
-		key, meta = keyFromHctx(hctx, format.TagValueIDRPCRequestsStatusOK)
-	}
-	ls.proxy.sh2.AddValueCounter(key, float64(lpc.requestLen), 1, meta)
-	if resp != nil {
-		hctx.Response = append(hctx.Response, resp.Body...)
-	}
-	hctx.SendHijackedResponse(err)
-}
-
-func (ls *longpollShard) CancelHijack(hctx *rpc.HandlerContext) {
-	ls.mu.Lock()
-	defer ls.mu.Unlock()
-	if lpc, ok := ls.clientList[hctx]; ok {
-		key, meta := keyFromHctx(hctx, format.TagValueIDRPCRequestsStatusErrCancel)
-		ls.proxy.sh2.AddValueCounter(key, float64(lpc.requestLen), 1, meta)
-	}
-	delete(ls.clientList, hctx)
-}
-
-func (proxy *IngressProxy) syncHandler(ctx context.Context, hctx *rpc.HandlerContext) error {
-	requestLen := len(hctx.Request)
-	resultTag, err := proxy.syncHandlerImpl(ctx, hctx)
-	if resultTag != 0 {
-		key, meta := keyFromHctx(hctx, resultTag)
-		proxy.sh2.AddValueCounter(key, float64(requestLen), 1, meta)
-	}
-	return err
-}
-
-func (proxy *IngressProxy) syncHandlerImpl(ctx context.Context, hctx *rpc.HandlerContext) (resultTag int32, err error) {
-	requestLen := len(hctx.Request)
-	switch hctx.RequestTag() {
-	case constants.StatshouseGetTagMapping2:
-		hctx.RequestFunctionName = "statshouse.getTagMapping2"
-	case constants.StatshouseSendKeepAlive2:
-		hctx.RequestFunctionName = "statshouse.sendKeepAlive2"
-	case constants.StatshouseSendSourceBucket2:
-		hctx.RequestFunctionName = "statshouse.sendSourceBucket2"
-	case constants.StatshouseTestConnection2:
-		hctx.RequestFunctionName = "statshouse.testConnection2"
-	case constants.StatshouseGetTargets2:
-		hctx.RequestFunctionName = "statshouse.getTargets2"
-	case constants.StatshouseGetTagMappingBootstrap:
-		hctx.RequestFunctionName = "statshouse.getTagMappingBootstrap"
-	case constants.StatshouseGetMetrics3:
-		hctx.RequestFunctionName = "statshouse.getMetrics3"
-	case constants.StatshouseAutoCreate:
-		hctx.RequestFunctionName = "statshouse.autoCreate"
-	case constants.StatshouseGetConfig2:
-		hctx.RequestFunctionName = "statshouse.getConfig2"
-		return 0, rpc.ErrNoHandler // call SyncHandler in worker
-	default:
-		// we want fast reject of unknown requests in sync handler
-		return format.TagValueIDRPCRequestsStatusNoHandler, fmt.Errorf("ingress proxy does not support tag 0x%x", hctx.RequestTag())
-	}
-	req, client, address, err := proxy.fillProxyRequest(hctx)
-	if err != nil {
-		return format.TagValueIDRPCRequestsStatusErrLocal, err
-	}
-	queryID := req.QueryID()
-	lockShardID := int(proxy.nextShardLock.Inc() % longPollShardsCount)
-	ls := proxy.longpollShards[lockShardID]
-	ls.mu.Lock() // to avoid race with longpoll cancellation, all code below must run under lock
-	defer ls.mu.Unlock()
-	if _, err := client.DoCallback(ctx, proxy.config.Network, address, req, ls.callback, hctx); err != nil {
-		return format.TagValueIDRPCRequestsStatusErrLocal, err
-	}
-	ls.clientList[hctx] = longpollClient{queryID: queryID, requestLen: requestLen}
-	return 0, hctx.HijackResponse(ls)
-}
-
-func (proxy *IngressProxy) handler(ctx context.Context, hctx *rpc.HandlerContext) error {
-	requestLen := len(hctx.Request)
-	resultTag, err := proxy.handlerImpl(ctx, hctx)
-	key, meta := keyFromHctx(hctx, resultTag)
-	proxy.sh2.AddValueCounter(key, float64(requestLen), 1, meta)
-	return err
-}
-
-func (proxy *IngressProxy) handlerImpl(ctx context.Context, hctx *rpc.HandlerContext) (resultTag int32, err error) {
-	switch hctx.RequestTag() {
-	case constants.StatshouseGetConfig2:
-		// Record metrics on aggregator with correct host, IP, etc.
-		// We do not care if it succeeded or not, we make our own response anyway
-		_, _ = proxy.syncProxyRequest(ctx, hctx)
-
-		var args tlstatshouse.GetConfig2
-		var ret tlstatshouse.GetConfigResult
-		_, err = args.ReadBoxed(hctx.Request)
-		if err != nil {
-			return format.TagValueIDRPCRequestsStatusErrLocal, fmt.Errorf("failed to deserialize statshouse.getConfig2 request: %w", err)
-		}
-		if args.Cluster != proxy.config.Cluster {
-			return format.TagValueIDRPCRequestsStatusErrLocal, fmt.Errorf("statshouse misconfiguration! cluster requested %q does not match actual cluster connected %q", args.Cluster, proxy.config.Cluster)
-		}
-		ret.Addresses = proxy.config.ExternalAddresses
-		ret.MaxAddressesCount = proxy.sh2.GetConfigResult.MaxAddressesCount
-		ret.PreviousAddresses = proxy.sh2.GetConfigResult.PreviousAddresses
-		hctx.Response, _ = args.WriteResult(hctx.Response[:0], ret)
-		return format.TagValueIDRPCRequestsStatusOK, nil
-	default:
-		return format.TagValueIDRPCRequestsStatusNoHandler, fmt.Errorf("ingress proxy does not support tag 0x%x", hctx.RequestTag())
-	}
-}
-
-func (proxy *IngressProxy) fillProxyRequest(hctx *rpc.HandlerContext) (request *rpc.Request, client *rpc.Client, address string, err error) {
-	if len(hctx.Request) < 32 {
-		return nil, nil, "", fmt.Errorf("ingress proxy query with tag 0x%x is too short - %d bytes", hctx.RequestTag(), len(hctx.Request))
-	}
-	addrIPV4, _ := addrIPString(hctx.RemoteAddr())
-
-	fieldsMask := binary.LittleEndian.Uint32(hctx.Request[4:])
-	shardReplica := binary.LittleEndian.Uint32(hctx.Request[8:])
-	fieldsMask |= (1 << 31) // args.SetIngressProxy(true)
-	binary.LittleEndian.PutUint32(hctx.Request[4:], fieldsMask)
-	binary.LittleEndian.PutUint32(hctx.Request[28:], addrIPV4) // source_ip[3] in header. TODO - ipv6
-	// We override this field if set by previous proxy. Because we do not care about agent IPs in their cuber/internal networks
-	hostName, err := parseHostname(hctx.Request)
-	if err != nil {
-		return nil, nil, "", err
-	}
-	// Motivation of % len - we pass through badly configured requests for now, so aggregators will record them in builtin metric
-	shardReplicaIx := shardReplica % uint32(len(proxy.sh2.GetConfigResult.Addresses))
-	address = proxy.sh2.GetConfigResult.Addresses[shardReplicaIx]
-
-	client = proxy.pool.getClient(hostName, hctx.RemoteAddr().String())
-	req := client.GetRequest()
-	req.Body = append(req.Body, hctx.Request...)
-	req.FailIfNoConnection = true
-	return req, client, address, nil
-}
-
-func (proxy *IngressProxy) syncProxyRequest(ctx context.Context, hctx *rpc.HandlerContext) (resultTag int32, err error) {
-	req, client, address, err := proxy.fillProxyRequest(hctx)
-	if err != nil {
-		return format.TagValueIDRPCRequestsStatusErrLocal, err
-	}
-
-	resp, err := client.Do(ctx, proxy.config.Network, address, req)
-	defer client.PutResponse(resp)
-	if err != nil {
-		return format.TagValueIDRPCRequestsStatusErrUpstream, err
-	}
-
-	hctx.Response = append(hctx.Response, resp.Body...)
-
-	return format.TagValueIDRPCRequestsStatusOK, nil
-}
-
-func parseHostname(req []byte) (clientHost string, _ error) {
-	_, err := basictl.StringRead(req[32:], &clientHost)
-	return clientHost, err
-}
-
-func (pool *clientPool) getClient(clientHost, remoteAddress string) *rpc.Client {
-	var client *rpc.Client
-	pool.mu.RLock()
-	client = pool.clients[clientHost]
-	pool.mu.RUnlock()
-	if client != nil {
-		return client
-	}
-	pool.mu.Lock()
-	defer pool.mu.Unlock()
-	client = pool.clients[clientHost]
-	if client != nil {
-		return client
-	}
-	log.Printf("First connection from agent host: %s, host IP: %s", clientHost, remoteAddress)
-	client = rpc.NewClient(
-		rpc.ClientWithProtocolVersion(rpc.LatestProtocolVersion),
-		rpc.ClientWithLogf(log.Printf),
-		rpc.ClientWithCryptoKey(pool.aesPwd),
-		rpc.ClientWithTrustedSubnetGroups(build.TrustedSubnetGroups()))
-	pool.clients[clientHost] = client
-	return client
-}
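The deletion above removes the v1 proxy's long-poll machinery, whose central trick was lock sharding: in-flight calls were spread over 256 longpollShard buckets chosen round-robin by an atomic counter, so registering and cancelling calls rarely contended on the same mutex. A minimal sketch of that idea under stated assumptions — a generic registry, stdlib sync/atomic instead of go.uber.org/atomic, illustrative names throughout:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

const shardCount = 256 // power of two keeps the modulo cheap

// shardedRegistry spreads one hot mutex over shardCount smaller ones,
// so concurrent registrations rarely contend with each other.
type shardedRegistry struct {
	next   atomic.Uint64
	shards [shardCount]struct {
		mu      sync.Mutex
		entries map[int64]string
	}
}

func newShardedRegistry() *shardedRegistry {
	r := &shardedRegistry{}
	for i := range r.shards {
		r.shards[i].entries = map[int64]string{}
	}
	return r
}

// add picks a shard round-robin, like IngressProxy.nextShardLock did,
// and returns the shard index the caller needs for later lookups.
func (r *shardedRegistry) add(id int64, v string) int {
	i := int(r.next.Add(1) % shardCount)
	r.shards[i].mu.Lock()
	defer r.shards[i].mu.Unlock()
	r.shards[i].entries[id] = v
	return i
}

func main() {
	r := newShardedRegistry()
	fmt.Println("stored in shard", r.add(42, "longpoll call"))
}
```

A later lookup must lock the same shard that add picked, which is why the v1 code handed the chosen longpollShard to both HijackResponse and the client callback.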
diff --git a/internal/aggregator/ingress_proxy2.go b/internal/aggregator/ingress_proxy2.go
index c0bbb2958..2cf8e63e7 100644
--- a/internal/aggregator/ingress_proxy2.go
+++ b/internal/aggregator/ingress_proxy2.go
@@ -67,8 +67,8 @@ type ingressProxy2 struct {
 
 type proxyServer struct {
 	*ingressProxy2
-	config   tlstatshouse.GetConfigResult
-	listener net.Listener
+	config    tlstatshouse.GetConfigResult
+	listeners []net.Listener
 }
 
 type proxyConn struct {
@@ -137,7 +137,7 @@ func RunIngressProxy2(ctx context.Context, agent *agent.Agent, config ConfigIngr
 		tcp6.shutdown()
 	}
 	if len(config.ExternalAddresses) != 0 && config.ExternalAddresses[0] != "" {
-		err := tcp4.listen("tcp4", config.ListenAddr, config.ExternalAddresses)
+		err := tcp4.listen("tcp4", config.ListenAddr, config.ExternalAddresses, config.Version)
 		if err != nil {
 			shutdown()
 			return err
@@ -146,16 +146,16 @@ func RunIngressProxy2(ctx context.Context, agent *agent.Agent, config ConfigIngr
 	// listen on IPv6
 	defer tcp6.shutdown()
 	if len(config.ExternalAddressesIPv6) != 0 && config.ExternalAddressesIPv6[0] != "" {
-		err := tcp6.listen("tcp6", config.ListenAddrIPV6, config.ExternalAddressesIPv6)
+		err := tcp6.listen("tcp6", config.ListenAddrIPV6, config.ExternalAddressesIPv6, config.Version)
 		if err != nil {
 			shutdown()
 			return err
 		}
 	}
-	// run
-	if tcp4.listener == nil && tcp6.listener == nil {
+	if len(tcp4.listeners) == 0 && len(tcp6.listeners) == 0 {
 		return fmt.Errorf("at least one ingress-external-addr must be provided")
 	}
+	// run
 	log.Printf("Running ingress proxy v2, PID %d\n", os.Getpid())
 	tcp4.run()
 	tcp6.run()
@@ -197,7 +197,7 @@ func (p *ingressProxy2) newProxyServer() proxyServer {
 	}
 }
 
-func (p *proxyServer) listen(network, addr string, externalAddr []string) error {
+func (p *proxyServer) listen(network, addr string, externalAddr []string, version string) error {
 	if len(p.agent.GetConfigResult.Addresses)%len(externalAddr) != 0 {
 		return fmt.Errorf("number of servers must be multiple of number of ingress-external-addr")
 	}
@@ -206,35 +206,60 @@ func (p *proxyServer) listen(network, addr string, externalAddr []string) error
 	if err != nil {
 		return err
 	}
-	// open ports
-	log.Printf("Listen addr %v\n", listenAddr)
-	p.listener, err = rpc.Listen(network, listenAddr.String(), false)
-	if err != nil {
-		return err
-	}
-	// build external address
-	n := len(p.agent.GetConfigResult.Addresses)
-	s := make([]string, 0, n)
-	for len(s) < n {
-		for i := 0; i < len(externalAddr) && len(s) < n; i++ {
-			s = append(s, externalAddr[i])
+	// build external address and listen
+	if version == "2" {
+		externalTCPAddr := make([]*net.TCPAddr, len(externalAddr))
+		for i := range externalAddr {
+			externalTCPAddr[i], err = net.ResolveTCPAddr(network, externalAddr[i])
+			if err != nil {
+				return err
+			}
 		}
+		p.listeners = make([]net.Listener, len(p.agent.GetConfigResult.Addresses)/len(externalAddr))
+		for i := range p.listeners {
+			log.Printf("Listen addr %v\n", listenAddr)
+			p.listeners[i], err = rpc.Listen(network, listenAddr.String(), false)
+			if err != nil {
+				return err
+			}
+			listenAddr.Port++
+			for j := range externalTCPAddr {
+				p.config.Addresses = append(p.config.Addresses, externalTCPAddr[j].String())
+				externalTCPAddr[j].Port++
+			}
+		}
+	} else {
+		log.Printf("Listen addr %v\n", listenAddr)
+		p.listeners = make([]net.Listener, 1)
+		p.listeners[0], err = rpc.Listen(network, listenAddr.String(), false)
+		if err != nil {
+			return err
+		}
+		n := len(p.agent.GetConfigResult.Addresses)
+		s := make([]string, 0, n)
+		for len(s) < n {
+			for i := 0; i < len(externalAddr) && len(s) < n; i++ {
+				s = append(s, externalAddr[i])
+			}
+		}
+		p.config.Addresses = s
 	}
-	p.config.Addresses = s
 	log.Printf("External %s addr %s\n", network, strings.Join(p.config.Addresses, ", "))
 	return nil
 }
 
 func (p *proxyServer) shutdown() {
-	if p.listener != nil {
-		_ = p.listener.Close()
+	for i := range p.listeners {
+		if p.listeners[i] != nil {
+			_ = p.listeners[i].Close()
+		}
 	}
 }
 
 func (p *proxyServer) run() {
-	if p.listener != nil {
-		p.group.Add(1)
-		go p.serve(p.listener)
+	p.group.Add(len(p.listeners))
+	for i := range p.listeners {
+		go p.serve(p.listeners[i])
 	}
 }
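The behavioral core of the ingress_proxy2.go change is in listen: when config.Version is "2", the proxy opens len(Addresses)/len(externalAddr) listeners on consecutive local ports and advertises one distinct external address per shard replica, ports advancing in lockstep, while the old single-listener layout (the external address list simply repeated) is preserved in the else branch. A minimal sketch of just the port fan-out arithmetic — counts and addresses are made up, and a log line stands in for rpc.Listen:

```go
package main

import (
	"fmt"
	"net"
)

func main() {
	aggregators := make([]string, 6) // e.g. GetConfig returned 6 shard-replica addresses
	external := []string{"203.0.113.1:8128", "203.0.113.2:8128", "203.0.113.3:8128"}

	extTCP := make([]*net.TCPAddr, len(external))
	for i, a := range external {
		addr, err := net.ResolveTCPAddr("tcp4", a)
		if err != nil {
			panic(err)
		}
		extTCP[i] = addr
	}

	listenAddr, err := net.ResolveTCPAddr("tcp4", "0.0.0.0:8128")
	if err != nil {
		panic(err)
	}
	var advertised []string
	for i := 0; i < len(aggregators)/len(external); i++ {
		fmt.Printf("listener %d binds %v\n", i, listenAddr) // rpc.Listen(...) in the real code
		listenAddr.Port++                                   // next listener takes the next port
		for _, e := range extTCP {
			advertised = append(advertised, e.String()) // one external address per shard replica
			e.Port++                                    // advertised ports advance in lockstep
		}
	}
	fmt.Println(advertised)
}
```

With 6 aggregator addresses and 3 external ones, this binds :8128 and :8129 locally and advertises the three hosts twice, first on :8128 and then on :8129, so every shard replica gets its own proxy endpoint.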
diff --git a/internal/api/handler.go b/internal/api/handler.go
index 1ad4887e5..a40d3e038 100644
--- a/internal/api/handler.go
+++ b/internal/api/handler.go
@@ -644,14 +644,6 @@ func NewHandler(staticDir fs.FS, jsSettings JSSettings, showInvisible bool, chV1
 		client.Value(format.BuiltinMetricNameApiHeapSys, statshouse.Tags{1: srvfunc.HostnameForStatshouse()}, float64(memStats.HeapSys))
 		client.Value(format.BuiltinMetricNameApiHeapIdle, statshouse.Tags{1: srvfunc.HostnameForStatshouse()}, float64(memStats.HeapIdle))
 		client.Value(format.BuiltinMetricNameApiHeapInuse, statshouse.Tags{1: srvfunc.HostnameForStatshouse()}, float64(memStats.HeapInuse))
-		//-- TODO: remove when deployed
-		client.Value("api_vm_size", statshouse.Tags{1: srvfunc.HostnameForStatshouse()}, vmSize)
-		client.Value("api_vm_rss", statshouse.Tags{1: srvfunc.HostnameForStatshouse()}, vmRSS)
-		client.Value("api_heap_alloc", statshouse.Tags{1: srvfunc.HostnameForStatshouse()}, float64(memStats.HeapAlloc))
-		client.Value("api_heap_sys", statshouse.Tags{1: srvfunc.HostnameForStatshouse()}, float64(memStats.HeapSys))
-		client.Value("api_heap_idle", statshouse.Tags{1: srvfunc.HostnameForStatshouse()}, float64(memStats.HeapIdle))
-		client.Value("api_heap_inuse", statshouse.Tags{1: srvfunc.HostnameForStatshouse()}, float64(memStats.HeapInuse))
-		//--
 
 		writeActiveQuieries := func(ch *util.ClickHouse, versionTag string) {
 			if ch != nil {