Skip to content

Commit

Permalink
PMM-12375 move infoRequest to serviceInfoBroker
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Tymchuk committed Sep 15, 2023
1 parent 6e1e012 commit 69ce63f
Show file tree
Hide file tree
Showing 2 changed files with 212 additions and 199 deletions.
204 changes: 5 additions & 199 deletions managed/services/agents/connection_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func NewConnectionChecker(r *Registry) *ConnectionChecker {
}
}

// CheckConnectionToService sends request to pmm-agent to check connection to service.
// CheckConnectionToService sends a request to pmm-agent to check connection to service.
func (c *ConnectionChecker) CheckConnectionToService(ctx context.Context, q *reform.Querier, service *models.Service, agent *models.Agent) error {
l := logger.Get(ctx)
start := time.Now()
Expand Down Expand Up @@ -97,47 +97,20 @@ func (c *ConnectionChecker) CheckConnectionToService(ctx context.Context, q *ref
}
l.Infof("CheckConnection response: %+v.", resp)

stype := service.ServiceType
switch stype {
switch service.ServiceType {
case models.MySQLServiceType:
stats := resp.(*agentpb.CheckConnectionResponse).GetStats() //nolint:forcetypeassert
tableCount := stats.GetTableCount()
version := stats.GetVersion()
tableCount := resp.(*agentpb.CheckConnectionResponse).GetStats().GetTableCount() //nolint:forcetypeassert
agent.TableCount = &tableCount
l.Debugf("Updating table count: %d.", tableCount)
if err = q.Update(agent); err != nil {
return errors.Wrap(err, "failed to update table count")
}
service.Version = version
l.Debugf("Updating service version: %s.", version)
if err = q.Update(service); err != nil {
return errors.Wrap(err, "failed to update service version")
}
case models.ExternalServiceType, models.HAProxyServiceType:
case models.PostgreSQLServiceType:
stats := resp.(*agentpb.CheckConnectionResponse).GetStats() //nolint:forcetypeassert
version := stats.GetVersion()
service.Version = version
l.Debugf("Updating service version: %s.", version)
if err = q.Update(service); err != nil {
return errors.Wrap(err, "failed to update service version")
}
case models.MongoDBServiceType:
stats := resp.(*agentpb.CheckConnectionResponse).GetStats() //nolint:forcetypeassert
version := stats.GetVersion()
service.Version = version
l.Debugf("Updating service version: %s.", version)
if err = q.Update(service); err != nil {
return errors.Wrap(err, "failed to update service version")
}
case models.ProxySQLServiceType:
stats := resp.(*agentpb.CheckConnectionResponse).GetStats() //nolint:forcetypeassert
version := stats.GetVersion()
service.Version = version
l.Debugf("Updating service version: %s.", version)
if err = q.Update(service); err != nil {
return errors.Wrap(err, "failed to update service version")
}
// nothing yet

default:
return errors.Errorf("unhandled Service type %s", service.ServiceType)
}
Expand Down Expand Up @@ -241,170 +214,3 @@ func isExternalExporterConnectionCheckSupported(q *reform.Querier, pmmAgentID st
}
return true, nil
}

func createServiceInfoRequest(q *reform.Querier, service *models.Service, agent *models.Agent) (*agentpb.ServiceInfoRequest, error) {
var request *agentpb.ServiceInfoRequest
switch service.ServiceType {
case models.MySQLServiceType:
tdp := agent.TemplateDelimiters(service)
request = &agentpb.ServiceInfoRequest{
Type: inventorypb.ServiceType_MYSQL_SERVICE,
Dsn: agent.DSN(service, 2*time.Second, service.DatabaseName, nil),
Timeout: durationpb.New(3 * time.Second),
TextFiles: &agentpb.TextFiles{
Files: agent.Files(),
TemplateLeftDelim: tdp.Left,
TemplateRightDelim: tdp.Right,
},
TlsSkipVerify: agent.TLSSkipVerify,
}
case models.PostgreSQLServiceType:
tdp := agent.TemplateDelimiters(service)
request = &agentpb.ServiceInfoRequest{
Type: inventorypb.ServiceType_POSTGRESQL_SERVICE,
Dsn: agent.DSN(service, 2*time.Second, service.DatabaseName, nil),
Timeout: durationpb.New(3 * time.Second),
TextFiles: &agentpb.TextFiles{
Files: agent.Files(),
TemplateLeftDelim: tdp.Left,
TemplateRightDelim: tdp.Right,
},
}
case models.MongoDBServiceType:
tdp := agent.TemplateDelimiters(service)
request = &agentpb.ServiceInfoRequest{
Type: inventorypb.ServiceType_MONGODB_SERVICE,
Dsn: agent.DSN(service, 2*time.Second, service.DatabaseName, nil),
Timeout: durationpb.New(3 * time.Second),
TextFiles: &agentpb.TextFiles{
Files: agent.Files(),
TemplateLeftDelim: tdp.Left,
TemplateRightDelim: tdp.Right,
},
}
case models.ProxySQLServiceType:
request = &agentpb.ServiceInfoRequest{
Type: inventorypb.ServiceType_PROXYSQL_SERVICE,
Dsn: agent.DSN(service, 2*time.Second, service.DatabaseName, nil),
Timeout: durationpb.New(3 * time.Second),
}
case models.ExternalServiceType:
exporterURL, err := agent.ExporterURL(q)
if err != nil {
return nil, err
}

request = &agentpb.ServiceInfoRequest{
Type: inventorypb.ServiceType_EXTERNAL_SERVICE,
Dsn: exporterURL,
Timeout: durationpb.New(3 * time.Second),
}
case models.HAProxyServiceType:
exporterURL, err := agent.ExporterURL(q)
if err != nil {
return nil, err
}

request = &agentpb.ServiceInfoRequest{
Type: inventorypb.ServiceType_HAPROXY_SERVICE,
Dsn: exporterURL,
Timeout: durationpb.New(3 * time.Second),
}
default:
return nil, errors.Errorf("unhandled Service type %s", service.ServiceType)
}
return request, nil
}

// GetInfoFromService sends request to pmm-agent to check connection to service.
func (c *ConnectionChecker) GetInfoFromService(ctx context.Context, q *reform.Querier, service *models.Service, agent *models.Agent) error {
l := logger.Get(ctx)
start := time.Now()
defer func() {
if dur := time.Since(start); dur > 4*time.Second {
l.Warnf("GetInfoFromService took %s.", dur)
}
}()

pmmAgentID := pointer.GetString(agent.PMMAgentID)
if !agent.PushMetrics && (service.ServiceType == models.ExternalServiceType || service.ServiceType == models.HAProxyServiceType) {
pmmAgentID = models.PMMServerAgentID
}

// Skip check connection to external exporter with old pmm-agent.
if service.ServiceType == models.ExternalServiceType || service.ServiceType == models.HAProxyServiceType {
isCheckConnSupported, err := isExternalExporterConnectionCheckSupported(q, pmmAgentID)
if err != nil {
return err
}

if !isCheckConnSupported {
return nil
}
}

pmmAgent, err := c.r.get(pmmAgentID)
if err != nil {
return err
}

request, err := createServiceInfoRequest(q, service, agent)
if err != nil {
return err
}

var sanitizedDSN string
for _, word := range redactWords(agent) {
sanitizedDSN = strings.ReplaceAll(request.Dsn, word, "****")
}
l.Infof("ServiceInfoRequest: type: %s, DSN: %s timeout: %s.", request.Type, sanitizedDSN, request.Timeout)
resp, err := pmmAgent.channel.SendAndWaitResponse(request)
if err != nil {
return err
}
l.Infof("ServiceInfo response: %+v.", resp)

stype := service.ServiceType
switch stype {
case models.MySQLServiceType:
stats := resp.(*agentpb.ServiceInfoResponse) //nolint:forcetypeassert
tableCount := stats.GetTableCount()
agent.TableCount = &tableCount
l.Debugf("Updating table count: %d.", tableCount)
if err = q.Update(agent); err != nil {
return errors.Wrap(err, "failed to update table count")
}
return updateServiceVersion(ctx, q, resp, service)
case models.ExternalServiceType, models.HAProxyServiceType:
case models.PostgreSQLServiceType:
return updateServiceVersion(ctx, q, resp, service)
case models.MongoDBServiceType:
return updateServiceVersion(ctx, q, resp, service)
case models.ProxySQLServiceType:
return updateServiceVersion(ctx, q, resp, service)
default:
return errors.Errorf("unhandled Service type %s", service.ServiceType)
}

msg := resp.(*agentpb.ServiceInfoResponse).Error //nolint:forcetypeassert
switch msg {
case "":
return nil
case context.Canceled.Error(), context.DeadlineExceeded.Error():
msg = fmt.Sprintf("timeout (%s)", msg)
}
return status.Error(codes.FailedPrecondition, fmt.Sprintf("Connection check failed: %s.", msg))
}

func updateServiceVersion(ctx context.Context, q *reform.Querier, resp agentpb.AgentResponsePayload, service *models.Service) error {
l := logger.Get(ctx)

version := resp.(*agentpb.ServiceInfoResponse).GetVersion() //nolint:forcetypeassert
service.Version = version
l.Debugf("Updating service version: %s.", version)
if err := q.Update(service); err != nil {
return errors.Wrap(err, "failed to update service version")
}

return nil
}
Loading

0 comments on commit 69ce63f

Please sign in to comment.