diff --git a/agent/agents/mysql/perfschema/perfschema.go b/agent/agents/mysql/perfschema/perfschema.go index 2f914fe692..ae4af57dd3 100644 --- a/agent/agents/mysql/perfschema/perfschema.go +++ b/agent/agents/mysql/perfschema/perfschema.go @@ -21,7 +21,6 @@ import ( "fmt" "io" "math" - "sync" "time" "github.com/AlekSi/pointer" // register SQL driver @@ -46,18 +45,6 @@ type ( summaryMap map[string]*eventsStatementsSummaryByDigest ) -// mySQLVersion contains. -type mySQLVersion struct { - version float64 - vendor string -} - -// versionsCache provides cached access to MySQL version. -type versionsCache struct { - rw sync.RWMutex - items map[string]*mySQLVersion -} - const ( retainHistory = 5 * time.Minute refreshHistory = 5 * time.Second diff --git a/agent/client/cache/cache.go b/agent/client/cache/cache.go deleted file mode 100644 index abd5805665..0000000000 --- a/agent/client/cache/cache.go +++ /dev/null @@ -1,108 +0,0 @@ -// Copyright 2023 Percona LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package cache incapsulates agent message storing logic. -package cache - -import ( - "path" - - "github.com/pkg/errors" - "github.com/sirupsen/logrus" - - "github.com/percona/pmm/agent/config" - "github.com/percona/pmm/agent/models" - "github.com/percona/pmm/agent/utils/buffer-ring/bigqueue" - "github.com/percona/pmm/api/agentpb" -) - -// Cache represent cache implementation based on bigqueue. -type Cache struct { - l *logrus.Entry - // prioritized represent cache for high priority agent messages e.g. job, action results - prioritized *bigqueue.Ring - // unprioritized represent cache for low priority agent messages e.g. qan metrics - unprioritized *bigqueue.Ring -} - -// New recreates cache. -func New(cfg config.Cache) (*Cache, error) { - if cfg.Disable { - return nil, errors.New("disable in cache config is set to true") - } - if cfg.Dir == "" { - return nil, errors.New("cache directory is not set up") - } - l := logrus.WithField("component", "cache") - prioritized, err := bigqueue.New(path.Join(cfg.Dir, "prioritized"), cfg.PrioritizedSize, l.WithField("type", "prioritized")) - if err != nil { - return nil, err - } - unprioritized, err := bigqueue.New(path.Join(cfg.Dir, "unprioritized"), cfg.UnprioritizedSize, l.WithField("type", "unprioritized")) - if err != nil { - return nil, err - } - return &Cache{ - l: l, - prioritized: prioritized, - unprioritized: unprioritized, - }, nil -} - -// Send stores agent response to cache on nil channel. -func (c *Cache) Send(resp *models.AgentResponse) error { - var cache *bigqueue.Ring - switch resp.Payload.(type) { - case *agentpb.StartActionResponse, - *agentpb.StopActionResponse, - *agentpb.PBMSwitchPITRResponse, - *agentpb.StartJobResponse, - *agentpb.JobStatusResponse, - *agentpb.GetVersionsResponse, - *agentpb.JobProgress, - *agentpb.StopJobResponse, - *agentpb.CheckConnectionResponse, - *agentpb.JobResult, - *agentpb.ServiceInfoResponse: - cache = c.prioritized - default: - cache = c.unprioritized - } - return cache.Send(resp) -} - -// SendAndWaitResponse stores AgentMessages with AgentMessageRequestPayload on nil channel. -func (c *Cache) SendAndWaitResponse(payload agentpb.AgentRequestPayload) (agentpb.ServerResponsePayload, error) { //nolint:ireturn - switch payload.(type) { - case *agentpb.ActionResultRequest: - return c.prioritized.SendAndWaitResponse(payload) - case *agentpb.QANCollectRequest, - *agentpb.StateChangedRequest: - return c.unprioritized.SendAndWaitResponse(payload) - default: - } - return &agentpb.StateChangedResponse{}, nil -} - -// Close closes cache databases. -func (c *Cache) Close() { - c.prioritized.Close() - c.unprioritized.Close() -} - -// SetSender sets sender and sends stored agent messages with sender. -func (c *Cache) SetSender(s models.Sender) { - c.prioritized.SetSender(s) - c.unprioritized.SetSender(s) -} diff --git a/agent/client/cache/dummy.go b/agent/client/cache/dummy.go deleted file mode 100644 index 7381b8e960..0000000000 --- a/agent/client/cache/dummy.go +++ /dev/null @@ -1,64 +0,0 @@ -// Copyright 2023 Percona LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package cache - -import ( - "sync/atomic" - - "github.com/pkg/errors" - - "github.com/percona/pmm/agent/models" - agenterrors "github.com/percona/pmm/agent/utils/errors" - "github.com/percona/pmm/api/agentpb" -) - -// Dummy represent dummy cache. -type Dummy struct { - s atomic.Pointer[models.Sender] -} - -// Close to satisfy interface. -func (*Dummy) Close() {} - -// Send drops agent responses on nil channel. -func (c *Dummy) Send(resp *models.AgentResponse) error { - s := c.s.Load() - if s == nil { - return nil - } - err := (*s).Send(resp) - if err != nil && errors.As(err, &agenterrors.ErrChanConn) { - c.s.CompareAndSwap(s, nil) - } - return err -} - -// SendAndWaitResponse drops AgentMessages on nil channel. -func (c *Dummy) SendAndWaitResponse(payload agentpb.AgentRequestPayload) (agentpb.ServerResponsePayload, error) { //nolint:ireturn - s := c.s.Load() - if s == nil { - return &agentpb.StateChangedResponse{}, nil - } - resp, err := (*s).SendAndWaitResponse(payload) - if err != nil && errors.As(err, &agenterrors.ErrChanConn) { - c.s.CompareAndSwap(s, nil) - } - return resp, err -} - -// SetSender sets sender. -func (c *Dummy) SetSender(s models.Sender) { - c.s.Store(&s) -} diff --git a/agent/client/channel/channel.go b/agent/client/channel/channel.go index c022681540..3d76c5a697 100644 --- a/agent/client/channel/channel.go +++ b/agent/client/channel/channel.go @@ -28,8 +28,6 @@ import ( "google.golang.org/protobuf/encoding/prototext" "google.golang.org/protobuf/proto" - "github.com/percona/pmm/agent/models" - agenterrors "github.com/percona/pmm/agent/utils/errors" "github.com/percona/pmm/api/agentpb" ) @@ -48,6 +46,15 @@ type ServerRequest struct { Payload agentpb.ServerRequestPayload } +// AgentResponse represents agent's response. +// It is similar to agentpb.AgentMessage except it can contain only responses, +// and the payload is already unwrapped (XXX instead of AgentMessage_XXX). +type AgentResponse struct { + ID uint32 + Status *grpcstatus.Status + Payload agentpb.AgentResponsePayload +} + // Response is a type used to pass response from pmm-server to the subscriber. type Response struct { Payload agentpb.ServerResponsePayload @@ -111,7 +118,7 @@ func New(stream agentpb.Agent_ConnectClient) *Channel { func (c *Channel) close(err error) { c.closeOnce.Do(func() { c.l.Debugf("Closing with error: %+v", err) - c.closeErr = agenterrors.NewChannelClosedError(err) + c.closeErr = err c.m.Lock() for _, ch := range c.responses { // unblock all subscribers @@ -141,7 +148,7 @@ func (c *Channel) Requests() <-chan *ServerRequest { } // Send sends message to pmm-managed. It is no-op once channel is closed (see Wait). -func (c *Channel) Send(resp *models.AgentResponse) error { +func (c *Channel) Send(resp *AgentResponse) { msg := &agentpb.AgentMessage{ Id: resp.ID, } @@ -151,7 +158,7 @@ func (c *Channel) Send(resp *models.AgentResponse) error { if resp.Status != nil { msg.Status = resp.Status.Proto() } - return c.send(msg) + c.send(msg) } // SendAndWaitResponse sends request to pmm-managed, blocks until response is available. @@ -162,24 +169,21 @@ func (c *Channel) SendAndWaitResponse(payload agentpb.AgentRequestPayload) (agen id := atomic.AddUint32(&c.lastSentRequestID, 1) ch := c.subscribe(id) - err := c.send(&agentpb.AgentMessage{ + c.send(&agentpb.AgentMessage{ Id: id, Payload: payload.AgentMessageRequestPayload(), }) - if err != nil { - return nil, err - } resp := <-ch return resp.Payload, resp.Error } -func (c *Channel) send(msg *agentpb.AgentMessage) error { +func (c *Channel) send(msg *agentpb.AgentMessage) { c.sendM.Lock() select { case <-c.closeWait: c.sendM.Unlock() - return c.Wait() + return default: } @@ -197,12 +201,10 @@ func (c *Channel) send(msg *agentpb.AgentMessage) error { err := c.s.Send(msg) c.sendM.Unlock() if err != nil { - err = errors.Wrap(err, "failed to send message") - c.close(err) - return agenterrors.NewChannelClosedError(err) + c.close(errors.Wrap(err, "failed to send message")) + return } c.mSend.Inc() - return nil } // runReader receives messages from server. @@ -312,13 +314,10 @@ func (c *Channel) runReceiver() { c.l.Warnf("pmm-managed was not able to process message with id: %d, handling of that payload type is unimplemented", msg.Id) continue } - err := c.Send(&models.AgentResponse{ + c.Send(&AgentResponse{ ID: msg.Id, Status: grpcstatus.New(codes.Unimplemented, "can't handle message type sent, it is not implemented"), }) - if err != nil { - c.l.Error(err) - } } } } diff --git a/agent/client/channel/channel_test.go b/agent/client/channel/channel_test.go index 5be2fee495..5a785fa508 100644 --- a/agent/client/channel/channel_test.go +++ b/agent/client/channel/channel_test.go @@ -34,7 +34,6 @@ import ( "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/timestamppb" - "github.com/percona/pmm/agent/models" "github.com/percona/pmm/agent/utils/truncate" "github.com/percona/pmm/api/agentpb" ) @@ -150,6 +149,7 @@ func TestAgentRequestWithTruncatedInvalidUTF8(t *testing.T) { Mysql: &agentpb.MetricsBucket_MySQL{}, }} resp, err = channel.SendAndWaitResponse(&request) + require.NoError(t, err) assert.Nil(t, resp) } @@ -248,13 +248,12 @@ func TestServerRequest(t *testing.T) { for req := range channel.Requests() { assert.IsType(t, &agentpb.Ping{}, req.Payload) - err := channel.Send(&models.AgentResponse{ + channel.Send(&AgentResponse{ ID: req.ID, Payload: &agentpb.Pong{ CurrentTime: timestamppb.Now(), }, }) - assert.NoError(t, err) } } @@ -417,11 +416,10 @@ func TestUnexpectedResponsePayloadFromServer(t *testing.T) { channel, _, teardown := setup(t, connect, io.EOF) defer teardown() req := <-channel.Requests() - err := channel.Send(&models.AgentResponse{ + channel.Send(&AgentResponse{ ID: req.ID, Payload: &agentpb.Pong{ CurrentTime: timestamppb.Now(), }, }) - assert.NoError(t, err) } diff --git a/agent/client/client.go b/agent/client/client.go index f6ba5547a7..b79eb8e261 100644 --- a/agent/client/client.go +++ b/agent/client/client.go @@ -35,11 +35,9 @@ import ( grpcstatus "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/timestamppb" - "github.com/percona/pmm/agent/client/cache" "github.com/percona/pmm/agent/client/channel" "github.com/percona/pmm/agent/config" "github.com/percona/pmm/agent/connectionuptime" - "github.com/percona/pmm/agent/models" "github.com/percona/pmm/agent/runner" "github.com/percona/pmm/agent/runner/actions" // TODO https://jira.percona.com/browse/PMM-7206 "github.com/percona/pmm/agent/runner/jobs" @@ -74,6 +72,7 @@ type Client struct { l *logrus.Entry backoff *backoff.Backoff + done chan struct{} // for unit tests only dialTimeout time.Duration @@ -86,16 +85,13 @@ type Client struct { cus *connectionuptime.Service logStore *tailog.Store - - wg sync.WaitGroup - cache models.Cache } // New creates new client. // // Caller should call Run. func New(cfg configGetter, supervisor supervisor, r *runner.Runner, connectionChecker connectionChecker, sv softwareVersioner, sib serviceInfoBroker, cus *connectionuptime.Service, logStore *tailog.Store) *Client { //nolint:lll - out := &Client{ + return &Client{ cfg: cfg, supervisor: supervisor, connectionChecker: connectionChecker, @@ -108,20 +104,22 @@ func New(cfg configGetter, supervisor supervisor, r *runner.Runner, connectionCh cus: cus, logStore: logStore, } - - var err error - if out.cache, err = cache.New(cfg.Get().Cache); err != nil { - out.l.Infof("Failed to init cache: %s. Initializing cachelless client.", err) - out.cache = &cache.Dummy{} - } - return out } -// Connect connects to the server, processes requests and sends responses. +// Run connects to the server, processes requests and sends responses. +// +// Once Run exits, connection is closed, and caller should cancel supervisor's context. +// Then caller should wait until Done() channel is closed. +// That Client instance can't be reused after that. // // Returned error is already logged and should be ignored. It is returned only for unit tests. -func (c *Client) Connect(ctx context.Context) error { +func (c *Client) Run(ctx context.Context) error { c.l.Info("Starting...") + + c.rw.Lock() + c.done = make(chan struct{}) + c.rw.Unlock() + cfg := c.cfg.Get() // do nothing until ctx is canceled if config misses critical info @@ -135,6 +133,7 @@ func (c *Client) Connect(ctx context.Context) error { if missing != "" { c.l.Errorf("%s is not provided, halting.", missing) <-ctx.Done() + close(c.done) return errors.Wrap(ctx.Err(), "missing "+missing) } @@ -159,11 +158,13 @@ func (c *Client) Connect(ctx context.Context) error { } } if ctx.Err() != nil { + close(c.done) if dialErr != nil { return dialErr } return ctx.Err() } + defer func() { if err := dialResult.conn.Close(); err != nil { c.l.Errorf("Connection closed: %s.", err) @@ -171,16 +172,65 @@ func (c *Client) Connect(ctx context.Context) error { } c.l.Info("Connection closed.") }() - c.supervisor.ClearChangesChannel() - c.SendActualStatuses() - c.cache.SetSender(dialResult.channel) c.rw.Lock() c.md = dialResult.md c.channel = dialResult.channel c.rw.Unlock() - c.processChannelRequests(ctx) + // Once the client is connected, ctx cancellation is ignored by it. + // + // We start goroutines, and terminate the gRPC connection and exit Run when any of them exits: + // + // 1. processActionResults reads action results from action runner and sends them to the channel. + // It exits when the action runner is stopped by cancelling ctx. + // + // 2. processSupervisorRequests reads requests (status changes and QAN data) from the supervisor and sends them to the channel. + // It exits when the supervisor is stopped by the caller. + // Caller stops supervisor when Run is left and gRPC connection is closed. + // + // 3. processChannelRequests reads requests from the channel and processes them. + // It exits when an unexpected message is received from the channel, or when can't be received at all. + // When Run is left, caller stops supervisor, and that allows processSupervisorRequests to exit. + // + // Done() channel is closed when all three goroutines exited. + + // TODO Make 2 and 3 behave more like 1 - that seems to be simpler. + // https://jira.percona.com/browse/PMM-4245 + + c.supervisor.ClearChangesChannel() + c.SendActualStatuses() + + oneDone := make(chan struct{}, 4) + go func() { + c.processActionResults(ctx) + c.l.Debug("processActionResults is finished") + oneDone <- struct{}{} + }() + go func() { + c.processJobsResults(ctx) + c.l.Debug("processJobsResults is finished") + oneDone <- struct{}{} + }() + go func() { + c.processSupervisorRequests(ctx) + c.l.Debug("processSupervisorRequests is finished") + oneDone <- struct{}{} + }() + go func() { + c.processChannelRequests(ctx) + c.l.Debug("processChannelRequests is finished") + oneDone <- struct{}{} + }() + + <-oneDone + go func() { + <-oneDone + <-oneDone + <-oneDone + c.l.Info("Done.") + close(c.done) + }() return nil } @@ -188,7 +238,7 @@ func (c *Client) Connect(ctx context.Context) error { func (c *Client) SendActualStatuses() { for _, agent := range c.supervisor.AgentsList() { c.l.Infof("Sending status: %s (port %d).", agent.Status, agent.ListenPort) - resp, err := c.sendAndWaitResponse( + resp, err := c.channel.SendAndWaitResponse( &agentpb.StateChangedRequest{ AgentId: agent.AgentId, Status: agent.Status, @@ -205,6 +255,11 @@ func (c *Client) SendActualStatuses() { } } +// Done is closed when all supervisors's requests are sent (if possible) and connection is closed. +func (c *Client) Done() <-chan struct{} { + return c.done +} + func (c *Client) processActionResults(ctx context.Context) { for { select { @@ -212,7 +267,7 @@ func (c *Client) processActionResults(ctx context.Context) { if result == nil { continue } - resp, err := c.sendAndWaitResponse(result) + resp, err := c.channel.SendAndWaitResponse(result) if err != nil { c.l.Error(err) continue @@ -234,7 +289,7 @@ func (c *Client) processJobsResults(ctx context.Context) { if message == nil { continue } - c.send(&models.AgentResponse{ + c.channel.Send(&channel.AgentResponse{ ID: 0, // Jobs send messages that don't require any responses, so we can leave message ID blank. Payload: message, }) @@ -246,47 +301,59 @@ func (c *Client) processJobsResults(ctx context.Context) { } func (c *Client) processSupervisorRequests(ctx context.Context) { - for { - select { - case state := <-c.supervisor.Changes(): - if state == nil { - continue - } - resp, err := c.sendAndWaitResponse(state) - if err != nil { - c.l.Error(err) - continue - } - if resp == nil { - c.l.Warn("Failed to send StateChanged request.") + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + + for { + select { + case state := <-c.supervisor.Changes(): + if state == nil { + continue + } + resp, err := c.channel.SendAndWaitResponse(state) + if err != nil { + c.l.Error(err) + continue + } + if resp == nil { + c.l.Warn("Failed to send StateChanged request.") + } + case <-ctx.Done(): + c.l.Infof("Supervisor Changes() channel drained.") + return } - case <-ctx.Done(): - c.l.Infof("Supervisor Changes() channel drained.") - return } - } -} + }() -func (c *Client) processQANRequests(ctx context.Context) { - for { - select { - case collect := <-c.supervisor.QANRequests(): - if collect == nil { - continue - } - resp, err := c.sendAndWaitResponse(collect) - if err != nil { - c.l.Error(err) - continue - } - if resp == nil { - c.l.Warn("Failed to send QanCollect request.") + wg.Add(1) + go func() { + defer wg.Done() + + for { + select { + case collect := <-c.supervisor.QANRequests(): + if collect == nil { + continue + } + resp, err := c.channel.SendAndWaitResponse(collect) + if err != nil { + c.l.Error(err) + continue + } + if resp == nil { + c.l.Warn("Failed to send QanCollect request.") + } + case <-ctx.Done(): + c.l.Infof("Supervisor QANRequests() channel drained.") + return } - case <-ctx.Done(): - c.l.Infof("Supervisor QANRequests() channel drained.") - return } - } + }() + + wg.Wait() } func (c *Client) processChannelRequests(ctx context.Context) { @@ -360,24 +427,23 @@ LOOP: } c.cus.RegisterConnectionStatus(time.Now(), true) - response := &models.AgentResponse{ + response := &channel.AgentResponse{ ID: req.ID, Payload: responsePayload, } if status != nil { response.Status = status } - c.send(response) + c.channel.Send(response) case <-ctx.Done(): break LOOP } } if err := c.channel.Wait(); err != nil { c.l.Debugf("Channel closed: %s.", err) - } else { - c.l.Debug("Channel closed.") + return } - c.l.Debug("processChannelRequests is finished") + c.l.Debug("Channel closed.") } func (c *Client) handleStartActionRequest(p *agentpb.StartActionRequest) error { @@ -667,8 +733,29 @@ type dialResult struct { // dial tries to connect to the server once. // State changes are logged via l. Returned error is not user-visible. func dial(dialCtx context.Context, cfg *config.Config, l *logrus.Entry) (*dialResult, error) { + opts := []grpc.DialOption{ + grpc.WithBlock(), + grpc.WithUserAgent("pmm-agent/" + version.Version), + } + if cfg.Server.WithoutTLS { + opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) + } else { + host, _, _ := net.SplitHostPort(cfg.Server.Address) + tlsConfig := tlsconfig.Get() + tlsConfig.ServerName = host + tlsConfig.InsecureSkipVerify = cfg.Server.InsecureTLS + opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))) + } + + if cfg.Server.Username != "" { + opts = append(opts, grpc.WithPerRPCCredentials(&basicAuth{ + username: cfg.Server.Username, + password: cfg.Server.Password, + })) + } + l.Infof("Connecting to %s ...", cfg.Server.FilteredURL()) - conn, err := grpc.DialContext(dialCtx, cfg.Server.Address, getGRPCOps(cfg)...) + conn, err := grpc.DialContext(dialCtx, cfg.Server.Address, opts...) if err != nil { msg := err.Error() @@ -902,79 +989,6 @@ func convertAgentErrorToGrpcStatus(agentErr error) *grpcstatus.Status { return status } -func getGRPCOps(cfg *config.Config) []grpc.DialOption { - opts := []grpc.DialOption{ - grpc.WithBlock(), - grpc.WithUserAgent("pmm-agent/" + version.Version), - } - if cfg.Server.WithoutTLS { - opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) - } else { - host, _, _ := net.SplitHostPort(cfg.Server.Address) - tlsConfig := tlsconfig.Get() - tlsConfig.ServerName = host - tlsConfig.InsecureSkipVerify = cfg.Server.InsecureTLS - opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))) - } - - if cfg.Server.Username != "" { - opts = append(opts, grpc.WithPerRPCCredentials(&basicAuth{ - username: cfg.Server.Username, - password: cfg.Server.Password, - })) - } - return opts -} - -// Start starts client processes that handle requests and sends responses. -func (c *Client) Start(ctx context.Context) { - if _, ok := c.cache.(*cache.Cache); ok { - c.wg.Add(1) - go func() { - defer c.wg.Done() - <-ctx.Done() - c.cache.Close() - }() - } - c.wg.Add(4) - go func() { - defer c.wg.Done() - c.processActionResults(ctx) - c.l.Debug("processActionResults is finished") - }() - go func() { - defer c.wg.Done() - c.processJobsResults(ctx) - c.l.Debug("processJobsResults is finished") - }() - go func() { - defer c.wg.Done() - c.processSupervisorRequests(ctx) - c.l.Debug("processSupervisorRequests is finished") - }() - go func() { - defer c.wg.Done() - c.processQANRequests(ctx) - c.l.Debug("processQANRequests is finished") - }() -} - -// Wait waits for client processes to stop. -func (c *Client) Wait() { - c.wg.Wait() -} - -func (c *Client) sendAndWaitResponse(msg agentpb.AgentRequestPayload) (agentpb.ServerResponsePayload, error) { //nolint:ireturn - return c.cache.SendAndWaitResponse(msg) -} - -func (c *Client) send(msg *models.AgentResponse) { - err := c.cache.Send(msg) - if err != nil { - c.l.Error(err) - } -} - // check interface. var ( _ prometheus.Collector = (*Client)(nil) diff --git a/agent/client/client_test.go b/agent/client/client_test.go index c49eb91fad..ce987348f8 100644 --- a/agent/client/client_test.go +++ b/agent/client/client_test.go @@ -18,8 +18,6 @@ import ( "context" "fmt" "net" - "os" - "path" "testing" "time" @@ -87,7 +85,7 @@ func TestClient(t *testing.T) { cfgStorage := config.NewStorage(&config.Config{}) client := New(cfgStorage, nil, nil, nil, nil, nil, nil, nil) cancel() - err := client.Connect(ctx) + err := client.Run(ctx) assert.EqualError(t, err, "missing PMM Server address: context canceled") }) @@ -102,7 +100,7 @@ func TestClient(t *testing.T) { }) client := New(cfgStorage, nil, nil, nil, nil, nil, nil, nil) cancel() - err := client.Connect(ctx) + err := client.Run(ctx) assert.EqualError(t, err, "missing Agent ID: context canceled") }) @@ -118,7 +116,7 @@ func TestClient(t *testing.T) { }, }) client := New(cfgStorage, nil, nil, nil, nil, nil, connectionuptime.NewService(time.Hour), nil) - err := client.Connect(ctx) + err := client.Run(ctx) assert.EqualError(t, err, "failed to dial: context deadline exceeded") }) @@ -167,8 +165,7 @@ func TestClient(t *testing.T) { r := runner.New(cfgStorage.Get().RunnerCapacity) client := New(cfgStorage, &s, r, nil, nil, nil, connectionuptime.NewService(time.Hour), nil) - client.Start(context.Background()) - err := client.Connect(context.Background()) + err := client.Run(context.Background()) assert.NoError(t, err) assert.Equal(t, serverMD, client.GetServerConnectMetadata()) }) @@ -197,7 +194,7 @@ func TestClient(t *testing.T) { client := New(cfgStorage, nil, nil, nil, nil, nil, connectionuptime.NewService(time.Hour), nil) client.dialTimeout = 100 * time.Millisecond - err := client.Connect(ctx) + err := client.Run(ctx) assert.EqualError(t, err, "failed to get server metadata: rpc error: code = Canceled desc = context canceled", "%+v", err) }) }) @@ -286,8 +283,7 @@ func TestUnexpectedActionType(t *testing.T) { r := runner.New(cfgStorage.Get().RunnerCapacity) client := New(cfgStorage, s, r, nil, nil, nil, connectionuptime.NewService(time.Hour), nil) - client.Start(context.Background()) - err := client.Connect(context.Background()) + err := client.Run(context.Background()) assert.NoError(t, err) assert.Equal(t, serverMD, client.GetServerConnectMetadata()) } @@ -383,132 +379,3 @@ func TestArgListFromMongoDBParams(t *testing.T) { }) } } - -func TestCache(t *testing.T) { - t.Parallel() - cacheSize := uint32(3 * 1024 * 1024) - t.Run("Read", func(t *testing.T) { - t.Parallel() - serverMD := &agentpb.ServerConnectMetadata{ServerVersion: t.Name()} - - // test payload - payload := &agentpb.QANCollectRequest{MetricsBucket: []*agentpb.MetricsBucket{{Common: &agentpb.MetricsBucket_Common{Queryid: "33b65211f7df97665e74b8f98dbc90d5"}}}} - - connect := func(stream agentpb.Agent_ConnectServer) error { - md, err := agentpb.ReceiveAgentConnectMetadata(stream) - require.NoError(t, err) - assert.Equal(t, &agentpb.AgentConnectMetadata{ID: "agent_id"}, md) - err = agentpb.SendServerConnectMetadata(stream, serverMD) - require.NoError(t, err) - msg, err := stream.Recv() - require.NoError(t, err) - ping := msg.GetPing() - require.NotNil(t, ping) - err = stream.Send(&agentpb.ServerMessage{ - Id: msg.Id, - Payload: (&agentpb.Pong{CurrentTime: timestamppb.Now()}).ServerMessageResponsePayload(), - }) - require.NoError(t, err) - msg, err = stream.Recv() - require.NoError(t, err) - require.Equal(t, payload.MetricsBucket[0].Common.Queryid, msg.Payload.(*agentpb.AgentMessage_QanCollect).QanCollect.MetricsBucket[0].Common.Queryid) - return nil - } - - // setup for client processes - qan := make(chan *agentpb.QANCollectRequest, 1) - s := &mockSupervisor{} - s.On("Changes").Return(make(<-chan *agentpb.StateChangedRequest)) - s.On("QANRequests").Return((<-chan *agentpb.QANCollectRequest)(qan)) - s.On("AgentsList").Return([]*agentlocalpb.AgentInfo{}) - s.On("ClearChangesChannel").Return() - - // setup for cache - testDirname := path.Join(os.TempDir(), fmt.Sprint(t.Name(), time.Now().UnixNano())) - t.Cleanup(func() { _ = os.RemoveAll(testDirname) }) - cfgStorage := config.NewStorage(&config.Config{Cache: config.Cache{Dir: testDirname, PrioritizedSize: cacheSize, UnprioritizedSize: cacheSize}}) - - // actual test - client := New(cfgStorage, s, runner.New(0), nil, nil, nil, connectionuptime.NewService(time.Hour), nil) - client.Start(context.Background()) - time.Sleep(1 * time.Second) // time to start processes - qan <- payload // sending request with qan on closed connection to store in cache - port, teardown := setup(t, connect) - defer teardown() - - *cfgStorage = *config.NewStorage(&config.Config{ - ID: "agent_id", - Server: config.Server{ - Address: fmt.Sprintf("127.0.0.1:%d", port), // prepare config with server params - WithoutTLS: true, - }, - }) - require.NoError(t, client.Connect(context.Background())) - }) - t.Run("Read with shutdown", func(t *testing.T) { - t.Parallel() - serverMD := &agentpb.ServerConnectMetadata{ServerVersion: t.Name()} - - // test payload - payload := &agentpb.QANCollectRequest{MetricsBucket: []*agentpb.MetricsBucket{{Common: &agentpb.MetricsBucket_Common{Queryid: "33b65211f7df97665e74b8f98dbc90d6"}}}} - - connect := func(stream agentpb.Agent_ConnectServer) error { - md, err := agentpb.ReceiveAgentConnectMetadata(stream) - require.NoError(t, err) - assert.Equal(t, &agentpb.AgentConnectMetadata{ID: "agent_id"}, md) - err = agentpb.SendServerConnectMetadata(stream, serverMD) - require.NoError(t, err) - msg, err := stream.Recv() - require.NoError(t, err) - ping := msg.GetPing() - require.NotNil(t, ping) - err = stream.Send(&agentpb.ServerMessage{ - Id: msg.Id, - Payload: (&agentpb.Pong{CurrentTime: timestamppb.Now()}).ServerMessageResponsePayload(), - }) - require.NoError(t, err) - msg, err = stream.Recv() - require.NoError(t, err) - require.Equal(t, payload.MetricsBucket[0].Common.Queryid, msg.Payload.(*agentpb.AgentMessage_QanCollect).QanCollect.MetricsBucket[0].Common.Queryid) - return nil - } - - // setup for client processes - qan := make(chan *agentpb.QANCollectRequest, 1) - s := &mockSupervisor{} - s.On("Changes").Return(make(<-chan *agentpb.StateChangedRequest)) - s.On("QANRequests").Return((<-chan *agentpb.QANCollectRequest)(qan)) - s.On("AgentsList").Return([]*agentlocalpb.AgentInfo{}) - s.On("ClearChangesChannel").Return() - r := runner.New(0) - - // setup for cache - testDirname := path.Join(os.TempDir(), fmt.Sprint(t.Name(), time.Now().UnixNano())) - t.Cleanup(func() { _ = os.RemoveAll(testDirname) }) - cfgStorage := config.NewStorage(&config.Config{Cache: config.Cache{Dir: testDirname, PrioritizedSize: cacheSize, UnprioritizedSize: cacheSize}}) - - // actual test - client := New(cfgStorage, s, r, nil, nil, nil, connectionuptime.NewService(time.Hour), nil) - ctx, cancel := context.WithCancel(context.Background()) - client.Start(ctx) // starting client processes - time.Sleep(1 * time.Second) // time to start processes - qan <- payload // sending request with qan on closed connection to store in cache - time.Sleep(1 * time.Second) // time to store message before close cache - cancel() // shuting down client - client.Wait() // closing cache and waiting for processes to stop - - client = New(cfgStorage, s, r, nil, nil, nil, connectionuptime.NewService(time.Hour), nil) // new client setup - client.Start(context.Background()) - port, teardown := setup(t, connect) - defer teardown() - - *cfgStorage = *config.NewStorage(&config.Config{ // prepare config with new server params - ID: "agent_id", - Server: config.Server{ - Address: fmt.Sprintf("127.0.0.1:%d", port), - WithoutTLS: true, - }, - }) - require.NoError(t, client.Connect(context.Background())) - }) -} diff --git a/agent/commands/run.go b/agent/commands/run.go index 2d57167511..c62fefc664 100644 --- a/agent/commands/run.go +++ b/agent/commands/run.go @@ -95,12 +95,10 @@ func Run() { localServer.Run(ctx, reloadCh) cancel() }() - client.Start(ctx) processClientUntilCancel(ctx, client, reloadCh) cleanupTmp(cfg.Paths.TempDir, l) - client.Wait() wg.Wait() select { case <-rootCtx.Done(): @@ -113,9 +111,13 @@ func Run() { func processClientUntilCancel(ctx context.Context, client *client.Client, reloadCh chan bool) { for { clientCtx, cancelClientCtx := context.WithCancel(ctx) + err := client.Run(clientCtx) + if err != nil { + logrus.Errorf("Client error: %s", err) + } - _ = client.Connect(clientCtx) cancelClientCtx() + <-client.Done() select { case <-reloadCh: diff --git a/agent/config/config.go b/agent/config/config.go index a233fcb278..74748a24c4 100644 --- a/agent/config/config.go +++ b/agent/config/config.go @@ -37,10 +37,8 @@ import ( ) const ( - pathBaseDefault = "/usr/local/percona/pmm2" - agentTmpPath = "tmp" // temporary directory to keep exporters' config files, relative to pathBase - prioritizedCacheSize = 100 * 1024 * 1024 // 100 MB TODO: R&D on median daily amount - unprioritizedCacheSize = 500 * 1024 * 1024 // 500 MB TODO: R&D on median daily amount + pathBaseDefault = "/usr/local/percona/pmm2" + agentTmpPath = "tmp" // temporary directory to keep exporters' config files, relative to pathBase ) // Server represents PMM Server configuration. @@ -142,18 +140,6 @@ type Setup struct { ExposeExporter bool } -// Cache represent cache settings. -type Cache struct { - // Dir represent file to store valuable agent messages - Dir string `yaml:"dir"` - // PrioritizedSize represent cache size for high priority agent messages e.g., job, action results - PrioritizedSize uint32 `yaml:"prioritized_size"` - // UnprioritizedSize represent cache size for low priority agent messages e.g., qan metrics - UnprioritizedSize uint32 `yaml:"unprioritized_size"` - // Disable disables cache - Disable bool `yaml:"disable"` -} - // Config represents pmm-agent's configuration. // //nolint:maligned @@ -176,7 +162,6 @@ type Config struct { //nolint:musttag LogLinesCount uint `json:"log-lines-count"` WindowConnectedTime time.Duration `yaml:"window-connected-time"` - Cache Cache `yaml:"cache"` Setup Setup `yaml:"-"` } @@ -223,12 +208,6 @@ func get(args []string, cfg *Config, l *logrus.Entry) (configFileF string, err e if cfg.WindowConnectedTime == 0 { cfg.WindowConnectedTime = time.Hour } - if cfg.Cache.PrioritizedSize == 0 { - cfg.Cache.PrioritizedSize = prioritizedCacheSize - } - if cfg.Cache.UnprioritizedSize == 0 { - cfg.Cache.UnprioritizedSize = unprioritizedCacheSize - } for sp, v := range map[*string]string{ &cfg.Paths.NodeExporter: "node_exporter", @@ -243,7 +222,6 @@ func get(args []string, cfg *Config, l *logrus.Entry) (configFileF string, err e &cfg.Paths.PTPGSummary: "tools/pt-pg-summary", &cfg.Paths.PTMongoDBSummary: "tools/pt-mongodb-summary", &cfg.Paths.PTMySQLSummary: "tools/pt-mysql-summary", - &cfg.Cache.Dir: "cache", } { if *sp == "" { *sp = v @@ -286,9 +264,6 @@ func get(args []string, cfg *Config, l *logrus.Entry) (configFileF string, err e if !filepath.IsAbs(cfg.Paths.PTMySQLSummary) { cfg.Paths.PTMySQLSummary = filepath.Join(cfg.Paths.PathsBase, cfg.Paths.PTMySQLSummary) } - if !filepath.IsAbs(cfg.Cache.Dir) { - cfg.Cache.Dir = filepath.Join(cfg.Paths.PathsBase, cfg.Cache.Dir) - } for _, sp := range []*string{ &cfg.Paths.NodeExporter, @@ -412,14 +387,6 @@ func Application(cfg *Config) (*kingpin.Application, *string) { Envar("PMM_AGENT_PATHS_PT_MYSQL_SUMMARY").StringVar(&cfg.Paths.PTMySQLSummary) app.Flag("paths-tempdir", "Temporary directory for exporters [PMM_AGENT_PATHS_TEMPDIR]"). Envar("PMM_AGENT_PATHS_TEMPDIR").StringVar(&cfg.Paths.TempDir) - app.Flag("cache-dir", "Directory for cache [PMM_AGENT_CACHE_DIR]"). - Envar("PMM_AGENT_CACHE_DIR").StringVar(&cfg.Cache.Dir) - app.Flag("cache-prioritized-size", "Cache size for high priority agent messages e.g., job, action results [PMM_AGENT_CACHE_PRIORITIZED_SIZE]"). - Envar("PMM_AGENT_CACHE_PRIORITIZED_SIZE").Uint32Var(&cfg.Cache.PrioritizedSize) - app.Flag("cache-unprioritized-size", "Cache for low priority agent messages e.g., qan metrics [PMM_AGENT_CACHE_UNPRIORITIZED_SIZE]"). - Envar("PMM_AGENT_CACHE_UNPRIORITIZED_SIZE").Uint32Var(&cfg.Cache.UnprioritizedSize) - app.Flag("cache-disable", "Disables cache [PMM_AGENT_CACHE_DISABLE]"). - Envar("PMM_AGENT_CACHE_DISABLE").BoolVar(&cfg.Cache.Disable) // no flag for SlowLogFilePrefix - it is only for development and testing app.Flag("ports-min", "Minimal allowed port number for listening sockets [PMM_AGENT_PORTS_MIN]"). diff --git a/agent/config/config_test.go b/agent/config/config_test.go index be2287829a..490f6c450f 100644 --- a/agent/config/config_test.go +++ b/agent/config/config_test.go @@ -127,11 +127,6 @@ func TestGet(t *testing.T) { Max: 51999, }, LogLinesCount: 1024, - Cache: Cache{ - Dir: "/usr/local/percona/pmm2/cache", - PrioritizedSize: 104857600, - UnprioritizedSize: 524288000, - }, } assert.Equal(t, expected, actual) assert.Empty(t, configFilepath) @@ -192,11 +187,6 @@ func TestGet(t *testing.T) { Max: 51999, }, LogLinesCount: 1024, - Cache: Cache{ - Dir: "/usr/local/percona/pmm2/cache", - PrioritizedSize: 104857600, - UnprioritizedSize: 524288000, - }, } assert.Equal(t, expected, actual) assert.Equal(t, name, configFilepath) @@ -258,11 +248,6 @@ func TestGet(t *testing.T) { LogLevel: "info", Debug: true, LogLinesCount: 1024, - Cache: Cache{ - Dir: "/usr/local/percona/pmm2/cache", - PrioritizedSize: 104857600, - UnprioritizedSize: 524288000, - }, } assert.Equal(t, expected, actual) assert.Equal(t, name, configFilepath) @@ -329,11 +314,6 @@ func TestGet(t *testing.T) { }, Debug: true, LogLinesCount: 1024, - Cache: Cache{ - Dir: "/usr/local/percona/pmm2/cache", - PrioritizedSize: 104857600, - UnprioritizedSize: 524288000, - }, } assert.Equal(t, expected, actual) assert.Equal(t, name, configFilepath) @@ -399,11 +379,6 @@ func TestGet(t *testing.T) { }, Debug: true, LogLinesCount: 1024, - Cache: Cache{ - Dir: "/base/cache", - PrioritizedSize: 104857600, - UnprioritizedSize: 524288000, - }, } assert.Equal(t, expected, actual) assert.Equal(t, name, configFilepath) @@ -467,11 +442,6 @@ func TestGet(t *testing.T) { }, Debug: true, LogLinesCount: 1024, - Cache: Cache{ - Dir: "/base/cache", - PrioritizedSize: 104857600, - UnprioritizedSize: 524288000, - }, } assert.Equal(t, expected, actual) assert.Equal(t, name, configFilepath) @@ -520,11 +490,6 @@ func TestGet(t *testing.T) { }, Debug: true, LogLinesCount: 1024, - Cache: Cache{ - Dir: "/usr/local/percona/pmm2/cache", - PrioritizedSize: 104857600, - UnprioritizedSize: 524288000, - }, } assert.Equal(t, expected, actual) assert.Equal(t, filepath.Join(wd, name), configFilepath) diff --git a/agent/models/agent_message.go b/agent/models/agent_message.go deleted file mode 100644 index a8f858c6e1..0000000000 --- a/agent/models/agent_message.go +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright 2023 Percona LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package models contains client domain models and helpers. -package models - -import ( - "google.golang.org/grpc/status" - - "github.com/percona/pmm/api/agentpb" -) - -// AgentRequest represents an request from agent. -// It is similar to agentpb.AgentMessage except it can contain only requests, -// and the payload is already unwrapped (XXX instead of AgentMessage_XXX). -type AgentRequest struct { - ID uint32 - Payload agentpb.AgentRequestPayload -} - -// AgentResponse represents agent's response. -// It is similar to agentpb.AgentMessage except it can contain only responses, -// and the payload is already unwrapped (XXX instead of AgentMessage_XXX). -type AgentResponse struct { - ID uint32 - Status *status.Status - Payload agentpb.AgentResponsePayload -} diff --git a/agent/models/cache.go b/agent/models/cache.go deleted file mode 100644 index fee575cdb0..0000000000 --- a/agent/models/cache.go +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright 2023 Percona LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package models contains client domain models and helpers. -package models - -import "github.com/percona/pmm/api/agentpb" - -// Sender is a subset of methods of channel, cache. -type Sender interface { - Send(resp *AgentResponse) error - SendAndWaitResponse(payload agentpb.AgentRequestPayload) (agentpb.ServerResponsePayload, error) -} - -// Cache represent cache methods. -type Cache interface { - Sender - Close() - SetSender(s Sender) -} diff --git a/agent/utils/buffer-ring/bigqueue/bigqueue.go b/agent/utils/buffer-ring/bigqueue/bigqueue.go deleted file mode 100644 index 8bc35b8472..0000000000 --- a/agent/utils/buffer-ring/bigqueue/bigqueue.go +++ /dev/null @@ -1,429 +0,0 @@ -// Copyright 2023 Percona LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package bigqueue implements ring buffer based on bigqueue. -package bigqueue - -import ( - "fmt" - "math" - "os" - "path/filepath" - "sync" - "sync/atomic" - "time" - - "github.com/jhunters/bigqueue" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" - grpcstatus "google.golang.org/grpc/status" - "google.golang.org/protobuf/proto" - - "github.com/percona/pmm/agent/models" - agenterrors "github.com/percona/pmm/agent/utils/errors" - "github.com/percona/pmm/api/agentpb" -) - -const ( - metaFileSize = 16 + 8 // represent FrontFileInfo size + MetaFileInfo size - indexEntrySize = 32 // represent index entry size -) - -var ( - ErrClosed = errors.New("cache closed") - - dataPageSize = 1024 * 1024 // represent page size for data entries - indexPageSize = dataPageSize / indexEntrySize // represent page size for index entries (default bigqueue ratio) - drainThreshold = int64(1024 * 1024) // represent threshold for preliminary draining - gcDuration = 10 * time.Second // represent gc ticker duration -) - -// Ring represent ring buffer based on bigqueue. -type Ring struct { - l *logrus.Entry - fq *bigqueue.FileQueue - wg sync.WaitGroup - - sendLock sync.RWMutex - recvLock sync.RWMutex - totalSize int64 // represent the limit after which old data will be overwritten - - sender atomic.Pointer[models.Sender] - - gcCh chan struct{} - drainCh chan struct{} - recvNotifyCh chan struct{} - establishCh chan struct{} - done chan struct{} -} - -// New creates/loads ring buffer. -func New(dir string, size uint32, l *logrus.Entry) (*Ring, error) { - err := initPaths(dir) - if err != nil { - return nil, err - } - dir, queueName := filepath.Split(dir) - if lastRuneIdx := len(dir) - 1; len(dir) > 0 && rune(dir[lastRuneIdx]) == filepath.Separator { - dir = dir[:lastRuneIdx] - } - if metaSize := uint32(metaFileSize + indexPageSize + dataPageSize); metaSize > size { - return nil, fmt.Errorf("cache size must be greater than '%d' bytes to store at least one entry", metaSize) - } - fq := &bigqueue.FileQueue{} - if err = fq.Open(dir, queueName, &bigqueue.Options{ - DataPageSize: dataPageSize, - IndexItemsPerPage: int(math.Log2(float64(indexPageSize) / indexEntrySize)), - }); err != nil { - return nil, err - } - out := &Ring{ - l: l, - fq: fq, - totalSize: int64(size), - drainCh: make(chan struct{}, 1), - gcCh: make(chan struct{}, 1), - establishCh: make(chan struct{}, 1), - recvNotifyCh: make(chan struct{}, 1), - done: make(chan struct{}), - } - out.gcRunner() - out.sendRunner() - if !out.isEmpty() { - asyncNotify(out.recvNotifyCh) - } - return out, nil -} - -// Send stores agent responses in cache on nil channel. -func (r *Ring) Send(resp *models.AgentResponse) error { - msg := &agentpb.AgentMessage{Id: resp.ID} - if resp.Payload != nil { - msg.Payload = resp.Payload.AgentMessageResponsePayload() - } - if resp.Status != nil { - msg.Status = resp.Status.Proto() - } - - var ( - err error - s = r.sender.Load() - ) - - r.recvLock.Lock() - defer r.recvLock.Unlock() - if r.isEmpty() && s != nil { - err = (*s).Send(resp) - if err != nil && errors.As(err, &agenterrors.ErrChanConn) { - if r.sender.CompareAndSwap(s, nil) { - asyncRelease(r.establishCh) - r.l.Debugf("sender released: %v", err) - } - } else { - return err - } - } - - r.push(msg) - return nil -} - -// SendAndWaitResponse stores AgentMessageRequestPayload on nil channel. -func (r *Ring) SendAndWaitResponse(payload agentpb.AgentRequestPayload) (agentpb.ServerResponsePayload, error) { //nolint:unparam,ireturn - var ( - err error - resp agentpb.ServerResponsePayload - s = r.sender.Load() - ) - - r.recvLock.Lock() - defer r.recvLock.Unlock() - if r.isEmpty() && s != nil { - resp, err = (*s).SendAndWaitResponse(payload) - if err != nil && errors.As(err, &agenterrors.ErrChanConn) { - if r.sender.CompareAndSwap(s, nil) { - asyncRelease(r.establishCh) - r.l.Debugf("sender released: %v", err) - } - } else { - return resp, err - } - } - - r.push(&agentpb.AgentMessage{Payload: payload.AgentMessageRequestPayload()}) - return &agentpb.StateChangedResponse{}, nil -} - -// SetSender check and set sender and notify sender loop. -func (r *Ring) SetSender(s models.Sender) { - r.sender.Store(&s) - asyncNotify(r.establishCh) - r.l.Debug("sender set") -} - -// Close closes cache. -func (r *Ring) Close() { - select { - case <-r.done: - default: - close(r.done) - r.wg.Wait() - if err := r.fq.Close(); err != nil { - r.l.Errorf("closing cache: %+v", err) - } - r.l.Info("cache closed") - } -} - -func (r *Ring) isEmpty() bool { - return r.fq.IsEmpty() -} - -func (r *Ring) push(msg *agentpb.AgentMessage) { - b, err := proto.Marshal(msg) - if err != nil { - r.l.Errorf("marshal proto while inserting message to cache: %+v", err) - return - } - size := int64(len(b)) + indexEntrySize - if size > r.totalSize { - r.l.Errorf("data size: '%d' overflows free cache space: '%d'", size, r.totalSize) - return - } - select { - case <-r.done: - return - default: - } - _, err = r.fq.Enqueue(b) - if err != nil { - r.l.Errorf("inserting to cache: %+v", err) - } - asyncNotify(r.recvNotifyCh) -} - -func (r *Ring) gcRunner() { - r.wg.Add(1) - go func() { - defer r.wg.Done() - ticker := time.NewTicker(gcDuration) - defer ticker.Stop() - for { - select { - case <-r.done: - r.doDrain() - return - case <-r.drainCh: - r.doDrain() - case <-ticker.C: - r.doDrain() - case <-r.gcCh: - r.sendLock.Lock() - r.runGC() - r.sendLock.Unlock() - } - } - }() -} - -func (r *Ring) doDrain() { - if overflow := r.size() + drainThreshold - r.totalSize; overflow > 0 { - r.sendLock.Lock() - r.drain(overflow) - r.runGC() - r.sendLock.Unlock() - } -} - -func (r *Ring) sendRunner() { - r.wg.Add(1) - go func() { - defer r.wg.Done() - for { - select { - case <-r.done: - return - case <-r.recvNotifyCh: - r.sendInLoop() - } - } - }() -} - -func (r *Ring) sendInLoop() { - var s *models.Sender - for { - s = r.sender.Load() - if s != nil { - break - } - select { - case <-r.done: - return - case <-r.establishCh: - continue - } - } - r.sendLock.Lock() - defer r.sendLock.Unlock() - var count int - for { - select { - case <-r.done: - return - default: - } - r.recvLock.Lock() - _, b, err := r.fq.Peek() - r.recvLock.Unlock() - if err != nil { - r.l.Errorf("reading entry from cache: %+v", err) - } - if b == nil { - break - } - var m agentpb.AgentMessage - if err := proto.Unmarshal(b, &m); err != nil { - r.l.Errorf("unmarshal entry from cache: %+v", err) - } else if err = r.send(*s, &m); err != nil { - if r.sender.CompareAndSwap(s, nil) { - asyncRelease(r.establishCh) - r.l.Debugf("sender released: %v", err) - } - break - } - r.recvLock.Lock() - r.fq.Skip(1) //nolint:errcheck - r.recvLock.Unlock() - count++ - } - if count > 0 { - asyncNotify(r.gcCh) - } -} - -// initPaths creates all paths for queue to use. Original repo creates directories with perm error. -func initPaths(dir string) error { - for _, path := range []string{ - "", - bigqueue.IndexFileName, - bigqueue.DataFileName, - bigqueue.MetaFileName, - bigqueue.FrontFileName, - } { - if err := os.MkdirAll(filepath.Join(dir, path), os.ModePerm); err != nil { - return err - } - } - return nil -} - -func (r *Ring) drain(amount int64) { - for size := int64(0); size < amount; { - r.recvLock.Lock() - _, b, err := r.fq.Dequeue() - r.recvLock.Unlock() - if err != nil { - r.l.Errorf("draining cache: %+v", err) - return - } - if b == nil { - return - } - size += int64(len(b)) + indexEntrySize - } -} - -func (r *Ring) size() int64 { - r.recvLock.Lock() - status := r.fq.Status() - r.recvLock.Unlock() - sum := status.FrontFileInfo.Size + status.MetaFileInfo.Size - for _, list := range status.IndexFileList { - sum += list.Size - } - for _, list := range status.DataFileList { - sum += list.Size - } - return sum -} - -func (r *Ring) runGC() { - r.recvLock.Lock() - defer r.recvLock.Unlock() - if err := r.fq.Gc(); err != nil { - r.l.Errorf("run gc: %+v", err) - } -} - -func (r *Ring) send(s models.Sender, m *agentpb.AgentMessage) error { - var err error - switch p := m.Payload.(type) { - // responses - case *agentpb.AgentMessage_StartAction: - err = s.Send(&models.AgentResponse{ID: m.Id, Status: grpcstatus.FromProto(m.Status), Payload: p.StartAction}) - case *agentpb.AgentMessage_StopAction: - err = s.Send(&models.AgentResponse{ID: m.Id, Status: grpcstatus.FromProto(m.Status), Payload: p.StopAction}) - case *agentpb.AgentMessage_PbmSwitchPitr: - err = s.Send(&models.AgentResponse{ID: m.Id, Status: grpcstatus.FromProto(m.Status), Payload: p.PbmSwitchPitr}) - case *agentpb.AgentMessage_StartJob: - err = s.Send(&models.AgentResponse{ID: m.Id, Status: grpcstatus.FromProto(m.Status), Payload: p.StartJob}) - case *agentpb.AgentMessage_JobStatus: - err = s.Send(&models.AgentResponse{ID: m.Id, Status: grpcstatus.FromProto(m.Status), Payload: p.JobStatus}) - case *agentpb.AgentMessage_GetVersions: - err = s.Send(&models.AgentResponse{ID: m.Id, Status: grpcstatus.FromProto(m.Status), Payload: p.GetVersions}) - case *agentpb.AgentMessage_JobProgress: - err = s.Send(&models.AgentResponse{ID: m.Id, Status: grpcstatus.FromProto(m.Status), Payload: p.JobProgress}) - case *agentpb.AgentMessage_StopJob: - err = s.Send(&models.AgentResponse{ID: m.Id, Status: grpcstatus.FromProto(m.Status), Payload: p.StopJob}) - case *agentpb.AgentMessage_CheckConnection: - err = s.Send(&models.AgentResponse{ID: m.Id, Status: grpcstatus.FromProto(m.Status), Payload: p.CheckConnection}) - case *agentpb.AgentMessage_JobResult: - err = s.Send(&models.AgentResponse{ID: m.Id, Status: grpcstatus.FromProto(m.Status), Payload: p.JobResult}) - case *agentpb.AgentMessage_AgentLogs: - err = s.Send(&models.AgentResponse{ID: m.Id, Status: grpcstatus.FromProto(m.Status), Payload: p.AgentLogs}) - case *agentpb.AgentMessage_SetState: - err = s.Send(&models.AgentResponse{ID: m.Id, Status: grpcstatus.FromProto(m.Status), Payload: p.SetState}) - case *agentpb.AgentMessage_Pong: - err = s.Send(&models.AgentResponse{ID: m.Id, Status: grpcstatus.FromProto(m.Status), Payload: p.Pong}) - // requests - case *agentpb.AgentMessage_ActionResult: - _, err = s.SendAndWaitResponse(p.ActionResult) - case *agentpb.AgentMessage_QanCollect: - _, err = s.SendAndWaitResponse(p.QanCollect) - case *agentpb.AgentMessage_StateChanged: - _, err = s.SendAndWaitResponse(p.StateChanged) - default: - r.l.Errorf("unknown message: %T", m) - return nil - } - if err != nil && errors.As(err, &agenterrors.ErrChanConn) { - return err - } - return nil -} - -func asyncNotify(ch chan struct{}) { - select { - case ch <- struct{}{}: - default: - } -} - -func asyncRelease(ch chan struct{}) { - select { - case <-ch: - default: - } -} diff --git a/agent/utils/buffer-ring/bigqueue/bigqueue_test.go b/agent/utils/buffer-ring/bigqueue/bigqueue_test.go deleted file mode 100644 index b153ed818e..0000000000 --- a/agent/utils/buffer-ring/bigqueue/bigqueue_test.go +++ /dev/null @@ -1,266 +0,0 @@ -// Copyright 2023 Percona LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package bigqueue - -import ( - "bytes" - "io/fs" - "math/rand" - "os" - "path/filepath" - "runtime" - "strings" - "sync/atomic" - "testing" - "time" - - "github.com/sirupsen/logrus" - "github.com/stretchr/testify/assert" - - "github.com/percona/pmm/agent/models" - "github.com/percona/pmm/api/agentpb" -) - -func TestMetaSizes(t *testing.T) { //nolint:tparallel - indexPageSize = 20 - t.Run("meta file size", func(t *testing.T) { - t.Parallel() - ring, err := New(filepath.Join(os.TempDir(), newRandomString(10)), uint32(dataPageSize+indexPageSize+metaFileSize), nil) - assert.NoError(t, err) - status := ring.fq.Status() - assert.Equal(t, int64(metaFileSize), status.FrontFileInfo.Size+status.MetaFileInfo.Size) - }) - t.Run("index entry size", func(t *testing.T) { - t.Parallel() - ring, err := New(filepath.Join(os.TempDir(), newRandomString(10)), uint32(dataPageSize+indexPageSize+metaFileSize), nil) - assert.NoError(t, err) - _, err = ring.fq.Enqueue([]byte("1")) - assert.NoError(t, err) - assert.Equal(t, int64(indexEntrySize), ring.fq.Status().IndexFileList[0].Size) - }) -} - -func TestNew(t *testing.T) { //nolint:tparallel - indexPageSize = 20 - t.Run("new with size less than meta", func(t *testing.T) { - t.Parallel() - _, err := New(filepath.Join(os.TempDir(), newRandomString(10)), uint32(dataPageSize+indexPageSize+metaFileSize)-1, nil) - assert.Error(t, err) - }) - t.Run("data size too big", func(t *testing.T) { - t.Parallel() - ring, log, cleanup := setupTest(t, filepath.Join(os.TempDir(), newRandomString(10)), uint32(dataPageSize+indexPageSize+metaFileSize)) - t.Cleanup(cleanup) - _, err := ring.SendAndWaitResponse(&agentpb.QANCollectRequest{MetricsBucket: []*agentpb.MetricsBucket{{ - Common: &agentpb.MetricsBucket_Common{Queryid: newRandomString(dataPageSize + indexPageSize + metaFileSize)}, - }}}) - assert.NoError(t, err) - assert.Equal(t, "level=error msg=\"data size: '1048668' overflows free cache space: '1048620'\" cache=test\n", log.String()) - }) -} - -type sender struct { - t *testing.T - i uint32 -} - -func (s *sender) Send(resp *models.AgentResponse) error { return nil } -func (s *sender) SendAndWaitResponse(payload agentpb.AgentRequestPayload) (agentpb.ServerResponsePayload, error) { - qan, ok := payload.(*agentpb.QANCollectRequest) - assert.Equal(s.t, true, ok) - assert.Equal(s.t, 1, len(qan.MetricsBucket)) - assert.Equal(s.t, atomic.LoadUint32(&s.i), qan.MetricsBucket[0].Common.PlaceholdersCount) - atomic.AddUint32(&s.i, 1) - return nil, nil -} - -func TestDrain(t *testing.T) { //nolint:tparallel - dataPageSize = indexEntrySize - indexPageSize = dataPageSize - payloadLen := indexEntrySize - 11 // queryId + proto = 32 - drainThreshold = 0 - - t.Run("push", func(t *testing.T) { - t.Parallel() - dir := filepath.Join(os.TempDir(), newRandomString(10)) - ring, log, cleanup := setupTest(t, dir, uint32(dataPageSize+indexPageSize)*3+metaFileSize) - t.Cleanup(cleanup) - - for i := uint32(1); i <= 4; i++ { - _, err := ring.SendAndWaitResponse(&agentpb.QANCollectRequest{MetricsBucket: []*agentpb.MetricsBucket{{ - Common: &agentpb.MetricsBucket_Common{PlaceholdersCount: i, Queryid: newRandomString(payloadLen)}, - }}}) - assert.NoError(t, err) - runtime.Gosched() - } - asyncNotify(ring.drainCh) - runtime.Gosched() - time.Sleep(1 * time.Second) - s := sender{ - i: uint32(2), // first must be drained - t: t, - } - ring.SetSender(&s) - time.Sleep(1 * time.Second) - assert.NotEqual(t, uint32(2), atomic.LoadUint32(&s.i)) - assert.Equal(t, -1, strings.LastIndex(log.String(), "level=error")) - }) - t.Run("shutdown", func(t *testing.T) { - t.Parallel() - dir := filepath.Join(os.TempDir(), newRandomString(10)) - ring, log, _ := setupTest(t, dir, uint32(dataPageSize+indexPageSize)*3+metaFileSize) - for i := uint32(1); i <= 4; i++ { - _, err := ring.SendAndWaitResponse(&agentpb.QANCollectRequest{MetricsBucket: []*agentpb.MetricsBucket{{ - Common: &agentpb.MetricsBucket_Common{PlaceholdersCount: i, Queryid: newRandomString(payloadLen)}, - }}}) - assert.NoError(t, err) - runtime.Gosched() - } - time.Sleep(1 * time.Second) - ring.Close() - assert.Equal(t, -1, strings.LastIndex(log.String(), "closing cache")) - - ring, log, cleanup := setupTest(t, dir, uint32(dataPageSize+indexPageSize)*3+metaFileSize) - t.Cleanup(cleanup) - s := sender{ - i: uint32(2), // first must be drained - t: t, - } - ring.SetSender(&s) - time.Sleep(1 * time.Second) - assert.NotEqual(t, uint32(2), atomic.LoadUint32(&s.i)) - assert.Equal(t, -1, strings.LastIndex(log.String(), "level=error")) - }) - t.Run("size", func(t *testing.T) { - t.Parallel() - dir := filepath.Join(os.TempDir(), newRandomString(10)) - ring, log, cleanup := setupTest(t, dir, uint32(dataPageSize+indexPageSize)*4+metaFileSize) - t.Cleanup(cleanup) - for i := uint32(1); i <= 5; i++ { - _, err := ring.SendAndWaitResponse(&agentpb.QANCollectRequest{MetricsBucket: []*agentpb.MetricsBucket{{ - Common: &agentpb.MetricsBucket_Common{PlaceholdersCount: i, Queryid: newRandomString(payloadLen)}, - }}}) - assert.NoError(t, err) - runtime.Gosched() - } - asyncNotify(ring.drainCh) - runtime.Gosched() - time.Sleep(1 * time.Second) - - // after push all messages - size, err := dirSize(dir) - assert.NoError(t, err) - assert.Equal(t, int64(344), size) - s := sender{ - i: uint32(2), // first must be drained - t: t, - } - ring.SetSender(&s) - time.Sleep(1 * time.Second) - assert.NotEqual(t, uint32(2), atomic.LoadUint32(&s.i)) - assert.Equal(t, -1, strings.LastIndex(log.String(), "level=error")) - - // after send - size, err = dirSize(dir) - assert.NoError(t, err) - assert.Equal(t, int64(indexPageSize+dataPageSize+metaFileSize), size) - }) -} - -func TestReadWrite(t *testing.T) { //nolint:tparallel - dataPageSize = indexEntrySize - indexPageSize = dataPageSize - payloadLen := indexEntrySize - 11 // queryId + proto = 32 - drainThreshold = 0 - - t.Run("async read write", func(t *testing.T) { - t.Parallel() - dir := filepath.Join(os.TempDir(), newRandomString(10)) - ring, log, cleanup := setupTest(t, dir, uint32(dataPageSize+indexPageSize)*10+metaFileSize) - t.Cleanup(cleanup) - - started := make(chan struct{}) - go func() { - close(started) - for i := uint32(1); i <= 10; i++ { - _, err := ring.SendAndWaitResponse(&agentpb.QANCollectRequest{MetricsBucket: []*agentpb.MetricsBucket{{ - Common: &agentpb.MetricsBucket_Common{PlaceholdersCount: i, Queryid: newRandomString(payloadLen)}, - }}}) - assert.NoError(t, err) - runtime.Gosched() - } - }() - <-started - s := sender{ - i: uint32(1), - t: t, - } - ring.SetSender(&s) - time.Sleep(1 * time.Second) - assert.NotEqual(t, uint32(1), atomic.LoadUint32(&s.i)) - assert.Equal(t, -1, strings.LastIndex(log.String(), "level=error")) - }) -} - -func newRandomString(length int) string { - r := rand.New(rand.NewSource(time.Now().UnixNano())) //nolint:gosec - const alp = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" - b := make([]byte, length) - for i := range b { - b[i] = alp[r.Intn(len(alp))] - } - return string(b) -} - -func setupTest(t *testing.T, dir string, size uint32) (*Ring, *bytes.Buffer, func()) { - t.Helper() - var buf bytes.Buffer - testLogger := logrus.Logger{ - Out: &buf, - Level: logrus.ErrorLevel, - Formatter: &logrus.TextFormatter{ - DisableColors: true, - DisableTimestamp: true, - DisableSorting: true, - }, - } - out, err := New(dir, size, testLogger.WithField("cache", "test")) - assert.NoError(t, err) - cleanup := func() { - out.Close() - assert.Equal(t, -1, strings.LastIndex(buf.String(), "closing cache")) - assert.NoError(t, os.RemoveAll(dir)) - } - return out, &buf, cleanup -} - -func dirSize(path string) (int64, error) { - var size int64 - err := filepath.WalkDir(path, func(_ string, d fs.DirEntry, err error) error { - if err != nil { - return err - } - if d.IsDir() { - return nil - } - fi, err := d.Info() - if err != nil { - return err - } - size += fi.Size() - return err - }) - return size, err -} diff --git a/agent/utils/errors/errors.go b/agent/utils/errors/errors.go index 95b750564f..55ca35f1f4 100644 --- a/agent/utils/errors/errors.go +++ b/agent/utils/errors/errors.go @@ -23,24 +23,4 @@ var ( // ErrActionQueueOverflow is returned when the agent is already running the maximum number of actions. ErrActionQueueOverflow = errors.New("action queue overflow") - // ErrChanConn is returned when the channel is closed. - ErrChanConn ChannelClosedError ) - -// NewChannelClosedError creates new channel connection closed error. -func NewChannelClosedError(err error) ChannelClosedError { - return ChannelClosedError{err} -} - -// ChannelClosedError is returned when the channel is closed. -type ChannelClosedError struct { - e error -} - -func (c ChannelClosedError) Error() string { - return c.e.Error() -} - -func (c ChannelClosedError) Unwrap() error { - return c.e -} diff --git a/go.mod b/go.mod index 7c0add89df..5c1df025c0 100644 --- a/go.mod +++ b/go.mod @@ -49,7 +49,6 @@ require ( github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.0 github.com/hashicorp/go-version v1.6.0 github.com/hashicorp/raft v1.5.0 - github.com/jhunters/bigqueue v1.2.7 github.com/jmoiron/sqlx v1.3.5 github.com/jotaen/kong-completion v0.0.5 github.com/lib/pq v1.10.9 diff --git a/go.sum b/go.sum index c4285512b7..fb147835c3 100644 --- a/go.sum +++ b/go.sum @@ -410,7 +410,6 @@ github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= -github.com/gopherjs/gopherjs v0.0.0-20190430165422-3e4dfb77656c h1:7lF+Vz0LqiRidnzC1Oq86fpX1q/iEv2KJdrCtttYjT4= github.com/gopherjs/gopherjs v0.0.0-20190430165422-3e4dfb77656c/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/grafana/grafana-api-golang-client v0.25.0 h1:jDxnR0U5xgIwKzE+IliZJvjMUUTQxGq+c1s+3M46flI= github.com/grafana/grafana-api-golang-client v0.25.0/go.mod h1:24W29gPe9yl0/3A9X624TPkAOR8DpHno490cPwnkv8E= @@ -465,8 +464,6 @@ github.com/jackc/fake v0.0.0-20150926172116-812a484cc733/go.mod h1:WrMFNQdiFJ80s github.com/jackc/pgx v3.6.2+incompatible h1:2zP5OD7kiyR3xzRYMhOcXVvkDZsImVXfj+yIyTQf3/o= github.com/jackc/pgx v3.6.2+incompatible/go.mod h1:0ZGrqGqkRlliWnWB4zKnWtjbSWbGkVEFm4TeybAXq+I= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= -github.com/jhunters/bigqueue v1.2.7 h1:vwuQMWPBPxhnytZr0ydkzpZdQnnGd/WZmQsJSIJVGsw= -github.com/jhunters/bigqueue v1.2.7/go.mod h1:bHuCzOuSk3Q/Rc74d0pyF1PCPOiDDdr/Ugxu60awYpI= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= @@ -490,7 +487,6 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= -github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= @@ -767,11 +763,7 @@ github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrf github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= -github.com/smartystreets/assertions v1.2.0 h1:42S6lae5dvLc7BrLu/0ugRtcFVjoJNMC/N3yZFZkDFs= -github.com/smartystreets/assertions v1.2.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= -github.com/smartystreets/goconvey v1.7.2 h1:9RBaZCeXEQ3UselpuwUQHltGVXvdwm6cv1hgR6gDIPg= -github.com/smartystreets/goconvey v1.7.2/go.mod h1:Vw0tHAZW6lzCRk3xgdin6fKYcG+G3Pg9vgXWeJpQFMM= github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= diff --git a/managed/services/checks/checks.go b/managed/services/checks/checks.go index c3e95f0302..f31312834f 100644 --- a/managed/services/checks/checks.go +++ b/managed/services/checks/checks.go @@ -1494,14 +1494,14 @@ func (s *Service) filterSupportedChecks(advisors []check.Advisor) []check.Adviso for _, c := range advisor.Checks { if c.Version > maxSupportedVersion { s.l.Warnf("Unsupported checks version: %d, max supported version: %d.", c.Version, maxSupportedVersion) - continue + continue LOOP } switch c.Version { case 1: if ok := isQueryTypeSupported(c.Type); !ok { s.l.Warnf("Unsupported check type: %s.", c.Type) - continue + continue LOOP } case 2: for _, query := range c.Queries {