From 12a6719fc1274fe1f5d2b1c1313bc88ab3ef01ef Mon Sep 17 00:00:00 2001 From: Ashish Tiwari Date: Tue, 30 Aug 2022 18:51:30 +0530 Subject: [PATCH] Add Event streamer struct Signed-off-by: Ashish Tiwari --- go.mod | 4 ++-- go.sum | 4 ++++ main.go | 7 ++++--- osm/oam.go | 23 ++++++++++++++++++++--- osm/operations.go | 33 ++++++++++++++++----------------- osm/osm.go | 7 ++++--- 6 files changed, 50 insertions(+), 28 deletions(-) diff --git a/go.mod b/go.mod index c112630f..e08b1018 100644 --- a/go.mod +++ b/go.mod @@ -9,8 +9,8 @@ replace ( ) require ( - github.com/layer5io/meshery-adapter-library v0.5.9 - github.com/layer5io/meshkit v0.5.32 + github.com/layer5io/meshery-adapter-library v0.5.10 + github.com/layer5io/meshkit v0.5.37 github.com/layer5io/service-mesh-performance v0.3.4 gopkg.in/yaml.v2 v2.4.0 k8s.io/apimachinery v0.23.5 diff --git a/go.sum b/go.sum index ad3e3829..04423765 100644 --- a/go.sum +++ b/go.sum @@ -868,8 +868,12 @@ github.com/layer5io/learn-layer5/smi-conformance v0.0.0-20210317075357-06b4f88b3 github.com/layer5io/learn-layer5/smi-conformance v0.0.0-20210317075357-06b4f88b3e34/go.mod h1:BQPLwdJt7v7y0fXIejI4whR9zMyX07Wjt5xrbgEmHLw= github.com/layer5io/meshery-adapter-library v0.5.9 h1:Zp79l4J8kMjML9zAQ4Xu4QiKM5q5HEGcv04Jjg+xWSA= github.com/layer5io/meshery-adapter-library v0.5.9/go.mod h1:IvURQMnZHa3z0OTcUSPqCHUgTsW2x0/+KjCqpYfMbt0= +github.com/layer5io/meshery-adapter-library v0.5.10 h1:Qgr6vDx2s10mkhtk7Mnz5I73m/9yf2yyjCkPMeB4jmA= +github.com/layer5io/meshery-adapter-library v0.5.10/go.mod h1:Sg6WNN82uRo2kiFDEMc/LM/AJ/Pu6ZmBZGbFxZuC7zc= github.com/layer5io/meshkit v0.5.32 h1:jIkQ9gKH7TPMWKbVtf6wQ+qv4553UyZ9SV4yKA2D4oo= github.com/layer5io/meshkit v0.5.32/go.mod h1:dt0uOluDzatK6hbJEDAZbUsm7LJNb4nsXXaGUDtYxD0= +github.com/layer5io/meshkit v0.5.37 h1:EO0wXAI+eqAm+4uKSzFd50rDkr6nqQ17m1j0wmv9hQA= +github.com/layer5io/meshkit v0.5.37/go.mod h1:dt0uOluDzatK6hbJEDAZbUsm7LJNb4nsXXaGUDtYxD0= github.com/layer5io/service-mesh-performance v0.3.2-0.20210122142912-a94e0658b021/go.mod h1:W153amv8aHAeIWxO7b7d7Vibt9RhaEVh4Uh+RG+BumQ= github.com/layer5io/service-mesh-performance v0.3.4 h1:aw/elsx0wkry7SyiQRIj31wW7TPCP4YfhINdNOLXVg8= github.com/layer5io/service-mesh-performance v0.3.4/go.mod h1:W153amv8aHAeIWxO7b7d7Vibt9RhaEVh4Uh+RG+BumQ= diff --git a/main.go b/main.go index a64399f6..ac78e7d7 100644 --- a/main.go +++ b/main.go @@ -31,6 +31,7 @@ import ( configprovider "github.com/layer5io/meshkit/config/provider" "github.com/layer5io/meshkit/logger" "github.com/layer5io/meshkit/utils" + "github.com/layer5io/meshkit/utils/events" ) var ( @@ -70,9 +71,9 @@ func main() { service := &grpc.Service{} _ = cfg.GetObject(adapter.ServerKey, &service) - - service.Handler = osm.New(cfg, log, kubeconfigHandler) - service.Channel = make(chan interface{}, 100) + e := events.NewEventStreamer() + service.Handler = osm.New(cfg, log, kubeconfigHandler, e) + service.EventStreamer = e service.StartedAt = time.Now() service.Version = version service.GitSHA = gitsha diff --git a/osm/oam.go b/osm/oam.go index 567bc6ba..3de17af9 100644 --- a/osm/oam.go +++ b/osm/oam.go @@ -4,6 +4,9 @@ import ( "fmt" "strings" + "github.com/google/uuid" + "github.com/layer5io/meshery-adapter-library/meshes" + "github.com/layer5io/meshery-osm/internal/config" "github.com/layer5io/meshkit/models/oam/core/v1alpha1" "gopkg.in/yaml.v2" ) @@ -19,26 +22,40 @@ func (h *Handler) HandleComponents(comps []v1alpha1.Component, isDel bool, kubec compFuncMap := map[string]CompHandler{ "OSMMesh": handleComponentOSMMesh, } - + stat1 := "deploying" + stat2 := "deployed" + if isDel { + stat1 = "removing" + stat2 = "removed" + } for _, comp := range comps { + ee := &meshes.EventsResponse{ + OperationId: uuid.New().String(), + Component: config.ServerDefaults["type"], + ComponentName: config.ServerDefaults["name"], + } fnc, ok := compFuncMap[comp.Spec.Type] if !ok { msg, err := handleOSMCoreComponent(h, comp, isDel, "", "", kubeconfigs) if err != nil { + h.streamErr(fmt.Sprintf("failed in %s %s", stat1, comp.Spec.Type), ee, err) errs = append(errs, err) continue } - + ee.Summary = fmt.Sprintf("%s: %s %s successfully", comp.Name, strings.TrimSuffix(comp.Spec.Type, ".OSM"), stat2) + ee.Details = fmt.Sprintf("The %s of type %s has been %s successfully", comp.Name, strings.TrimSuffix(comp.Spec.Type, ".OSM"), stat2) msgs = append(msgs, msg) continue } msg, err := fnc(h, comp, isDel, kubeconfigs) if err != nil { + h.streamErr(fmt.Sprintf("failed in %s %s", stat1, strings.TrimSuffix(comp.Spec.Type, ".OSM")), ee, err) errs = append(errs, err) continue } - + ee.Summary = fmt.Sprintf("%s: %s %s successfully", comp.Name, strings.TrimSuffix(comp.Spec.Type, ".OSM"), stat2) + ee.Details = fmt.Sprintf("The %s of type %s has been %s successfully", comp.Name, strings.TrimSuffix(comp.Spec.Type, ".OSM"), stat2) msgs = append(msgs, msg) } diff --git a/osm/operations.go b/osm/operations.go index 329806b8..e7c27fca 100644 --- a/osm/operations.go +++ b/osm/operations.go @@ -13,12 +13,11 @@ import ( ) // ApplyOperation function contains the operation handlers -func (h *Handler) ApplyOperation(ctx context.Context, request adapter.OperationRequest, hchan *chan interface{}) error { +func (h *Handler) ApplyOperation(ctx context.Context, request adapter.OperationRequest) error { err := h.CreateKubeconfigs(request.K8sConfigs) if err != nil { return err } - h.SetChannel(hchan) kubeconfigs := request.K8sConfigs operations := make(adapter.Operations) err = h.Config.GetObject(adapter.OperationsKey, &operations) @@ -27,10 +26,10 @@ func (h *Handler) ApplyOperation(ctx context.Context, request adapter.OperationR } e := &meshes.EventsResponse{ - OperationId: request.OperationID, - Summary: status.Deploying, - Details: "Operation is not supported", - Component: internalconfig.ServerDefaults["type"], + OperationId: request.OperationID, + Summary: status.Deploying, + Details: "Operation is not supported", + Component: internalconfig.ServerDefaults["type"], ComponentName: internalconfig.ServerDefaults["name"], } @@ -42,12 +41,12 @@ func (h *Handler) ApplyOperation(ctx context.Context, request adapter.OperationR stat, err := hh.installOSM(request.IsDeleteOperation, version, request.Namespace, kubeconfigs) if err != nil { summary := fmt.Sprintf("Error while %s Open service mesh", stat) - hh.streamErr(summary, e, err) + hh.streamErr(summary, ee, err) return } ee.Summary = fmt.Sprintf("Open service mesh %s successfully", stat) ee.Details = fmt.Sprintf("Open service mesh is now %s.", stat) - hh.StreamInfo(e) + hh.StreamInfo(ee) }(h, e) case common.BookInfoOperation, @@ -58,13 +57,13 @@ func (h *Handler) ApplyOperation(ctx context.Context, request adapter.OperationR appName := operations[request.OperationName].AdditionalProperties[common.ServiceName] stat, err := hh.installSampleApp(request.IsDeleteOperation, request.Namespace, operations[request.OperationName].Templates, kubeconfigs) if err != nil { - summary := fmt.Sprintf("Error while %s %s application", stat, appName) - hh.streamErr(summary, e, err) + summary := fmt.Sprintf("Error while %s %s application", stat, appName) + hh.streamErr(summary, ee, err) return } ee.Summary = fmt.Sprintf("%s application %s successfully", appName, stat) ee.Details = fmt.Sprintf("The %s application is now %s.", appName, stat) - hh.StreamInfo(e) + hh.StreamInfo(ee) }(h, e) case internalconfig.OSMBookStoreOperation: go func(hh *Handler, ee *meshes.EventsResponse) { @@ -78,12 +77,12 @@ func (h *Handler) ApplyOperation(ctx context.Context, request adapter.OperationR ) if err != nil { summary := fmt.Sprintf("Error while %s %s application", stat, appName) - hh.streamErr(summary, e, err) + hh.streamErr(summary, ee, err) return } ee.Summary = fmt.Sprintf("%s application %s successfully", appName, stat) ee.Details = fmt.Sprintf("The %s application is now %s.", appName, stat) - hh.StreamInfo(e) + hh.StreamInfo(ee) }(h, e) case common.SmiConformanceOperation: go func(hh *Handler, ee *meshes.EventsResponse) { @@ -99,13 +98,13 @@ func (h *Handler) ApplyOperation(ctx context.Context, request adapter.OperationR Annotations: make(map[string]string), }) if err != nil { - summary := fmt.Sprintf("Error while %s %s test", status.Running, name) - hh.streamErr(summary ,e, err) + summary := fmt.Sprintf("Error while %s %s test", status.Running, name) + hh.streamErr(summary, ee, err) return } ee.Summary = fmt.Sprintf("%s test %s successfully", name, status.Completed) ee.Details = "" - hh.StreamInfo(e) + hh.StreamInfo(ee) }(h, e) default: h.streamErr("Invalid operation", e, ErrOpInvalid) @@ -113,7 +112,7 @@ func (h *Handler) ApplyOperation(ctx context.Context, request adapter.OperationR return nil } -func(h *Handler) streamErr(summary string, e *meshes.EventsResponse, err error) { +func (h *Handler) streamErr(summary string, e *meshes.EventsResponse, err error) { e.Summary = summary e.Details = err.Error() e.ErrorCode = errors.GetCode(err) diff --git a/osm/osm.go b/osm/osm.go index db8ddd22..3248ea15 100644 --- a/osm/osm.go +++ b/osm/osm.go @@ -23,6 +23,7 @@ import ( "github.com/layer5io/meshkit/logger" "github.com/layer5io/meshkit/models" "github.com/layer5io/meshkit/models/oam/core/v1alpha1" + "github.com/layer5io/meshkit/utils/events" "gopkg.in/yaml.v2" ) @@ -32,12 +33,13 @@ type Handler struct { } // New initializes a new handler instance -func New(config meshkitCfg.Handler, log logger.Handler, kc meshkitCfg.Handler) adapter.Handler { +func New(config meshkitCfg.Handler, log logger.Handler, kc meshkitCfg.Handler, e *events.EventStreamer) adapter.Handler { return &Handler{ Adapter: adapter.Adapter{ Config: config, Log: log, KubeconfigHandler: kc, + EventStreamer: e, }, } } @@ -88,13 +90,12 @@ func (h *Handler) CreateKubeconfigs(kubeconfigs []string) error { } // ProcessOAM will handles the grpc invocation for handling OAM objects -func (h *Handler) ProcessOAM(ctx context.Context, oamReq adapter.OAMRequest, hchan *chan interface{}) (string, error) { +func (h *Handler) ProcessOAM(ctx context.Context, oamReq adapter.OAMRequest) (string, error) { err := h.CreateKubeconfigs(oamReq.K8sConfigs) if err != nil { return "", err } kubeconfigs := oamReq.K8sConfigs - h.SetChannel(hchan) var comps []v1alpha1.Component for _, acomp := range oamReq.OamComps { comp, err := oam.ParseApplicationComponent(acomp)