From 69ce63ff1197a3c3a5b41611b5be9db7eb7feb50 Mon Sep 17 00:00:00 2001 From: Alex Tymchuk Date: Fri, 15 Sep 2023 18:12:31 +0000 Subject: [PATCH] PMM-12375 move infoRequest to serviceInfoBroker --- managed/services/agents/connection_checker.go | 204 +---------------- .../services/agents/service_info_broker.go | 207 ++++++++++++++++++ 2 files changed, 212 insertions(+), 199 deletions(-) create mode 100644 managed/services/agents/service_info_broker.go diff --git a/managed/services/agents/connection_checker.go b/managed/services/agents/connection_checker.go index 248a4ae244c..b773b4ee24a 100644 --- a/managed/services/agents/connection_checker.go +++ b/managed/services/agents/connection_checker.go @@ -49,7 +49,7 @@ func NewConnectionChecker(r *Registry) *ConnectionChecker { } } -// CheckConnectionToService sends request to pmm-agent to check connection to service. +// CheckConnectionToService sends a request to pmm-agent to check connection to service. func (c *ConnectionChecker) CheckConnectionToService(ctx context.Context, q *reform.Querier, service *models.Service, agent *models.Agent) error { l := logger.Get(ctx) start := time.Now() @@ -97,47 +97,20 @@ func (c *ConnectionChecker) CheckConnectionToService(ctx context.Context, q *ref } l.Infof("CheckConnection response: %+v.", resp) - stype := service.ServiceType - switch stype { + switch service.ServiceType { case models.MySQLServiceType: - stats := resp.(*agentpb.CheckConnectionResponse).GetStats() //nolint:forcetypeassert - tableCount := stats.GetTableCount() - version := stats.GetVersion() + tableCount := resp.(*agentpb.CheckConnectionResponse).GetStats().GetTableCount() //nolint:forcetypeassert agent.TableCount = &tableCount l.Debugf("Updating table count: %d.", tableCount) if err = q.Update(agent); err != nil { return errors.Wrap(err, "failed to update table count") } - service.Version = version - l.Debugf("Updating service version: %s.", version) - if err = q.Update(service); err != nil { - return errors.Wrap(err, "failed to update service version") - } case models.ExternalServiceType, models.HAProxyServiceType: case models.PostgreSQLServiceType: - stats := resp.(*agentpb.CheckConnectionResponse).GetStats() //nolint:forcetypeassert - version := stats.GetVersion() - service.Version = version - l.Debugf("Updating service version: %s.", version) - if err = q.Update(service); err != nil { - return errors.Wrap(err, "failed to update service version") - } case models.MongoDBServiceType: - stats := resp.(*agentpb.CheckConnectionResponse).GetStats() //nolint:forcetypeassert - version := stats.GetVersion() - service.Version = version - l.Debugf("Updating service version: %s.", version) - if err = q.Update(service); err != nil { - return errors.Wrap(err, "failed to update service version") - } case models.ProxySQLServiceType: - stats := resp.(*agentpb.CheckConnectionResponse).GetStats() //nolint:forcetypeassert - version := stats.GetVersion() - service.Version = version - l.Debugf("Updating service version: %s.", version) - if err = q.Update(service); err != nil { - return errors.Wrap(err, "failed to update service version") - } + // nothing yet + default: return errors.Errorf("unhandled Service type %s", service.ServiceType) } @@ -241,170 +214,3 @@ func isExternalExporterConnectionCheckSupported(q *reform.Querier, pmmAgentID st } return true, nil } - -func createServiceInfoRequest(q *reform.Querier, service *models.Service, agent *models.Agent) (*agentpb.ServiceInfoRequest, error) { - var request *agentpb.ServiceInfoRequest - switch service.ServiceType { - case models.MySQLServiceType: - tdp := agent.TemplateDelimiters(service) - request = &agentpb.ServiceInfoRequest{ - Type: inventorypb.ServiceType_MYSQL_SERVICE, - Dsn: agent.DSN(service, 2*time.Second, service.DatabaseName, nil), - Timeout: durationpb.New(3 * time.Second), - TextFiles: &agentpb.TextFiles{ - Files: agent.Files(), - TemplateLeftDelim: tdp.Left, - TemplateRightDelim: tdp.Right, - }, - TlsSkipVerify: agent.TLSSkipVerify, - } - case models.PostgreSQLServiceType: - tdp := agent.TemplateDelimiters(service) - request = &agentpb.ServiceInfoRequest{ - Type: inventorypb.ServiceType_POSTGRESQL_SERVICE, - Dsn: agent.DSN(service, 2*time.Second, service.DatabaseName, nil), - Timeout: durationpb.New(3 * time.Second), - TextFiles: &agentpb.TextFiles{ - Files: agent.Files(), - TemplateLeftDelim: tdp.Left, - TemplateRightDelim: tdp.Right, - }, - } - case models.MongoDBServiceType: - tdp := agent.TemplateDelimiters(service) - request = &agentpb.ServiceInfoRequest{ - Type: inventorypb.ServiceType_MONGODB_SERVICE, - Dsn: agent.DSN(service, 2*time.Second, service.DatabaseName, nil), - Timeout: durationpb.New(3 * time.Second), - TextFiles: &agentpb.TextFiles{ - Files: agent.Files(), - TemplateLeftDelim: tdp.Left, - TemplateRightDelim: tdp.Right, - }, - } - case models.ProxySQLServiceType: - request = &agentpb.ServiceInfoRequest{ - Type: inventorypb.ServiceType_PROXYSQL_SERVICE, - Dsn: agent.DSN(service, 2*time.Second, service.DatabaseName, nil), - Timeout: durationpb.New(3 * time.Second), - } - case models.ExternalServiceType: - exporterURL, err := agent.ExporterURL(q) - if err != nil { - return nil, err - } - - request = &agentpb.ServiceInfoRequest{ - Type: inventorypb.ServiceType_EXTERNAL_SERVICE, - Dsn: exporterURL, - Timeout: durationpb.New(3 * time.Second), - } - case models.HAProxyServiceType: - exporterURL, err := agent.ExporterURL(q) - if err != nil { - return nil, err - } - - request = &agentpb.ServiceInfoRequest{ - Type: inventorypb.ServiceType_HAPROXY_SERVICE, - Dsn: exporterURL, - Timeout: durationpb.New(3 * time.Second), - } - default: - return nil, errors.Errorf("unhandled Service type %s", service.ServiceType) - } - return request, nil -} - -// GetInfoFromService sends request to pmm-agent to check connection to service. -func (c *ConnectionChecker) GetInfoFromService(ctx context.Context, q *reform.Querier, service *models.Service, agent *models.Agent) error { - l := logger.Get(ctx) - start := time.Now() - defer func() { - if dur := time.Since(start); dur > 4*time.Second { - l.Warnf("GetInfoFromService took %s.", dur) - } - }() - - pmmAgentID := pointer.GetString(agent.PMMAgentID) - if !agent.PushMetrics && (service.ServiceType == models.ExternalServiceType || service.ServiceType == models.HAProxyServiceType) { - pmmAgentID = models.PMMServerAgentID - } - - // Skip check connection to external exporter with old pmm-agent. - if service.ServiceType == models.ExternalServiceType || service.ServiceType == models.HAProxyServiceType { - isCheckConnSupported, err := isExternalExporterConnectionCheckSupported(q, pmmAgentID) - if err != nil { - return err - } - - if !isCheckConnSupported { - return nil - } - } - - pmmAgent, err := c.r.get(pmmAgentID) - if err != nil { - return err - } - - request, err := createServiceInfoRequest(q, service, agent) - if err != nil { - return err - } - - var sanitizedDSN string - for _, word := range redactWords(agent) { - sanitizedDSN = strings.ReplaceAll(request.Dsn, word, "****") - } - l.Infof("ServiceInfoRequest: type: %s, DSN: %s timeout: %s.", request.Type, sanitizedDSN, request.Timeout) - resp, err := pmmAgent.channel.SendAndWaitResponse(request) - if err != nil { - return err - } - l.Infof("ServiceInfo response: %+v.", resp) - - stype := service.ServiceType - switch stype { - case models.MySQLServiceType: - stats := resp.(*agentpb.ServiceInfoResponse) //nolint:forcetypeassert - tableCount := stats.GetTableCount() - agent.TableCount = &tableCount - l.Debugf("Updating table count: %d.", tableCount) - if err = q.Update(agent); err != nil { - return errors.Wrap(err, "failed to update table count") - } - return updateServiceVersion(ctx, q, resp, service) - case models.ExternalServiceType, models.HAProxyServiceType: - case models.PostgreSQLServiceType: - return updateServiceVersion(ctx, q, resp, service) - case models.MongoDBServiceType: - return updateServiceVersion(ctx, q, resp, service) - case models.ProxySQLServiceType: - return updateServiceVersion(ctx, q, resp, service) - default: - return errors.Errorf("unhandled Service type %s", service.ServiceType) - } - - msg := resp.(*agentpb.ServiceInfoResponse).Error //nolint:forcetypeassert - switch msg { - case "": - return nil - case context.Canceled.Error(), context.DeadlineExceeded.Error(): - msg = fmt.Sprintf("timeout (%s)", msg) - } - return status.Error(codes.FailedPrecondition, fmt.Sprintf("Connection check failed: %s.", msg)) -} - -func updateServiceVersion(ctx context.Context, q *reform.Querier, resp agentpb.AgentResponsePayload, service *models.Service) error { - l := logger.Get(ctx) - - version := resp.(*agentpb.ServiceInfoResponse).GetVersion() //nolint:forcetypeassert - service.Version = version - l.Debugf("Updating service version: %s.", version) - if err := q.Update(service); err != nil { - return errors.Wrap(err, "failed to update service version") - } - - return nil -} diff --git a/managed/services/agents/service_info_broker.go b/managed/services/agents/service_info_broker.go new file mode 100644 index 00000000000..3513dcb0afa --- /dev/null +++ b/managed/services/agents/service_info_broker.go @@ -0,0 +1,207 @@ +// Copyright (C) 2017 Percona LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package agents + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/AlekSi/pointer" + "github.com/pkg/errors" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/durationpb" + "gopkg.in/reform.v1" + + "github.com/percona/pmm/api/agentpb" + "github.com/percona/pmm/api/inventorypb" + "github.com/percona/pmm/managed/models" + "github.com/percona/pmm/utils/logger" +) + +// ServiceInfoBroker checks if connection can be established to service. +type ServiceInfoBroker struct { + r *Registry +} + +// NewServiceInfoBroker creates new connection checker. +func NewServiceInfoBroker(r *Registry) *ServiceInfoBroker { + return &ServiceInfoBroker{ + r: r, + } +} + +func createServiceInfoRequest(q *reform.Querier, service *models.Service, agent *models.Agent) (*agentpb.ServiceInfoRequest, error) { + var request *agentpb.ServiceInfoRequest + switch service.ServiceType { + case models.MySQLServiceType: + tdp := agent.TemplateDelimiters(service) + request = &agentpb.ServiceInfoRequest{ + Type: inventorypb.ServiceType_MYSQL_SERVICE, + Dsn: agent.DSN(service, 2*time.Second, service.DatabaseName, nil), + Timeout: durationpb.New(3 * time.Second), + TextFiles: &agentpb.TextFiles{ + Files: agent.Files(), + TemplateLeftDelim: tdp.Left, + TemplateRightDelim: tdp.Right, + }, + TlsSkipVerify: agent.TLSSkipVerify, + } + case models.PostgreSQLServiceType: + tdp := agent.TemplateDelimiters(service) + request = &agentpb.ServiceInfoRequest{ + Type: inventorypb.ServiceType_POSTGRESQL_SERVICE, + Dsn: agent.DSN(service, 2*time.Second, service.DatabaseName, nil), + Timeout: durationpb.New(3 * time.Second), + TextFiles: &agentpb.TextFiles{ + Files: agent.Files(), + TemplateLeftDelim: tdp.Left, + TemplateRightDelim: tdp.Right, + }, + } + case models.MongoDBServiceType: + tdp := agent.TemplateDelimiters(service) + request = &agentpb.ServiceInfoRequest{ + Type: inventorypb.ServiceType_MONGODB_SERVICE, + Dsn: agent.DSN(service, 2*time.Second, service.DatabaseName, nil), + Timeout: durationpb.New(3 * time.Second), + TextFiles: &agentpb.TextFiles{ + Files: agent.Files(), + TemplateLeftDelim: tdp.Left, + TemplateRightDelim: tdp.Right, + }, + } + case models.ProxySQLServiceType: + request = &agentpb.ServiceInfoRequest{ + Type: inventorypb.ServiceType_PROXYSQL_SERVICE, + Dsn: agent.DSN(service, 2*time.Second, service.DatabaseName, nil), + Timeout: durationpb.New(3 * time.Second), + } + case models.ExternalServiceType: + exporterURL, err := agent.ExporterURL(q) + if err != nil { + return nil, err + } + + request = &agentpb.ServiceInfoRequest{ + Type: inventorypb.ServiceType_EXTERNAL_SERVICE, + Dsn: exporterURL, + Timeout: durationpb.New(3 * time.Second), + } + case models.HAProxyServiceType: + exporterURL, err := agent.ExporterURL(q) + if err != nil { + return nil, err + } + + request = &agentpb.ServiceInfoRequest{ + Type: inventorypb.ServiceType_HAPROXY_SERVICE, + Dsn: exporterURL, + Timeout: durationpb.New(3 * time.Second), + } + default: + return nil, errors.Errorf("unhandled Service type %s", service.ServiceType) + } + return request, nil +} + +// GetInfoFromService sends a request to pmm-agent to query information from a service. +func (c *ServiceInfoBroker) GetInfoFromService(ctx context.Context, q *reform.Querier, service *models.Service, agent *models.Agent) error { + l := logger.Get(ctx) + start := time.Now() + defer func() { + if dur := time.Since(start); dur > 4*time.Second { + l.Warnf("GetInfoFromService took %s.", dur) + } + }() + + // External exporters and haproxy do not support this functionality. + if service.ServiceType == models.ExternalServiceType || service.ServiceType == models.HAProxyServiceType { + return nil + } + + pmmAgentID := pointer.GetString(agent.PMMAgentID) + if !agent.PushMetrics && (service.ServiceType == models.ExternalServiceType || service.ServiceType == models.HAProxyServiceType) { + pmmAgentID = models.PMMServerAgentID + } + + pmmAgent, err := c.r.get(pmmAgentID) + if err != nil { + return err + } + + request, err := createServiceInfoRequest(q, service, agent) + if err != nil { + return err + } + + var sanitizedDSN string + for _, word := range redactWords(agent) { + sanitizedDSN = strings.ReplaceAll(request.Dsn, word, "****") + } + l.Infof("ServiceInfoRequest: type: %s, DSN: %s timeout: %s.", request.Type, sanitizedDSN, request.Timeout) + resp, err := pmmAgent.channel.SendAndWaitResponse(request) + if err != nil { + return err + } + l.Infof("ServiceInfo response: %+v.", resp) + + stype := service.ServiceType + switch stype { + case models.MySQLServiceType: + stats := resp.(*agentpb.ServiceInfoResponse) //nolint:forcetypeassert + tableCount := stats.GetTableCount() + agent.TableCount = &tableCount + l.Debugf("Updating table count: %d.", tableCount) + if err = q.Update(agent); err != nil { + return errors.Wrap(err, "failed to update table count") + } + return updateServiceVersion(ctx, q, resp, service) + case models.ExternalServiceType, models.HAProxyServiceType: + case models.PostgreSQLServiceType: + return updateServiceVersion(ctx, q, resp, service) + case models.MongoDBServiceType: + return updateServiceVersion(ctx, q, resp, service) + case models.ProxySQLServiceType: + return updateServiceVersion(ctx, q, resp, service) + default: + return errors.Errorf("unhandled Service type %s", service.ServiceType) + } + + msg := resp.(*agentpb.ServiceInfoResponse).Error //nolint:forcetypeassert + switch msg { + case "": + return nil + case context.Canceled.Error(), context.DeadlineExceeded.Error(): + msg = fmt.Sprintf("timeout (%s)", msg) + } + return status.Error(codes.FailedPrecondition, fmt.Sprintf("Connection check failed: %s.", msg)) +} + +func updateServiceVersion(ctx context.Context, q *reform.Querier, resp agentpb.AgentResponsePayload, service *models.Service) error { + l := logger.Get(ctx) + + version := resp.(*agentpb.ServiceInfoResponse).GetVersion() //nolint:forcetypeassert + service.Version = version + l.Debugf("Updating service version: %s.", version) + if err := q.Update(service); err != nil { + return errors.Wrap(err, "failed to update service version") + } + + return nil +}