diff --git a/internal/util/chutil.go b/internal/util/chutil.go index 96508c3b4..dc84b152b 100644 --- a/internal/util/chutil.go +++ b/internal/util/chutil.go @@ -163,6 +163,7 @@ func (ch *ClickHouse) Select(ctx context.Context, meta QueryMetaInto, query ch.Q kind := QueryKind(meta.IsFast, meta.IsLight) pool := ch.pools[kind] servers := append(make([]*chpool.Pool, 0, len(pool.servers)), pool.servers...) + // TODO add metric for safetyCounter := 0; safetyCounter < len(pool.servers); safetyCounter++ { var i int i, err = pickRandomServer(servers, pool.rnd) @@ -177,39 +178,49 @@ func (ch *ClickHouse) Select(ctx context.Context, meta QueryMetaInto, query ch.Q statshouse.Metric("statshouse_unique_wait_test", statshouse.Tags{1: strconv.FormatInt(int64(kind), 10), 2: "uniq"}).Value(float64(uniqWait)) statshouse.Metric("statshouse_unique_wait_test", statshouse.Tags{1: strconv.FormatInt(int64(kind), 10), 2: "all"}).Value(float64(allWait)) pool.waitMx.Unlock() - err = pool.sem.Acquire(ctx, meta.User) - waitLockDuration := time.Since(startTime) - pool.waitMx.Lock() - pool.userWait[meta.User]-- - if c := pool.userWait[meta.User]; c == 0 { - delete(pool.userWait, meta.User) - } - pool.waitMx.Unlock() - statshouse.Metric("statshouse_wait_lock", statshouse.Tags{1: strconv.FormatInt(int64(kind), 10), 2: meta.User}).Value(waitLockDuration.Seconds()) - if err != nil { - return info, err - } - pool.mx.Lock() - pool.userActive[meta.User]++ - uniq := len(pool.userActive) - all := pool.countOfReqLocked(pool.userActive) - pool.mx.Unlock() - statshouse.Metric("statshouse_unique_test", statshouse.Tags{1: strconv.FormatInt(int64(kind), 10), 2: "uniq"}).Value(float64(uniq)) - statshouse.Metric("statshouse_unique_test", statshouse.Tags{1: strconv.FormatInt(int64(kind), 10), 2: "all"}).Value(float64(all)) - - start := time.Now() - err = servers[i].Do(ctx, query) - info.Duration = time.Since(start) - pool.mx.Lock() - pool.userActive[meta.User]-- - if c := pool.userActive[meta.User]; c == 0 { - delete(pool.userActive, meta.User) - } - pool.mx.Unlock() - pool.sem.Release() + mustRetry := false + err = func() error { + err := pool.sem.Acquire(ctx, meta.User) + waitLockDuration := time.Since(startTime) + pool.waitMx.Lock() + pool.userWait[meta.User]-- + if c := pool.userWait[meta.User]; c == 0 { + delete(pool.userWait, meta.User) + } + pool.waitMx.Unlock() + statshouse.Metric("statshouse_wait_lock", statshouse.Tags{1: strconv.FormatInt(int64(kind), 10), 2: meta.User}).Value(waitLockDuration.Seconds()) + if err != nil { + return err + } + defer pool.sem.Release() + pool.mx.Lock() + pool.userActive[meta.User]++ + uniq := len(pool.userActive) + all := pool.countOfReqLocked(pool.userActive) + pool.mx.Unlock() + statshouse.Metric("statshouse_unique_test", statshouse.Tags{1: strconv.FormatInt(int64(kind), 10), 2: "uniq"}).Value(float64(uniq)) + statshouse.Metric("statshouse_unique_test", statshouse.Tags{1: strconv.FormatInt(int64(kind), 10), 2: "all"}).Value(float64(all)) + + start := time.Now() + err = servers[i].Do(ctx, query) + info.Duration = time.Since(start) + pool.mx.Lock() + pool.userActive[meta.User]-- + if c := pool.userActive[meta.User]; c == 0 { + delete(pool.userActive, meta.User) + } + pool.mx.Unlock() + if err != nil { + mustRetry = true + } + return err + }() if err == nil { return // succeeded } + if err != nil && !mustRetry { + return info, err + } if ctx.Err() != nil { return // failed }