diff --git a/go.sum b/go.sum index fc8f58c1d7..fa58004736 100644 --- a/go.sum +++ b/go.sum @@ -1072,8 +1072,8 @@ github.com/turbot/go-prompt v0.2.6-steampipe.0.0.20221028122246-eb118ec58d50 h1: github.com/turbot/go-prompt v0.2.6-steampipe.0.0.20221028122246-eb118ec58d50/go.mod h1:vFnjEGDIIA/Lib7giyE4E9c50Lvl8j0S+7FVlAwDAVw= github.com/turbot/steampipe-cloud-sdk-go v0.6.0 h1:ufAxOpKS1uq7eejuE5sfEu1+d7QAd0RBjl8Bn6+mIs8= github.com/turbot/steampipe-cloud-sdk-go v0.6.0/go.mod h1:M42TMBdMim4bV1YTMxhKyzfSGSMo4CXUkm3wt9w7t1Y= -github.com/turbot/steampipe-plugin-sdk/v5 v5.6.0-dev.16 h1:61HPiCofKxRCr+PL/Cf3lROa/rQdF4oNvQyoTHn1Q5o= -github.com/turbot/steampipe-plugin-sdk/v5 v5.6.0-dev.16/go.mod h1:Np0X1Oj3JNTcuf9JmvWwHrCqc0UB4iJLmUlOkRwMCWw= +github.com/turbot/steampipe-plugin-sdk/v5 v5.6.0-dev.13 h1:ertavdOZLsu45h8H6R0A07W/SGmQYA5m7PrR5Fo4t+8= +github.com/turbot/steampipe-plugin-sdk/v5 v5.6.0-dev.13/go.mod h1:aCOVDbfgl/y/vGUaEKkzi9jBrNerEN46l88DkQUu3Vw= github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= github.com/ugorji/go v0.0.0-20180813092308-00b869d2f4a5/go.mod h1:hnLbHMwcvSihnDhEfx2/BzKp2xb0Y+ErdfYcrs9tkJQ= diff --git a/pkg/pluginmanager_service/plugin_manager.go b/pkg/pluginmanager_service/plugin_manager.go index 8731fd1639..8591b539c8 100644 --- a/pkg/pluginmanager_service/plugin_manager.go +++ b/pkg/pluginmanager_service/plugin_manager.go @@ -4,6 +4,7 @@ import ( "context" "crypto/md5" "fmt" + "github.com/turbot/steampipe-plugin-sdk/v5/sperr" "log" "os" "os/exec" @@ -127,7 +128,7 @@ func (m *PluginManager) Get(req *pb.GetRequest) (*pb.GetResponse, error) { reattach, err := m.ensurePlugin(pluginName, connectionConfigs, req) if err != nil { log.Printf("[WARN] PluginManager Get failed for %s: %s (%p)", pluginName, err.Error(), resp) - resp.FailureMap[pluginName] = err.Error() + resp.FailureMap[pluginName] = sperr.WrapWithMessage(err, "failed to start '%s'", pluginName).Error() } else { log.Printf("[TRACE] PluginManager Get succeeded for %s, pid %d (%p)", pluginName, reattach.Pid, resp) @@ -452,10 +453,10 @@ func (m *PluginManager) startPlugin(pluginName string, connectionConfigs []*sdkp // close failed chan to signal to anyone waiting for the plugin to startup that it failed close(startingPlugin.failed) - log.Printf("[WARN] startPluginProcess failed: %s (%p)", err.Error(), req) + log.Printf("[INFO] startPluginProcess failed: %s (%p)", err.Error(), req) // kill the client if startingPlugin.client != nil { - log.Printf("[WARN] failed pid: %d (%p)", startingPlugin.client.ReattachConfig().Pid, req) + log.Printf("[INFO] failed pid: %d (%p)", startingPlugin.client.ReattachConfig().Pid, req) startingPlugin.client.Kill() } @@ -557,6 +558,7 @@ func (m *PluginManager) startPluginProcess(pluginName string, connectionConfigs }) if _, err := client.Start(); err != nil { + err := m.handleStartFailure(err) return nil, err } @@ -589,7 +591,7 @@ func (m *PluginManager) initializePlugin(connectionConfigs []*sdkproto.Connectio if supportedOperations == nil { supportedOperations = &sdkproto.GetSupportedOperationsResponse{} } - // if this plugin does not support multiple connections, we no longer support is + // if this plugin does not support multiple connections, we no longer support it if !supportedOperations.MultipleConnections { // TODO SEND NOTIFICATION TO CLI return nil, fmt.Errorf("plugins which do not supprt multiple connections (using SDK version < v4) are no longer supported. Upgrade plugin '%s", pluginName) @@ -605,18 +607,16 @@ func (m *PluginManager) initializePlugin(connectionConfigs []*sdkproto.Connectio // this returns a list of all connections provided by this plugin err = m.setAllConnectionConfigs(connectionConfigs, pluginClient, supportedOperations) if err != nil { - // return retryable error log.Printf("[WARN] failed to set connection config for %s: %s", pluginName, err.Error()) - return nil, retry.RetryableError(err) + return nil, err } // if this plugin supports setting cache options, do so if supportedOperations.SetCacheOptions { err = m.setCacheOptions(pluginClient) if err != nil { - // return retryable error log.Printf("[WARN] failed to set cache options for %s: %s", pluginName, err.Error()) - return nil, retry.RetryableError(err) + return nil, err } } @@ -625,8 +625,7 @@ func (m *PluginManager) initializePlugin(connectionConfigs []*sdkproto.Connectio // if this plugin has a dynamic schema, add connections to message server err = m.notifyNewDynamicSchemas(pluginClient, exemplarConnectionConfig, connectionNames) if err != nil { - // send err down running plugin error channel - return nil, retry.RetryableError(err) + return nil, err } log.Printf("[INFO] initializePlugin complete pid %d", client.ReattachConfig().Pid) @@ -813,6 +812,24 @@ func (m *PluginManager) nonAggregatorConnectionCount() int { return res } +func (m *PluginManager) handleStartFailure(err error) error { + // extract the plugin message + _, pluginMessage, found := strings.Cut(err.Error(), sdkplugin.UnrecognizedRemotePluginMessage) + if !found { + return err + } + pluginMessage, _, found = strings.Cut(pluginMessage, sdkplugin.UnrecognizedRemotePluginMessageSuffix) + if !found { + return err + } + + // if this was a panic during startup, reraise an error with the panic string + if strings.Contains(pluginMessage, sdkplugin.StartupPanicMessage) { + return fmt.Errorf(pluginMessage) + } + return err +} + func nonAggregatorConnectionCount(connections []*sdkproto.ConnectionConfig) int { res := 0 for _, c := range connections {