From 8c35171cb37dc5f9e92b35ae8da20d525c9a8469 Mon Sep 17 00:00:00 2001 From: sph Date: Wed, 14 Sep 2022 14:39:01 +0800 Subject: [PATCH] feat: webhook --- .gitlab-ci.yml | 2 +- core/cmd/cmd.go | 5 + core/common/const.go | 4 +- core/common/user.go | 21 +- core/controller/application/controller.go | 37 +- .../controller/application/controller_test.go | 5 + core/controller/cloudevent/controller.go | 24 +- core/controller/cloudevent/controller_test.go | 5 +- core/controller/cluster/controller.go | 4 + core/controller/cluster/controller_basic.go | 41 ++- .../cluster/controller_basic_test.go | 3 + .../controller/cluster/controller_internal.go | 14 + .../cluster/controller_operation.go | 28 ++ core/controller/cluster/controller_test.go | 7 +- core/controller/webhook/controller.go | 167 +++++++++ core/controller/webhook/controller_test.go | 121 +++++++ core/controller/webhook/models.go | 295 ++++++++++++++++ core/errors/horizonerrors.go | 17 +- core/http/api/v1/webhook/apis.go | 304 ++++++++++++++++ core/http/api/v1/webhook/routers.go | 64 ++++ db/migrations/20220922_add_webhook_event.sql | 57 +++ go.sum | 14 +- job/cmd/cmd.go | 45 ++- openapi/restful/webhook.yaml | 334 ++++++++++++++++++ pkg/event/dao/dao.go | 93 +++++ pkg/event/manager/manager.go | 76 ++++ pkg/event/manager/manager_test.go | 66 ++++ pkg/event/models/event.go | 53 +++ pkg/eventhandler/eventhandler.go | 172 +++++++++ pkg/eventhandler/wlgenerator/wlgenerator.go | 302 ++++++++++++++++ pkg/member/service/service.go | 22 ++ pkg/param/managerparam/managerparam.go | 9 +- pkg/util/common/common.go | 13 + pkg/webhook/dao/dao.go | 265 ++++++++++++++ pkg/webhook/manager/manager.go | 163 +++++++++ pkg/webhook/models/webhook.go | 41 +++ pkg/webhook/service/service.go | 293 +++++++++++++++ roles.yaml | 16 + 38 files changed, 3175 insertions(+), 27 deletions(-) create mode 100644 core/controller/webhook/controller.go create mode 100644 core/controller/webhook/controller_test.go create mode 100644 core/controller/webhook/models.go create mode 100644 core/http/api/v1/webhook/apis.go create mode 100644 core/http/api/v1/webhook/routers.go create mode 100644 db/migrations/20220922_add_webhook_event.sql create mode 100644 openapi/restful/webhook.yaml create mode 100644 pkg/event/dao/dao.go create mode 100644 pkg/event/manager/manager.go create mode 100644 pkg/event/manager/manager_test.go create mode 100644 pkg/event/models/event.go create mode 100644 pkg/eventhandler/eventhandler.go create mode 100644 pkg/eventhandler/wlgenerator/wlgenerator.go create mode 100644 pkg/util/common/common.go create mode 100644 pkg/webhook/dao/dao.go create mode 100644 pkg/webhook/manager/manager.go create mode 100644 pkg/webhook/models/webhook.go create mode 100644 pkg/webhook/service/service.go diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 1fe219a5..ef7196a1 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -6,7 +6,7 @@ stages: variables: COVERAGE_STANDARD: 70 REPOSITORY_PREFIX: harbor.mock.org/cloudnative/horizon/ - GOPROXY: https://goproxycn,direct + GOPROXY: https://goproxy.cn,direct before_script: - export APP_REVISION="${CI_COMMIT_TAG:-${CI_COMMIT_REF_NAME##*/}-r$CI_PIPELINE_ID}" diff --git a/core/cmd/cmd.go b/core/cmd/cmd.go index b8994c86..122fbd45 100644 --- a/core/cmd/cmd.go +++ b/core/cmd/cmd.go @@ -40,6 +40,7 @@ import ( templateschematagctl "g.hz.netease.com/horizon/core/controller/templateschematag" terminalctl "g.hz.netease.com/horizon/core/controller/terminal" userctl "g.hz.netease.com/horizon/core/controller/user" + webhookctl "g.hz.netease.com/horizon/core/controller/webhook" accessapi "g.hz.netease.com/horizon/core/http/api/v1/access" "g.hz.netease.com/horizon/core/http/api/v1/accesstoken" "g.hz.netease.com/horizon/core/http/api/v1/application" @@ -70,6 +71,7 @@ import ( templateschematagapi "g.hz.netease.com/horizon/core/http/api/v1/templateschematag" terminalapi "g.hz.netease.com/horizon/core/http/api/v1/terminal" "g.hz.netease.com/horizon/core/http/api/v1/user" + "g.hz.netease.com/horizon/core/http/api/v1/webhook" appv2 "g.hz.netease.com/horizon/core/http/api/v2/application" buildAPI "g.hz.netease.com/horizon/core/http/api/v2/build" envtemplatev2 "g.hz.netease.com/horizon/core/http/api/v2/envtemplate" @@ -463,6 +465,7 @@ func Run(flags *Flags) { buildSchemaCtrl = build.NewController(buildSchema) accessTokenCtl = accesstokenctl.NewController(parameter) scopeCtl = scopectl.NewController(parameter) + webhookCtl = webhookctl.NewController(parameter) ) var ( @@ -497,6 +500,7 @@ func Run(flags *Flags) { envtemplatev2API = envtemplatev2.NewAPI(envTemplateCtl) accessTokenAPI = accesstoken.NewAPI(accessTokenCtl, roleService, scopeService) scopeAPI = scope.NewAPI(scopeCtl) + webhookAPI = webhook.NewAPI(webhookCtl) ) // init server @@ -562,6 +566,7 @@ func Run(flags *Flags) { templatev2.RegisterRoutes(r, templateAPIV2) accesstoken.RegisterRoutes(r, accessTokenAPI) scope.RegisterRoutes(r, scopeAPI) + webhook.RegisterRoutes(r, webhookAPI) // start cloud event server go runCloudEventServer( diff --git a/core/common/const.go b/core/common/const.go index 65331bc8..94141b67 100644 --- a/core/common/const.go +++ b/core/common/const.go @@ -64,7 +64,7 @@ const ( // use the pipeline's cluster's member info ResourcePipelinerun = "pipelineruns" - // ResourceOauthApps urrently oauthapp do not have direct member info, will + // ResourceOauthApps currently oauthapp do not have direct member info, will // use the oauthapp's groups member info ResourceOauthApps = "oauthapps" @@ -72,6 +72,8 @@ const ( ResourceTemplateRelease = "templatereleases" AliasTemplateRelease = "releases" + + ResourceWebhook = "webhooks" ) const ( diff --git a/core/common/user.go b/core/common/user.go index 6c00c3bc..1884e5f7 100644 --- a/core/common/user.go +++ b/core/common/user.go @@ -4,10 +4,12 @@ import ( "context" "strings" + "github.com/gin-gonic/gin" + herror "g.hz.netease.com/horizon/core/errors" "g.hz.netease.com/horizon/pkg/authentication/user" perror "g.hz.netease.com/horizon/pkg/errors" - "github.com/gin-gonic/gin" + usermodels "g.hz.netease.com/horizon/pkg/user/models" ) const ( @@ -22,6 +24,23 @@ const ( TokenHeaderValuePrefix = "Bearer" ) +type User struct { + ID uint `json:"id"` + Name string `json:"name"` + Email string `json:"email"` +} + +func ToUser(user *usermodels.User) *User { + if user == nil { + return nil + } + return &User{ + ID: user.ID, + Name: user.Name, + Email: user.Email, + } +} + func UserContextKey() string { return contextUserKey } diff --git a/core/controller/application/controller.go b/core/controller/application/controller.go index d9cad3f9..8acd7940 100644 --- a/core/controller/application/controller.go +++ b/core/controller/application/controller.go @@ -18,6 +18,8 @@ import ( codemodels "g.hz.netease.com/horizon/pkg/cluster/code" clustermanager "g.hz.netease.com/horizon/pkg/cluster/manager" perror "g.hz.netease.com/horizon/pkg/errors" + eventmanager "g.hz.netease.com/horizon/pkg/event/manager" + eventmodels "g.hz.netease.com/horizon/pkg/event/models" groupmanager "g.hz.netease.com/horizon/pkg/group/manager" groupsvc "g.hz.netease.com/horizon/pkg/group/service" "g.hz.netease.com/horizon/pkg/hook/hook" @@ -27,11 +29,13 @@ import ( pipelinemanager "g.hz.netease.com/horizon/pkg/pipelinerun/pipeline/manager" pipelinemodels "g.hz.netease.com/horizon/pkg/pipelinerun/pipeline/models" regionmodels "g.hz.netease.com/horizon/pkg/region/models" + "g.hz.netease.com/horizon/pkg/server/middleware/requestid" trmanager "g.hz.netease.com/horizon/pkg/templaterelease/manager" templateschema "g.hz.netease.com/horizon/pkg/templaterelease/schema" usersvc "g.hz.netease.com/horizon/pkg/user/service" "g.hz.netease.com/horizon/pkg/util/errors" "g.hz.netease.com/horizon/pkg/util/jsonschema" + "g.hz.netease.com/horizon/pkg/util/log" "g.hz.netease.com/horizon/pkg/util/permission" "g.hz.netease.com/horizon/pkg/util/wlog" ) @@ -77,6 +81,7 @@ type controller struct { hook hook.Hook userSvc usersvc.Service memberManager member.Manager + eventMgr eventmanager.Manager applicationRegionMgr applicationregionmanager.Manager pipelinemanager pipelinemanager.Manager buildSchema *build.Schema @@ -97,6 +102,7 @@ func NewController(param *param.Param) Controller { hook: param.Hook, userSvc: param.UserSvc, memberManager: param.MemberManager, + eventMgr: param.EventManager, applicationRegionMgr: param.ApplicationRegionManager, pipelinemanager: param.PipelineMgr, buildSchema: param.BuildSchema, @@ -190,6 +196,7 @@ func (c *controller) GetApplicationV2(ctx context.Context, id uint) (_ *GetAppli return resp, err } +//nolint may be used in the future func (c *controller) postHook(ctx context.Context, eventType hook.EventType, content interface{}) { if c.hook != nil { event := hook.Event{ @@ -274,8 +281,19 @@ func (c *controller) CreateApplication(ctx context.Context, groupID uint, ret := ofApplicationModel(applicationModel, fullPath, trs, request.TemplateInput.Pipeline, request.TemplateInput.Application) - // 7. post hook - c.postHook(ctx, hook.CreateApplication, ret) + // 7. record event + // c.postHook(ctx, hook.CreateApplication, ret) + rid, _ := requestid.FromContext(ctx) + if _, err := c.eventMgr.CreateEvent(ctx, &eventmodels.Event{ + EventSummary: eventmodels.EventSummary{ + ResourceType: eventmodels.Application, + Action: eventmodels.Created, + ResourceID: ret.ID, + }, + ReqID: rid, + }); err != nil { + log.Warningf(ctx, "failed to create event, err: %s", err.Error()) + } return ret, nil } @@ -543,8 +561,19 @@ func (c *controller) DeleteApplication(ctx context.Context, id uint, hard bool) return err } - // 4. post hook - c.postHook(ctx, hook.DeleteApplication, app.Name) + // 4. delete application + // c.postHook(ctx, hook.DeleteApplication, app.Name) + rid, _ := requestid.FromContext(ctx) + if _, err := c.eventMgr.CreateEvent(ctx, &eventmodels.Event{ + EventSummary: eventmodels.EventSummary{ + ResourceType: eventmodels.Application, + Action: eventmodels.Deleted, + ResourceID: id, + }, + ReqID: rid, + }); err != nil { + log.Warningf(ctx, "failed to create event, err: %s", err.Error()) + } return nil } diff --git a/core/controller/application/controller_test.go b/core/controller/application/controller_test.go index 45f42f29..a64c432b 100644 --- a/core/controller/application/controller_test.go +++ b/core/controller/application/controller_test.go @@ -17,6 +17,7 @@ import ( userauth "g.hz.netease.com/horizon/pkg/authentication/user" codemodels "g.hz.netease.com/horizon/pkg/cluster/code" clustermodels "g.hz.netease.com/horizon/pkg/cluster/models" + eventmodels "g.hz.netease.com/horizon/pkg/event/models" groupmodels "g.hz.netease.com/horizon/pkg/group/models" groupservice "g.hz.netease.com/horizon/pkg/group/service" membermodels "g.hz.netease.com/horizon/pkg/member/models" @@ -271,6 +272,9 @@ func TestMain(m *testing.M) { if err := db.AutoMigrate(&membermodels.Member{}); err != nil { panic(err) } + if err := db.AutoMigrate(&eventmodels.Event{}); err != nil { + panic(err) + } ctx = context.TODO() ctx = context.WithValue(ctx, common.UserContextKey(), &userauth.DefaultInfo{ Name: "Tony", @@ -339,6 +343,7 @@ func Test(t *testing.T) { templateReleaseMgr: manager.TemplateReleaseManager, clusterMgr: manager.ClusterMgr, userSvc: userservice.NewService(manager), + eventMgr: manager.EventManager, } group, err := manager.GroupManager.Create(ctx, &groupmodels.Group{ diff --git a/core/controller/cloudevent/controller.go b/core/controller/cloudevent/controller.go index d1db2553..f2c2e590 100644 --- a/core/controller/cloudevent/controller.go +++ b/core/controller/cloudevent/controller.go @@ -6,9 +6,12 @@ import ( "strings" clustermanager "g.hz.netease.com/horizon/pkg/cluster/manager" + eventmanager "g.hz.netease.com/horizon/pkg/event/manager" + eventmodels "g.hz.netease.com/horizon/pkg/event/models" prmanager "g.hz.netease.com/horizon/pkg/pipelinerun/manager" prmodels "g.hz.netease.com/horizon/pkg/pipelinerun/models" pipelinemanager "g.hz.netease.com/horizon/pkg/pipelinerun/pipeline/manager" + "g.hz.netease.com/horizon/pkg/server/middleware/requestid" trmanager "g.hz.netease.com/horizon/pkg/templaterelease/manager" "g.hz.netease.com/horizon/core/common" @@ -34,6 +37,7 @@ type controller struct { clusterMgr clustermanager.Manager clusterGitRepo gitrepo.ClusterGitRepo templateReleaseMgr trmanager.Manager + eventMgr eventmanager.Manager } func NewController(tektonFty factory.Factory, parameter *param.Param) Controller { @@ -44,6 +48,7 @@ func NewController(tektonFty factory.Factory, parameter *param.Param) Controller clusterMgr: parameter.ClusterMgr, clusterGitRepo: parameter.ClusterGitRepo, templateReleaseMgr: parameter.TemplateReleaseManager, + eventMgr: parameter.EventManager, } } @@ -114,7 +119,24 @@ func (c *controller) CloudEvent(ctx context.Context, wpr *WrappedPipelineRun) (e // 最后指标上报,保证同一条pipelineRun,只上报一条指标 metrics.Observe(pipelineResult) - // 5. insert pipeline into db + // 6. create event + rid, _ := requestid.FromContext(ctx) + clusterID, err := strconv.ParseUint(pipelineResult.BusinessData.ClusterID, 10, 0) + if err != nil { + log.Warningf(ctx, "failed to parse cluster id, err: %s", err.Error()) + } + if _, err := c.eventMgr.CreateEvent(ctx, &eventmodels.Event{ + EventSummary: eventmodels.EventSummary{ + ResourceType: eventmodels.Cluster, + Action: eventmodels.Builded, + ResourceID: uint(clusterID), + }, + ReqID: rid, + }); err != nil { + log.Warningf(ctx, "failed to create event, err: %s", err.Error()) + } + + // 7. insert pipeline into db err = c.pipelineMgr.Create(ctx, pipelineResult) if err != nil { // err不往上层抛,上层也无法处理这种异常 diff --git a/core/controller/cloudevent/controller_test.go b/core/controller/cloudevent/controller_test.go index a1c964ac..2a5010b5 100644 --- a/core/controller/cloudevent/controller_test.go +++ b/core/controller/cloudevent/controller_test.go @@ -14,6 +14,7 @@ import ( tektonftymock "g.hz.netease.com/horizon/mock/pkg/cluster/tekton/factory" userauth "g.hz.netease.com/horizon/pkg/authentication/user" "g.hz.netease.com/horizon/pkg/cluster/tekton/collector" + eventmodels "g.hz.netease.com/horizon/pkg/event/models" "g.hz.netease.com/horizon/pkg/param/managerparam" prmodels "g.hz.netease.com/horizon/pkg/pipelinerun/models" "github.com/stretchr/testify/assert" @@ -35,7 +36,7 @@ var ( func TestMain(m *testing.M) { db, _ := orm.NewSqliteDB("") manager = managerparam.InitManager(db) - if err := db.AutoMigrate(&prmodels.Pipelinerun{}); err != nil { + if err := db.AutoMigrate(&prmodels.Pipelinerun{}, &eventmodels.Event{}); err != nil { panic(err) } ctx = context.TODO() @@ -217,7 +218,9 @@ func Test(t *testing.T) { c := &controller{ pipelinerunMgr: pipelinerunMgr, + pipelineMgr: manager.PipelineMgr, tektonFty: tektonFty, + eventMgr: manager.EventManager, } err = c.CloudEvent(ctx, &WrappedPipelineRun{ diff --git a/core/controller/cluster/controller.go b/core/controller/cluster/controller.go index 20d40012..e3ee68af 100644 --- a/core/controller/cluster/controller.go +++ b/core/controller/cluster/controller.go @@ -18,6 +18,7 @@ import ( "g.hz.netease.com/horizon/pkg/config/grafana" envmanager "g.hz.netease.com/horizon/pkg/environment/manager" environmentregionmapper "g.hz.netease.com/horizon/pkg/environmentregion/manager" + eventmanager "g.hz.netease.com/horizon/pkg/event/manager" grafanaservice "g.hz.netease.com/horizon/pkg/grafana" groupmanager "g.hz.netease.com/horizon/pkg/group/manager" groupsvc "g.hz.netease.com/horizon/pkg/group/service" @@ -122,6 +123,7 @@ type controller struct { grafanaService grafanaservice.Service grafanaConfig grafana.Config buildSchema *build.Schema + eventMgr eventmanager.Manager } var _ Controller = (*controller)(nil) @@ -156,9 +158,11 @@ func NewController(config *config.Config, param *param.Param) Controller { grafanaService: param.GrafanaService, grafanaConfig: config.GrafanaConfig, buildSchema: param.BuildSchema, + eventMgr: param.EventManager, } } +//nolint may be used in the future func (c *controller) postHook(ctx context.Context, eventType hook.EventType, content interface{}) { if c.hook != nil { event := hook.Event{ diff --git a/core/controller/cluster/controller_basic.go b/core/controller/cluster/controller_basic.go index 8d3b56a5..e72980cd 100644 --- a/core/controller/cluster/controller_basic.go +++ b/core/controller/cluster/controller_basic.go @@ -21,7 +21,7 @@ import ( "g.hz.netease.com/horizon/pkg/cluster/registry" emvregionmodels "g.hz.netease.com/horizon/pkg/environmentregion/models" perror "g.hz.netease.com/horizon/pkg/errors" - "g.hz.netease.com/horizon/pkg/hook/hook" + eventmodels "g.hz.netease.com/horizon/pkg/event/models" membermodels "g.hz.netease.com/horizon/pkg/member/models" regionmodels "g.hz.netease.com/horizon/pkg/region/models" tagmanager "g.hz.netease.com/horizon/pkg/tag/manager" @@ -475,7 +475,18 @@ func (c *controller) CreateCluster(ctx context.Context, applicationID uint, envi r.TemplateInput.Pipeline, r.TemplateInput.Application) // 11. post hook - c.postHook(ctx, hook.CreateCluster, ret) + // c.postHook(ctx, hook.CreateCluster, ret) + rid, _ := requestid.FromContext(ctx) + if _, err := c.eventMgr.CreateEvent(ctx, &eventmodels.Event{ + EventSummary: eventmodels.EventSummary{ + ResourceType: eventmodels.Cluster, + Action: eventmodels.Created, + ResourceID: ret.ID, + }, + ReqID: rid, + }); err != nil { + log.Warningf(ctx, "failed to create event, err: %s", err.Error()) + } return ret, nil } @@ -803,7 +814,18 @@ func (c *controller) DeleteCluster(ctx context.Context, clusterID uint, hard boo } // 5. post hook - c.postHook(newctx, hook.DeleteCluster, cluster.Name) + // c.postHook(newctx, hook.DeleteCluster, cluster.Name) + rid, _ := requestid.FromContext(ctx) + if _, err := c.eventMgr.CreateEvent(ctx, &eventmodels.Event{ + EventSummary: eventmodels.EventSummary{ + ResourceType: eventmodels.Cluster, + Action: eventmodels.Deleted, + ResourceID: clusterID, + }, + ReqID: rid, + }); err != nil { + log.Warningf(ctx, "failed to create event, err: %s", err.Error()) + } }() return nil @@ -872,6 +894,19 @@ func (c *controller) FreeCluster(ctx context.Context, clusterID uint) (err error log.Errorf(newctx, "failed to update cluster: %v, err: %v", cluster.Name, err) return } + + // 4. create event + rid, _ := requestid.FromContext(ctx) + if _, err := c.eventMgr.CreateEvent(ctx, &eventmodels.Event{ + EventSummary: eventmodels.EventSummary{ + ResourceType: eventmodels.Cluster, + Action: eventmodels.Freed, + ResourceID: clusterID, + }, + ReqID: rid, + }); err != nil { + log.Warningf(ctx, "failed to create event, err: %s", err.Error()) + } }() return nil diff --git a/core/controller/cluster/controller_basic_test.go b/core/controller/cluster/controller_basic_test.go index 1de2dd5d..c78a56a8 100644 --- a/core/controller/cluster/controller_basic_test.go +++ b/core/controller/cluster/controller_basic_test.go @@ -86,6 +86,7 @@ func testListClusterByNameFuzzily(t *testing.T) { applicationSvc: applicationservice.NewService(groupservice.NewService(manager), manager), groupManager: manager.GroupManager, memberManager: manager.MemberManager, + eventMgr: manager.EventManager, } resps, count, err := c.List(ctx, &q.Query{Keywords: q.KeyWords{common.ClusterQueryName: "fuzzilyCluster"}}) @@ -201,6 +202,7 @@ func testListUserClustersByNameFuzzily(t *testing.T) { applicationSvc: applicationservice.NewService(groupservice.NewService(manager), manager), groupManager: manager.GroupManager, memberManager: manager.MemberManager, + eventMgr: manager.EventManager, } resps, count, err := c.List(ctx, @@ -287,6 +289,7 @@ func testControllerFreeOrDeleteClusterFailed(t *testing.T) { groupManager: manager.GroupManager, envMgr: manager.EnvMgr, regionMgr: manager.RegionMgr, + eventMgr: manager.EventManager, } id, err := registrydao.NewDAO(db).Create(ctx, ®istrymodels.Registry{ diff --git a/core/controller/cluster/controller_internal.go b/core/controller/cluster/controller_internal.go index fb9306d8..eb305b1f 100644 --- a/core/controller/cluster/controller_internal.go +++ b/core/controller/cluster/controller_internal.go @@ -10,7 +10,9 @@ import ( codemodels "g.hz.netease.com/horizon/pkg/cluster/code" "g.hz.netease.com/horizon/pkg/cluster/gitrepo" perror "g.hz.netease.com/horizon/pkg/errors" + eventmodels "g.hz.netease.com/horizon/pkg/event/models" prmodels "g.hz.netease.com/horizon/pkg/pipelinerun/models" + "g.hz.netease.com/horizon/pkg/server/middleware/requestid" "g.hz.netease.com/horizon/pkg/util/log" "g.hz.netease.com/horizon/pkg/util/wlog" ) @@ -135,6 +137,18 @@ func (c *controller) InternalDeploy(ctx context.Context, clusterID uint, return nil, err } + rid, _ := requestid.FromContext(ctx) + if _, err := c.eventMgr.CreateEvent(ctx, &eventmodels.Event{ + EventSummary: eventmodels.EventSummary{ + ResourceType: eventmodels.Cluster, + Action: eventmodels.Deployed, + ResourceID: cluster.ID, + }, + ReqID: rid, + }); err != nil { + log.Warningf(ctx, "failed to create event, err: %s", err.Error()) + } + return &InternalDeployResponse{ PipelinerunID: pr.ID, Commit: commit, diff --git a/core/controller/cluster/controller_operation.go b/core/controller/cluster/controller_operation.go index 231e1ecf..4c44de56 100644 --- a/core/controller/cluster/controller_operation.go +++ b/core/controller/cluster/controller_operation.go @@ -10,7 +10,9 @@ import ( "g.hz.netease.com/horizon/pkg/cluster/cd" cmodels "g.hz.netease.com/horizon/pkg/cluster/models" perror "g.hz.netease.com/horizon/pkg/errors" + eventmodels "g.hz.netease.com/horizon/pkg/event/models" prmodels "g.hz.netease.com/horizon/pkg/pipelinerun/models" + "g.hz.netease.com/horizon/pkg/server/middleware/requestid" tmodels "g.hz.netease.com/horizon/pkg/tag/models" "g.hz.netease.com/horizon/pkg/util/log" "g.hz.netease.com/horizon/pkg/util/wlog" @@ -203,6 +205,19 @@ func (c *controller) Deploy(ctx context.Context, clusterID uint, return nil, err } + // 8. record event + rid, _ := requestid.FromContext(ctx) + if _, err := c.eventMgr.CreateEvent(ctx, &eventmodels.Event{ + EventSummary: eventmodels.EventSummary{ + ResourceType: eventmodels.Cluster, + Action: eventmodels.Deployed, + ResourceID: cluster.ID, + }, + ReqID: rid, + }); err != nil { + log.Warningf(ctx, "failed to create event, err: %s", err.Error()) + } + return &PipelinerunIDResponse{ PipelinerunID: prCreated.ID, }, nil @@ -335,6 +350,19 @@ func (c *controller) Rollback(ctx context.Context, return nil, err } + // 11. record event + rid, _ := requestid.FromContext(ctx) + if _, err := c.eventMgr.CreateEvent(ctx, &eventmodels.Event{ + EventSummary: eventmodels.EventSummary{ + ResourceType: eventmodels.Cluster, + Action: eventmodels.Rollbacked, + ResourceID: cluster.ID, + }, + ReqID: rid, + }); err != nil { + log.Warningf(ctx, "failed to create event, err: %s", err.Error()) + } + return &PipelinerunIDResponse{ PipelinerunID: prCreated.ID, }, nil diff --git a/core/controller/cluster/controller_test.go b/core/controller/cluster/controller_test.go index f5fcb268..69b6d7c8 100644 --- a/core/controller/cluster/controller_test.go +++ b/core/controller/cluster/controller_test.go @@ -39,6 +39,7 @@ import ( envmodels "g.hz.netease.com/horizon/pkg/environment/models" envregionmodels "g.hz.netease.com/horizon/pkg/environmentregion/models" perror "g.hz.netease.com/horizon/pkg/errors" + eventmodels "g.hz.netease.com/horizon/pkg/event/models" groupmodels "g.hz.netease.com/horizon/pkg/group/models" groupservice "g.hz.netease.com/horizon/pkg/group/service" membermodels "g.hz.netease.com/horizon/pkg/member/models" @@ -428,8 +429,8 @@ const secondsInOneDay = 24 * 3600 func TestMain(m *testing.M) { if err := db.AutoMigrate(&appmodels.Application{}, &models.Cluster{}, &groupmodels.Group{}, &trmodels.TemplateRelease{}, &membermodels.Member{}, &usermodels.User{}, - ®istrymodels.Registry{}, - ®ionmodels.Region{}, &envregionmodels.EnvironmentRegion{}, + ®istrymodels.Registry{}, eventmodels.Event{}, + ®ionmodels.Region{}, &envregionmodels.EnvironmentRegion{}, &eventmodels.Event{}, &prmodels.Pipelinerun{}, &tagmodel.ClusterTemplateSchemaTag{}, &tmodel.Tag{}, &envmodels.Environment{}); err != nil { panic(err) } @@ -612,6 +613,7 @@ func test(t *testing.T) { schemaTagManager: manager.ClusterSchemaTagMgr, tagMgr: tagManager, applicationGitRepo: applicationGitRepo, + eventMgr: manager.EventManager, } tagManager.EXPECT().ListByResourceTypeIDs(ctx, gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() @@ -1244,6 +1246,7 @@ func testV2(t *testing.T) { tagMgr: tagManager, registryFty: registryFty, cd: mockCd, + eventMgr: manager.EventManager, } clusterGitRepo.EXPECT().CreateCluster(ctx, gomock.Any()).Return(nil).Times(1) diff --git a/core/controller/webhook/controller.go b/core/controller/webhook/controller.go new file mode 100644 index 00000000..be1432ce --- /dev/null +++ b/core/controller/webhook/controller.go @@ -0,0 +1,167 @@ +package webhook + +import ( + "context" + + "g.hz.netease.com/horizon/core/common" + "g.hz.netease.com/horizon/lib/q" + "g.hz.netease.com/horizon/pkg/param" + usermanager "g.hz.netease.com/horizon/pkg/user/manager" + "g.hz.netease.com/horizon/pkg/util/wlog" + wmanager "g.hz.netease.com/horizon/pkg/webhook/manager" + "g.hz.netease.com/horizon/pkg/webhook/models" +) + +type Controller interface { + CreateWebhook(ctx context.Context, w *CreateWebhookRequest) (*Webhook, error) + GetWebhook(ctx context.Context, id uint) (*Webhook, error) + ListWebhooks(ctx context.Context, resourceType string, + resourceID uint, query *q.Query) ([]*Webhook, int64, error) + UpdateWebhook(ctx context.Context, id uint, + w *UpdateWebhookRequest) (*Webhook, error) + DeleteWebhook(ctx context.Context, id uint) error + ListWebhookLogs(ctx context.Context, wID uint, query *q.Query) ([]*LogSummary, int64, error) + GetWebhookLog(ctx context.Context, id uint) (*Log, error) + RetryWebhookLog(ctx context.Context, id uint) (*models.WebhookLog, error) +} + +type controller struct { + webhookMgr wmanager.Manager + userManager usermanager.Manager +} + +func NewController(param *param.Param) Controller { + return &controller{ + webhookMgr: param.WebhookManager, + userManager: param.UserManager, + } +} + +func (c *controller) CreateWebhook(ctx context.Context, w *CreateWebhookRequest) (*Webhook, error) { + const op = "webhook controller: create" + defer wlog.Start(ctx, op).StopPrint() + + // 1. validate request + if err := w.validate(); err != nil { + return nil, err + } + + // 2. transfer model + wm := w.toModel(ctx) + + // 3. create webhook + wm, err := c.webhookMgr.CreateWebhook(ctx, wm) + if err != nil { + return nil, err + } + + return ofWebhookModel(wm), nil +} + +func (c *controller) GetWebhook(ctx context.Context, id uint) (*Webhook, error) { + const op = "wehook controller: get" + defer wlog.Start(ctx, op).StopPrint() + + w, err := c.webhookMgr.GetWebhook(ctx, id) + if err != nil { + return nil, err + } + return ofWebhookModel(w), nil +} + +func (c *controller) UpdateWebhook(ctx context.Context, id uint, + w *UpdateWebhookRequest) (*Webhook, error) { + const op = "wehook controller: update" + defer wlog.Start(ctx, op).StopPrint() + + // 1. validate request + if err := w.validate(); err != nil { + return nil, err + } + + // 2. transfer model + wm, err := c.webhookMgr.GetWebhook(ctx, id) + if err != nil { + return nil, err + } + wm = w.toModel(ctx, wm) + + // 3. update webhook + wm, err = c.webhookMgr.UpdateWebhook(ctx, id, wm) + if err != nil { + return nil, err + } + + return ofWebhookModel(wm), nil +} + +func (c *controller) DeleteWebhook(ctx context.Context, id uint) error { + const op = "wehook controller: delete" + defer wlog.Start(ctx, op).StopPrint() + + return c.webhookMgr.DeleteWebhook(ctx, id) +} + +func (c *controller) ListWebhooks(ctx context.Context, resourceType string, + resourceID uint, query *q.Query) ([]*Webhook, int64, error) { + const op = "wehook controller: list" + defer wlog.Start(ctx, op).StopPrint() + + resource := map[string][]uint{ + resourceType: {resourceID}, + } + webhooks, total, err := c.webhookMgr.ListWebhookOfResources(ctx, resource, query) + if err != nil { + return nil, total, err + } + + var ws []*Webhook + for _, w := range webhooks { + ws = append(ws, ofWebhookModel(w)) + } + return ws, total, nil +} + +func (c *controller) ListWebhookLogs(ctx context.Context, wID uint, + query *q.Query) ([]*LogSummary, int64, error) { + const op = "wehook controller: list log" + defer wlog.Start(ctx, op).StopPrint() + + webhookLogs, total, err := c.webhookMgr.ListWebhookLogs(ctx, wID, query) + if err != nil { + return nil, total, err + } + + var wls []*LogSummary + for _, wl := range webhookLogs { + wls = append(wls, ofWebhookLogSummaryModel(wl)) + } + return wls, total, nil +} + +func (c *controller) GetWebhookLog(ctx context.Context, id uint) (*Log, error) { + const op = "wehook controller: get log" + defer wlog.Start(ctx, op).StopPrint() + + wl, err := c.webhookMgr.GetWebhookLog(ctx, id) + if err != nil { + return nil, err + } + + userMap, err := c.userManager.GetUserMapByIDs(ctx, + []uint{wl.CreatedBy}) + if err != nil { + return nil, err + } + + webhookLog := ofWebhookLogModel(wl) + webhookLog.CreatedBy = common.ToUser(userMap[wl.CreatedBy]) + return webhookLog, nil +} + +func (c *controller) RetryWebhookLog(ctx context.Context, id uint) (*models.WebhookLog, error) { + const op = "wehook controller: retry log" + defer wlog.Start(ctx, op).StopPrint() + + return c.webhookMgr.RetryWebhookLog(ctx, id) +} diff --git a/core/controller/webhook/controller_test.go b/core/controller/webhook/controller_test.go new file mode 100644 index 00000000..64444fa7 --- /dev/null +++ b/core/controller/webhook/controller_test.go @@ -0,0 +1,121 @@ +package webhook + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "gorm.io/gorm" + + "g.hz.netease.com/horizon/core/common" + "g.hz.netease.com/horizon/lib/orm" + userauth "g.hz.netease.com/horizon/pkg/authentication/user" + "g.hz.netease.com/horizon/pkg/event/models" + eventmodels "g.hz.netease.com/horizon/pkg/event/models" + "g.hz.netease.com/horizon/pkg/param" + "g.hz.netease.com/horizon/pkg/param/managerparam" + usermodels "g.hz.netease.com/horizon/pkg/user/models" + utilcommon "g.hz.netease.com/horizon/pkg/util/common" + webhookmodels "g.hz.netease.com/horizon/pkg/webhook/models" +) + +var ( + ctx = context.Background() + db *gorm.DB + c *controller + + updateWebhookReq = UpdateWebhookRequest{ + Enabled: utilcommon.BoolPtr(true), + URL: utilcommon.StringPtr("http://xxxx"), + SslVerifyEnabled: utilcommon.BoolPtr(false), + Triggers: []string{ + JoinResourceAction(string(eventmodels.Cluster), string(eventmodels.Created)), + }, + } + createWebhookReq = CreateWebhookRequest{ + URL: "http://xxx", + Enabled: true, + SslVerifyEnabled: false, + Triggers: []string{ + JoinResourceAction(string(eventmodels.Cluster), string(eventmodels.Created)), + }, + ResourceType: "clusters", + ResourceID: 1, + } +) + +func createContext() { + db, _ = orm.NewSqliteDB("file::memory:?cache=shared") + if err := db.AutoMigrate(&webhookmodels.Webhook{}, + &webhookmodels.WebhookLog{}, &usermodels.User{}); err != nil { + panic(err) + } + if err := db.AutoMigrate(); err != nil { + panic(err) + } + ctx = context.Background() + // nolint + ctx = common.WithContext(ctx, &userauth.DefaultInfo{ + Name: "Jerry", + ID: 1, + Admin: true, + }) + mgrParam := managerparam.InitManager(db) + controllerParam := param.Param{ + Manager: mgrParam, + } + c = NewController(&controllerParam).(*controller) +} + +func Test(t *testing.T) { + createContext() + w, err := c.CreateWebhook(ctx, &createWebhookReq) + assert.Nil(t, err) + assert.Equal(t, createWebhookReq.URL, w.URL) + + w, err = c.GetWebhook(ctx, w.ID) + assert.Nil(t, err) + assert.Equal(t, createWebhookReq.URL, w.URL) + + uw := updateWebhookReq + uw.URL = utilcommon.StringPtr("http://bbb") + w, err = c.UpdateWebhook(ctx, w.ID, &uw) + assert.Nil(t, err) + assert.Equal(t, *(uw.URL), w.URL) + + ws, _, err := c.ListWebhooks(ctx, string(models.Cluster), createWebhookReq.ResourceID, nil) + assert.Nil(t, err) + assert.Equal(t, 1, len(ws)) + assert.Equal(t, *(uw.URL), w.URL) + + wl, err := c.webhookMgr.CreateWebhookLog(ctx, &webhookmodels.WebhookLog{ + WebhookID: w.ID, + URL: w.URL, + Status: webhookmodels.StatusWaiting, + }) + assert.Nil(t, err) + + wlSummary, err := c.GetWebhookLog(ctx, wl.ID) + assert.Nil(t, err) + assert.Equal(t, wl.URL, wlSummary.URL) + + wl.Status = webhookmodels.StatusSuccess + wl, err = c.webhookMgr.UpdateWebhookLog(ctx, wl) + assert.Nil(t, err) + + wlRetry, err := c.RetryWebhookLog(ctx, wl.ID) + assert.Nil(t, err) + assert.Equal(t, wl.URL, wlRetry.URL) + assert.Equal(t, webhookmodels.StatusWaiting, wlRetry.Status) + + wlss, _, err := c.ListWebhookLogs(ctx, w.ID, nil) + assert.Nil(t, err) + assert.Equal(t, 2, len(wlss)) + + err = c.DeleteWebhook(ctx, w.ID) + assert.Nil(t, err) + + wlss, _, err = c.ListWebhookLogs(ctx, w.ID, nil) + assert.Nil(t, err) + assert.Equal(t, 0, len(wlss)) +} diff --git a/core/controller/webhook/models.go b/core/controller/webhook/models.go new file mode 100644 index 00000000..1a82473b --- /dev/null +++ b/core/controller/webhook/models.go @@ -0,0 +1,295 @@ +package webhook + +import ( + "context" + "fmt" + "regexp" + "strings" + "time" + + "g.hz.netease.com/horizon/core/common" + herrors "g.hz.netease.com/horizon/core/errors" + perror "g.hz.netease.com/horizon/pkg/errors" + "g.hz.netease.com/horizon/pkg/event/models" + wmodels "g.hz.netease.com/horizon/pkg/webhook/models" +) + +const ( + _triggerSeparator = "," + _resourceActionSeparator = "_" +) + +type UpdateWebhookRequest struct { + Enabled *bool `json:"enabled"` + URL *string `json:"url"` + SslVerifyEnabled *bool `json:"sslVerifyEnabled"` + Description *string `json:"description"` + Secret *string `json:"secret"` + Triggers []string `json:"triggers"` +} + +type CreateWebhookRequest struct { + Enabled bool `json:"enabled"` + URL string `json:"url"` + SslVerifyEnabled bool `json:"sslVerifyEnabled"` + Description string `json:"description"` + Secret string `json:"secret"` + Triggers []string `json:"triggers"` + ResourceType string `json:"-"` + ResourceID uint `json:"-"` +} + +type Webhook struct { + CreateWebhookRequest + ID uint `json:"id"` + CreatedAt time.Time `json:"createdAt"` + CreatedBy *common.User `json:"createdBy,omitempty"` + UpdatedAt time.Time `json:"updatedAt"` + UpdatedBy *common.User `json:"updatedBy,omitempty"` +} + +type LogSummary struct { + ID uint `json:"id"` + WebhookID uint `json:"webhookID"` + EventID uint `json:"eventID"` + URL string `json:"url"` + Status string `json:"status"` + ErrorMessage string `json:"errorMessage"` + CreatedAt time.Time `json:"createdAt"` + CreatedBy *common.User `json:"createdBy,omitempty"` + UpdatedAt time.Time `json:"updatedAt"` + UpdatedBy *common.User `json:"updatedBy,omitempty"` +} + +type Log struct { + LogSummary + RequestHeaders string `json:"requestHeaders"` + RequestData string `json:"requestData"` + ResponseHeaders string `json:"responseHeaders"` + ResponseBody string `json:"responseBody"` +} + +func (w *UpdateWebhookRequest) toModel(ctx context.Context, wm *wmodels.Webhook) *wmodels.Webhook { + if w.Enabled != nil { + wm.Enabled = *w.Enabled + } + if w.URL != nil { + wm.URL = *w.URL + } + if w.SslVerifyEnabled != nil { + wm.SslVerifyEnabled = *w.SslVerifyEnabled + } + if w.Description != nil { + wm.Description = *w.Description + } + if w.Secret != nil { + wm.Secret = *w.Secret + } + if len(w.Triggers) > 0 { + wm.Triggers = JoinTriggers(w.Triggers) + } + return wm +} + +func (w *UpdateWebhookRequest) validate() error { + if w.URL != nil { + if err := validateURL(*w.URL); err != nil { + return err + } + } + if len(w.Triggers) > 0 { + return validateTriggers(w.Triggers) + } + return nil +} + +func (w *CreateWebhookRequest) toModel(ctx context.Context) *wmodels.Webhook { + wm := &wmodels.Webhook{ + Enabled: w.Enabled, + URL: w.URL, + SslVerifyEnabled: w.SslVerifyEnabled, + Description: w.Description, + Secret: w.Secret, + Triggers: JoinTriggers(w.Triggers), + } + switch w.ResourceType { + case common.ResourceGroup: + wm.ResourceType = string(models.Group) + case common.ResourceApplication: + wm.ResourceType = string(models.Application) + case common.ResourceCluster: + wm.ResourceType = string(models.Cluster) + } + wm.ResourceID = w.ResourceID + return wm +} + +func (w *CreateWebhookRequest) validate() error { + switch w.ResourceType { + case common.ResourceGroup, common.ResourceApplication, common.ResourceCluster: + default: + return perror.Wrap(herrors.ErrParamInvalid, + fmt.Sprintf("invalid resource type: %s", w.ResourceType)) + } + if err := validateURL(w.URL); err != nil { + return err + } + if len(w.Triggers) <= 0 { + return perror.Wrap(herrors.ErrParamInvalid, "triggers should not be empty") + } + return validateTriggers(w.Triggers) +} + +func validateURL(url string) error { + re := `^http(s)?://.+$` + pattern := regexp.MustCompile(re) + if !pattern.MatchString(url) { + return perror.Wrap(herrors.ErrParamInvalid, + fmt.Sprintf("invalid url, should satisfies the pattern %v", re)) + } + return nil +} + +func validateTriggers(triggers []string) error { + validateClusterAction := func(action models.EventAction) bool { + switch action { + case models.Builded, models.Deployed, models.Freed, models.Rollbacked: + default: + return false + } + return true + } + + validateApplicationAction := func(action models.EventAction) bool { + switch action { + case models.Transferred: + default: + return false + } + return true + } + + validateCommonAction := func(action models.EventAction) bool { + switch action { + case models.AnyAction, models.Created, models.Deleted: + default: + return false + } + return true + } + + for _, trigger := range triggers { + resource, action, err := ParseResourceAction(trigger) + if err != nil { + return err + } + switch resource { + case models.Cluster: + if !validateCommonAction(action) && !validateClusterAction(action) { + return perror.Wrap(herrors.ErrParamInvalid, + fmt.Sprintf("invalid trigger: %s", trigger)) + } + case models.Application: + if !validateCommonAction(action) && !validateApplicationAction(action) { + return perror.Wrap(herrors.ErrParamInvalid, + fmt.Sprintf("invalid trigger: %s", trigger)) + } + default: + return perror.Wrap(herrors.ErrParamInvalid, + fmt.Sprintf("invalid trigger: %s", trigger)) + } + } + return nil +} + +func ParseTriggerStr(triggerStr string) []string { + return strings.Split(triggerStr, _triggerSeparator) +} + +func JoinTriggers(triggers []string) string { + return strings.Join(triggers, _triggerSeparator) +} + +func ParseResourceAction(trigger string) (models.EventResourceType, models.EventAction, error) { + parts := strings.Split(trigger, _resourceActionSeparator) + if len(parts) != 2 { + return "", "", perror.Wrap(herrors.ErrParamInvalid, + fmt.Sprintf("invalid trigger %s", trigger)) + } + resource := parts[0] + action := parts[1] + return models.EventResourceType(resource), models.EventAction(action), nil +} + +func JoinResourceAction(resource, action string) string { + return strings.Join([]string{resource, action}, _resourceActionSeparator) +} + +func CheckIfEventMatch(webhook *wmodels.Webhook, event *models.Event) (bool, error) { + triggers := ParseTriggerStr(webhook.Triggers) + for _, trigger := range triggers { + r, a, err := ParseResourceAction(trigger) + if err != nil { + return false, err + } + if (r == models.AnyResource || r == models.EventResourceType(event.ResourceType)) && + (a == models.AnyAction || a == models.EventAction(event.Action)) { + return true, nil + } + } + return false, nil +} + +func ofWebhookModel(wm *wmodels.Webhook) *Webhook { + w := &Webhook{ + CreateWebhookRequest: CreateWebhookRequest{ + Enabled: wm.Enabled, + URL: wm.URL, + SslVerifyEnabled: wm.SslVerifyEnabled, + Description: wm.Description, + Secret: wm.Secret, + Triggers: ParseTriggerStr(wm.Triggers), + ResourceType: wm.ResourceType, + ResourceID: wm.ResourceID, + }, + ID: wm.ID, + CreatedAt: wm.CreatedAt, + UpdatedAt: wm.UpdatedAt, + } + + return w +} + +func ofWebhookLogSummaryModel(wm *wmodels.WebhookLog) *LogSummary { + wl := &LogSummary{ + ID: wm.ID, + WebhookID: wm.WebhookID, + EventID: wm.EventID, + URL: wm.URL, + Status: wm.Status, + ErrorMessage: wm.ErrorMessage, + CreatedAt: wm.CreatedAt, + UpdatedAt: wm.UpdatedAt, + } + return wl +} + +func ofWebhookLogModel(wm *wmodels.WebhookLog) *Log { + wl := &Log{ + LogSummary: LogSummary{ + ID: wm.ID, + WebhookID: wm.WebhookID, + EventID: wm.EventID, + URL: wm.URL, + Status: wm.Status, + ErrorMessage: wm.ErrorMessage, + CreatedAt: wm.CreatedAt, + UpdatedAt: wm.UpdatedAt, + }, + RequestHeaders: wm.RequestHeaders, + RequestData: wm.RequestData, + ResponseHeaders: wm.ResponseHeaders, + ResponseBody: wm.ResponseBody, + } + return wl +} diff --git a/core/errors/horizonerrors.go b/core/errors/horizonerrors.go index 12b74bad..202af512 100644 --- a/core/errors/horizonerrors.go +++ b/core/errors/horizonerrors.go @@ -48,6 +48,10 @@ var ( KubeConfigInK8S = sourceType{name: "KubeConfigK8S"} GroupFullPath = sourceType{name: "GroupFullPath"} IdentityProviderInDB = sourceType{name: "IdentityProviderInDB"} + EventInDB = sourceType{name: "EventInDB"} + EventCursorInDB = sourceType{name: "EventCursorInDB"} + WebhookInDB = sourceType{name: "WebhookInDB"} + WebhookLogInDB = sourceType{name: "WebhookLogInDB"} // S3 PipelinerunLog = sourceType{name: "PipelinerunLog"} @@ -253,16 +257,15 @@ var ( ErrRegistryUsedByRegions = errors.New("cannot delete a registry when used by regions") // ErrRegionUsedByClusters used when deleting a region that is still used by clusters - ErrRegionUsedByClusters = errors.New("cannot delete a region when used by clusters") - - ErrPipelineOutPut = errors.New("pipeline output is not valid") - + ErrRegionUsedByClusters = errors.New("cannot delete a region when used by clusters") + ErrPipelineOutPut = errors.New("pipeline output is not valid") ErrTemplateParamInvalid = errors.New("parameters of template are invalid") ErrTemplateReleaseParamInvalid = errors.New("parameters of release are invalid") + ErrAPIServerResponseNotOK = errors.New("response for api-server is not 200 OK") + ErrListGrafanaDashboard = errors.New("List grafana dashboards error") - ErrAPIServerResponseNotOK = errors.New("response for api-server is not 200 OK") - - ErrListGrafanaDashboard = errors.New("List grafana dashboards error") + // event + ErrEventHandlerAlreadyExist = errors.New("event handler already exist") ErrSessionNotFound = errors.New("session not found") ErrSessionSaveFailed = errors.New("failed to save session") diff --git a/core/http/api/v1/webhook/apis.go b/core/http/api/v1/webhook/apis.go new file mode 100644 index 00000000..435796d2 --- /dev/null +++ b/core/http/api/v1/webhook/apis.go @@ -0,0 +1,304 @@ +package webhook + +import ( + "strconv" + + "github.com/gin-gonic/gin" + + "g.hz.netease.com/horizon/core/common" + "g.hz.netease.com/horizon/core/controller/webhook" + webhookctl "g.hz.netease.com/horizon/core/controller/webhook" + herrors "g.hz.netease.com/horizon/core/errors" + "g.hz.netease.com/horizon/lib/q" + perror "g.hz.netease.com/horizon/pkg/errors" + "g.hz.netease.com/horizon/pkg/server/response" + "g.hz.netease.com/horizon/pkg/server/rpcerror" + "g.hz.netease.com/horizon/pkg/util/log" +) + +type API struct { + webhookCtl webhookctl.Controller +} + +func NewAPI(ctl webhookctl.Controller) *API { + return &API{ + webhookCtl: ctl, + } +} + +func (a *API) CreateWebhook(c *gin.Context) { + const op = "webhook: create" + resourceType := c.Param(_resourceTypeParam) + resourceIDStr := c.Param(_resourceIDParam) + resourceID, err := strconv.ParseUint(resourceIDStr, 10, 0) + if err != nil { + response.AbortWithRPCError(c, rpcerror.ParamError. + WithErrMsgf("invalid resource id: %s", resourceIDStr)) + return + } + + var request webhook.CreateWebhookRequest + if err := c.ShouldBindJSON(&request); err != nil { + response.AbortWithRPCError(c, rpcerror.ParamError. + WithErrMsgf("invalid request body, err: %s", err.Error())) + return + } + request.ResourceType = resourceType + request.ResourceID = uint(resourceID) + + resp, err := a.webhookCtl.CreateWebhook(c, &request) + if err != nil { + if perror.Cause(err) == herrors.ErrParamInvalid { + response.AbortWithRPCError(c, rpcerror.ParamError.WithErrMsg(err.Error())) + return + } + log.WithFiled(c, "op", op).Errorf("%+v", err) + response.AbortWithRPCError(c, rpcerror.InternalError.WithErrMsg(err.Error())) + return + } + response.SuccessWithData(c, resp) +} + +func (a *API) ListWebhooks(c *gin.Context) { + const op = "webhook: list" + resourceType := c.Param(_resourceTypeParam) + resourceIDStr := c.Param(_resourceIDParam) + resourceID, err := strconv.ParseUint(resourceIDStr, 10, 0) + if err != nil { + response.AbortWithRPCError(c, rpcerror.ParamError. + WithErrMsgf("invalid resource id: %s", resourceIDStr)) + return + } + + var ( + pageNumber, pageSize int + ) + + pageNumberStr := c.Query(common.PageNumber) + if pageNumberStr == "" { + pageNumber = common.DefaultPageNumber + } else { + pageNumber, err = strconv.Atoi(pageNumberStr) + if err != nil { + response.AbortWithRequestError(c, common.InvalidRequestParam, "invalid pageNumber") + return + } + } + + pageSizeStr := c.Query(common.PageSize) + if pageSizeStr == "" { + pageSize = common.DefaultPageSize + } else { + pageSize, err = strconv.Atoi(pageSizeStr) + if err != nil { + response.AbortWithRequestError(c, common.InvalidRequestParam, "invalid pageSize") + return + } + } + + items, total, err := a.webhookCtl.ListWebhooks(c, resourceType, uint(resourceID), &q.Query{ + PageSize: pageSize, + PageNumber: pageNumber, + }) + if err != nil { + if perror.Cause(err) == herrors.ErrParamInvalid { + response.AbortWithRPCError(c, rpcerror.ParamError.WithErrMsg(err.Error())) + return + } else if _, ok := perror.Cause(err).(*herrors.HorizonErrNotFound); ok { + response.AbortWithRPCError(c, rpcerror.NotFoundError.WithErrMsg(err.Error())) + return + } + log.WithFiled(c, "op", op).Errorf("%+v", err) + response.AbortWithRPCError(c, rpcerror.InternalError.WithErrMsg(err.Error())) + return + } + response.SuccessWithData(c, response.DataWithTotal{ + Items: items, + Total: total, + }) +} + +func (a *API) UpdateWebhook(c *gin.Context) { + const op = "webhook: update" + idStr := c.Param(_webhookIDParam) + id, err := strconv.ParseUint(idStr, 10, 0) + if err != nil { + response.AbortWithRPCError(c, rpcerror.ParamError. + WithErrMsgf("invalid id: %s", idStr)) + return + } + + var request webhook.UpdateWebhookRequest + if err := c.ShouldBindJSON(&request); err != nil { + response.AbortWithRPCError(c, rpcerror.ParamError. + WithErrMsgf("invalid request body, err: %s", err.Error())) + return + } + + resp, err := a.webhookCtl.UpdateWebhook(c, uint(id), &request) + if err != nil { + if perror.Cause(err) == herrors.ErrParamInvalid { + response.AbortWithRPCError(c, rpcerror.ParamError.WithErrMsg(err.Error())) + return + } else if _, ok := perror.Cause(err).(*herrors.HorizonErrNotFound); ok { + response.AbortWithRPCError(c, rpcerror.NotFoundError.WithErrMsg(err.Error())) + return + } + log.WithFiled(c, "op", op).Errorf("%+v", err) + response.AbortWithRPCError(c, rpcerror.InternalError.WithErrMsg(err.Error())) + return + } + response.SuccessWithData(c, resp) +} + +func (a *API) GetWebhook(c *gin.Context) { + const op = "webhook: get" + idStr := c.Param(_webhookIDParam) + id, err := strconv.ParseUint(idStr, 10, 0) + if err != nil { + response.AbortWithRPCError(c, rpcerror.ParamError. + WithErrMsgf("invalid id: %s", idStr)) + return + } + + resp, err := a.webhookCtl.GetWebhook(c, uint(id)) + if err != nil { + if perror.Cause(err) == herrors.ErrParamInvalid { + response.AbortWithRPCError(c, rpcerror.ParamError.WithErrMsg(err.Error())) + return + } else if _, ok := perror.Cause(err).(*herrors.HorizonErrNotFound); ok { + response.AbortWithRPCError(c, rpcerror.NotFoundError.WithErrMsg(err.Error())) + return + } + log.WithFiled(c, "op", op).Errorf("%+v", err) + response.AbortWithRPCError(c, rpcerror.InternalError.WithErrMsg(err.Error())) + return + } + response.SuccessWithData(c, resp) +} + +func (a *API) DeleteWebhook(c *gin.Context) { + const op = "webhook: delete" + idStr := c.Param(_webhookIDParam) + id, err := strconv.ParseUint(idStr, 10, 0) + if err != nil { + response.AbortWithRPCError(c, rpcerror.ParamError. + WithErrMsgf("invalid id: %s", idStr)) + return + } + + err = a.webhookCtl.DeleteWebhook(c, uint(id)) + if err != nil { + if perror.Cause(err) == herrors.ErrParamInvalid { + response.AbortWithRPCError(c, rpcerror.ParamError.WithErrMsg(err.Error())) + return + } else if _, ok := perror.Cause(err).(*herrors.HorizonErrNotFound); ok { + response.AbortWithRPCError(c, rpcerror.NotFoundError.WithErrMsg(err.Error())) + return + } + log.WithFiled(c, "op", op).Errorf("%+v", err) + response.AbortWithRPCError(c, rpcerror.InternalError.WithErrMsg(err.Error())) + return + } + response.Success(c) +} + +func (a *API) ListWebhookLogs(c *gin.Context) { + const op = "webhook: list logs" + webhookIDStr := c.Param(_webhookIDParam) + webhookID, err := strconv.ParseUint(webhookIDStr, 10, 0) + if err != nil { + response.AbortWithRPCError(c, rpcerror.ParamError. + WithErrMsgf("invalid webhook id: %s", webhookIDStr)) + return + } + + var ( + pageNumber int + pageSize int + ) + + pageSizeStr := c.Query(common.PageSize) + if pageSizeStr == "" { + pageSize = common.DefaultPageSize + } else { + pageSize, err = strconv.Atoi(pageSizeStr) + if err != nil { + response.AbortWithRequestError(c, common.InvalidRequestParam, "invalid pageSize") + return + } + } + + items, total, err := a.webhookCtl.ListWebhookLogs(c, uint(webhookID), &q.Query{ + PageNumber: pageNumber, + PageSize: pageSize, + }) + if err != nil { + if perror.Cause(err) == herrors.ErrParamInvalid { + response.AbortWithRPCError(c, rpcerror.ParamError.WithErrMsg(err.Error())) + return + } else if _, ok := perror.Cause(err).(*herrors.HorizonErrNotFound); ok { + response.AbortWithRPCError(c, rpcerror.NotFoundError.WithErrMsg(err.Error())) + return + } + log.WithFiled(c, "op", op).Errorf("%+v", err) + response.AbortWithRPCError(c, rpcerror.InternalError.WithErrMsg(err.Error())) + return + } + response.SuccessWithData(c, response.DataWithTotal{ + Items: items, + Total: total, + }) +} + +func (a *API) GetWebhookLog(c *gin.Context) { + const op = "webhook: get log" + idStr := c.Param(_webhookLogIDParam) + id, err := strconv.ParseUint(idStr, 10, 0) + if err != nil { + response.AbortWithRPCError(c, rpcerror.ParamError. + WithErrMsgf("invalid id: %s", idStr)) + return + } + + resp, err := a.webhookCtl.GetWebhookLog(c, uint(id)) + if err != nil { + if perror.Cause(err) == herrors.ErrParamInvalid { + response.AbortWithRPCError(c, rpcerror.ParamError.WithErrMsg(err.Error())) + return + } else if _, ok := perror.Cause(err).(*herrors.HorizonErrNotFound); ok { + response.AbortWithRPCError(c, rpcerror.NotFoundError.WithErrMsg(err.Error())) + return + } + log.WithFiled(c, "op", op).Errorf("%+v", err) + response.AbortWithRPCError(c, rpcerror.InternalError.WithErrMsg(err.Error())) + return + } + response.SuccessWithData(c, resp) +} + +func (a *API) RetryWebhookLog(c *gin.Context) { + const op = "webhook: retry log" + idStr := c.Param(_webhookLogIDParam) + id, err := strconv.ParseUint(idStr, 10, 0) + if err != nil { + response.AbortWithRPCError(c, rpcerror.ParamError. + WithErrMsgf("invalid id: %s", idStr)) + return + } + + resp, err := a.webhookCtl.RetryWebhookLog(c, uint(id)) + if err != nil { + if perror.Cause(err) == herrors.ErrParamInvalid { + response.AbortWithRPCError(c, rpcerror.ParamError.WithErrMsg(err.Error())) + return + } else if _, ok := perror.Cause(err).(*herrors.HorizonErrNotFound); ok { + response.AbortWithRPCError(c, rpcerror.NotFoundError.WithErrMsg(err.Error())) + return + } + log.WithFiled(c, "op", op).Errorf("%+v", err) + response.AbortWithRPCError(c, rpcerror.InternalError.WithErrMsg(err.Error())) + return + } + response.SuccessWithData(c, resp) +} diff --git a/core/http/api/v1/webhook/routers.go b/core/http/api/v1/webhook/routers.go new file mode 100644 index 00000000..b17117a3 --- /dev/null +++ b/core/http/api/v1/webhook/routers.go @@ -0,0 +1,64 @@ +package webhook + +import ( + "fmt" + "net/http" + + "github.com/gin-gonic/gin" + + "g.hz.netease.com/horizon/pkg/server/route" +) + +const ( + _resourceTypeParam = "resourceType" + _resourceIDParam = "resourceID" + _webhookIDParam = "webhookID" + _webhookLogIDParam = "webhookLogID" +) + +func RegisterRoutes(engine *gin.Engine, api *API) { + group := engine.Group("/apis/core/v1") + var routers = route.Routes{ + { + Method: http.MethodPost, + Pattern: fmt.Sprintf("/:%v/:%v/webhooks", _resourceTypeParam, _resourceIDParam), + HandlerFunc: api.CreateWebhook, + }, + { + Method: http.MethodGet, + Pattern: fmt.Sprintf("/:%v/:%v/webhooks", _resourceTypeParam, _resourceIDParam), + HandlerFunc: api.ListWebhooks, + }, + { + Method: http.MethodPut, + Pattern: fmt.Sprintf("/webhooks/:%v", _webhookIDParam), + HandlerFunc: api.UpdateWebhook, + }, + { + Method: http.MethodGet, + Pattern: fmt.Sprintf("/webhooks/:%v", _webhookIDParam), + HandlerFunc: api.GetWebhook, + }, + { + Method: http.MethodDelete, + Pattern: fmt.Sprintf("/webhooks/:%v", _webhookIDParam), + HandlerFunc: api.DeleteWebhook, + }, + { + Method: http.MethodGet, + Pattern: fmt.Sprintf("/webhooks/:%v/logs", _webhookIDParam), + HandlerFunc: api.ListWebhookLogs, + }, + { + Method: http.MethodGet, + Pattern: fmt.Sprintf("/webhooklogs/:%v", _webhookLogIDParam), + HandlerFunc: api.GetWebhookLog, + }, + { + Method: http.MethodPost, + Pattern: fmt.Sprintf("/webhooklogs/:%v/retry", _webhookLogIDParam), + HandlerFunc: api.RetryWebhookLog, + }, + } + route.RegisterRoutes(group, routers) +} diff --git a/db/migrations/20220922_add_webhook_event.sql b/db/migrations/20220922_add_webhook_event.sql new file mode 100644 index 00000000..7f3da109 --- /dev/null +++ b/db/migrations/20220922_add_webhook_event.sql @@ -0,0 +1,57 @@ +CREATE TABLE `tb_event` ( + `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, + `req_id` varchar(256) NOT NULL DEFAULT '', + `resource_type` varchar(256) NOT NULL DEFAULT '', + `resource_id` varchar(256) NOT NULL DEFAULT '', + `action` varchar(256) NOT NULL DEFAULT '', + `created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP, + `created_by` bigint(20) unsigned NOT NULL DEFAULT '0', + PRIMARY KEY (`id`), + KEY `idx_req_id` (`req_id`), + KEY `idx_resource_action` (`resource_id`, `resource_type`, `action`) +) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8mb4; + +CREATE TABLE `tb_event_cursor` ( + `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, + `position` bigint(20) NOT NULL DEFAULT 0, + `created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP, + `updated_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (`id`), + UNIQUE KEY `idx_value` (`value`) +) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8mb4; + +CREATE TABLE `tb_webhook` ( + `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, + `enabled` tinyint(1) NOT NULL DEFAULT true, + `url` text NOT NULL, + `ssl_verify_enabled` tinyint(1) NOT NULL DEFAULT false, + `description` varchar(256) NOT NULL DEFAULT '', + `secret` text NOT NULL, + `triggers` text NOT NULL, + `resource_type` varchar(256) NOT NULL DEFAULT '', + `resource_id` bigint(20) NOT NULL DEFAULT 0, + `created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP, + `updated_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + `created_by` bigint(20) unsigned NOT NULL DEFAULT 0 COMMENT '', + `updated_by` bigint(20) unsigned NOT NULL DEFAULT 0 COMMENT '', + PRIMARY KEY (`id`) +) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8mb4; + +CREATE TABLE `tb_webhook_log` ( + `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, + `webhook_id` bigint(20) unsigned NOT NULL, + `event_id` bigint(20) unsigned NOT NULL, + `url` text NOT NULL, + `request_headers` text NOT NULL, + `request_data` text NOT NULL, + `response_headers` text NOT NULL, + `response_body` text NOT NULL, + `status` varchar(256) NOT NULL, + `error_message` text NOT NULL, + `created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP, + `created_by` bigint(20) unsigned NOT NULL DEFAULT 0 COMMENT '', + `updated_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (`id`), + KEY `idx_webhook_id_status` (`webhook_id`, `status`), + KEY `idx_event_id` (`event_id`) +) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8mb4; \ No newline at end of file diff --git a/go.sum b/go.sum index 357da5ba..78ebd7a3 100644 --- a/go.sum +++ b/go.sum @@ -379,8 +379,9 @@ github.com/cloudevents/sdk-go v0.0.0-20190509003705-56931988abe3/go.mod h1:j1nZW github.com/cloudevents/sdk-go v1.0.0 h1:gS5I0s2qPmdc4GBPlUmzZU7RH30BaiOdcRJ1RkXnPrc= github.com/cloudevents/sdk-go v1.0.0/go.mod h1:3TkmM0cFqkhCHOq5JzzRU/RxRkwzoS8TZ+G448qVTog= github.com/cloudevents/sdk-go/v2 v2.0.0/go.mod h1:3CTrpB4+u7Iaj6fd7E2Xvm5IxMdRoaAhqaRVnOr2rCU= -github.com/cloudevents/sdk-go/v2 v2.1.0 h1:bmgrU8k+K2ppZ+G/q5xEQx/Xk9HRtJmkrEO3qtDO2k0= github.com/cloudevents/sdk-go/v2 v2.1.0/go.mod h1:3CTrpB4+u7Iaj6fd7E2Xvm5IxMdRoaAhqaRVnOr2rCU= +github.com/cloudevents/sdk-go/v2 v2.12.0 h1:p1k+ysVOZtNiXfijnwB3WqZNA3y2cGOiKQygWkUHCEI= +github.com/cloudevents/sdk-go/v2 v2.12.0/go.mod h1:xDmKfzNjM8gBvjaF8ijFjM1VYOVUEeUfapHMUX1T5To= github.com/clusterhq/flocker-go v0.0.0-20160920122132-2b8b7259d313/go.mod h1:P1wt9Z3DP8O6W3rvwCt0REIlshg1InHImaLW0t3ObY0= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8= @@ -831,8 +832,6 @@ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= -github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-containerregistry v0.0.0-20191010200024-a3d713f9b7f8/go.mod h1:KyKXa9ciM8+lgMXwOVsXi7UxGrsf9mM61Mzs+xKUrKE= github.com/google/go-containerregistry v0.0.0-20200115214256-379933c9c22b/go.mod h1:Wtl/v6YdQxv397EREtzwgd9+Ud7Q5D8XMbi3Zazgkrs= github.com/google/go-containerregistry v0.0.0-20200123184029-53ce695e4179/go.mod h1:Wtl/v6YdQxv397EREtzwgd9+Ud7Q5D8XMbi3Zazgkrs= @@ -1170,7 +1169,6 @@ github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de h1:9TO3cAIGXtEhn github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de/go.mod h1:zAbeS9B/r2mtpb6U+EI2rYA5OAXxsYw6wTamcNW+zcE= github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM= github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4= -github.com/lightstep/tracecontext.go v0.0.0-20181129014701-1757c391b1ac h1:+2b6iGRJe3hvV/yVXrd41yVEjxuFHxasJqDhkIjS4gk= github.com/lightstep/tracecontext.go v0.0.0-20181129014701-1757c391b1ac/go.mod h1:Frd2bnT3w5FB5q49ENTfVlztJES+1k/7lyWX2+9gq/M= github.com/lithammer/dedent v1.1.0/go.mod h1:jrXYCQtgg0nJiN+StA2KgR7w6CiQNv9Fd/Z9BP0jIOc= github.com/llorllale/go-gitlint v0.0.0-20190914155841-58c0b8cef0e5/go.mod h1:omoASPlaaf3ECEhTMfLZVS6o550eBWI2YsM/saGEbVA= @@ -1656,6 +1654,7 @@ github.com/streadway/quantile v0.0.0-20150917103942-b0c588724d25/go.mod h1:lbP8t github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v0.0.0-20151208002404-e3a8ff8ce365/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v0.0.0-20161117074351-18a02ba4a312/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.2.1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= @@ -1666,6 +1665,9 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5 github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/syndtr/gocapability v0.0.0-20170704070218-db04d3cc01c8/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww= github.com/syndtr/gocapability v0.0.0-20180916011248-d98352740cb2/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww= @@ -2156,6 +2158,8 @@ golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxb golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e h1:EHBhcS0mlXEAVwNyO2dLfjToGsyY4j24pTs2ScHnX7s= golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac h1:7zkz7BUtwNFFqcowJ+RIgu2MaV/MapERkDIy+mwPyjs= +golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -2528,6 +2532,8 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gorm.io/driver/mysql v1.1.2 h1:OofcyE2lga734MxwcCW9uB4mWNXMr50uaGRVwQL2B0M= gorm.io/driver/mysql v1.1.2/go.mod h1:4P/X9vSc3WTrhTLZ259cpFd6xKNYiSSdSZngkSBGIMM= gorm.io/driver/sqlite v1.1.3/go.mod h1:AKDgRWk8lcSQSw+9kxCJnX/yySj8G3rdwYlU57cB45c= diff --git a/job/cmd/cmd.go b/job/cmd/cmd.go index 72138c2d..278b9095 100644 --- a/job/cmd/cmd.go +++ b/job/cmd/cmd.go @@ -7,6 +7,9 @@ import ( "fmt" "log" "os" + "os/signal" + "sync" + "syscall" "g.hz.netease.com/horizon/core/config" clusterctl "g.hz.netease.com/horizon/core/controller/cluster" @@ -17,11 +20,14 @@ import ( "g.hz.netease.com/horizon/job/autofree" "g.hz.netease.com/horizon/lib/orm" "g.hz.netease.com/horizon/pkg/cluster/cd" + eventhandlersvc "g.hz.netease.com/horizon/pkg/eventhandler" + "g.hz.netease.com/horizon/pkg/eventhandler/wlgenerator" "g.hz.netease.com/horizon/pkg/grafana" "g.hz.netease.com/horizon/pkg/param" "g.hz.netease.com/horizon/pkg/param/managerparam" "g.hz.netease.com/horizon/pkg/util/kube" callbacks "g.hz.netease.com/horizon/pkg/util/ormcallbacks" + webhooksvc "g.hz.netease.com/horizon/pkg/webhook/service" "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" ) @@ -111,7 +117,7 @@ func Run(flags *Flags) { ) // init kube client - _, client, err := kube.BuildClient("") + _, client, err := kube.BuildClient("/tmp/config_dev2") if err != nil { panic(err) } @@ -131,6 +137,21 @@ func Run(flags *Flags) { parameter.UserManager, clusterCtl, prCtl, environmentCtl) }() + // start event handler service to generate webhook log by events + eventHandlerService := eventhandlersvc.NewService(ctx, manager) + if err := eventHandlerService.RegisterEventHandler("webhook", + wlgenerator.NewWebhookLogGenerator(manager)); err != nil { + log.Printf("failed to register event handler, error: %s", err.Error()) + } + eventHandlerService.Start() + + // start webhook service with multi workers to consume webhook logs and send webhook events + webhookService := webhooksvc.NewService(ctx, manager) + webhookService.Start() + + // graceful exit + setTasksBeforeExit(webhookService.StopAndWait, eventHandlerService.StopAndWait) + r := gin.New() // use middleware middlewares := []gin.HandlerFunc{ @@ -146,3 +167,25 @@ func Run(flags *Flags) { log.Printf("Server started") log.Print(r.Run(fmt.Sprintf(":%d", coreConfig.ServerConfig.Port))) } + +// setTasksBeforeExit set stop funcs which will be executed after sigterm and sigint catched +func setTasksBeforeExit(stopFuncs ...func()) { + sig := make(chan os.Signal, 1) + signal.Notify(sig, syscall.SIGTERM, syscall.SIGINT) + go func() { + s := <-sig + log.Printf("got %s signal, stop tasks...\n", s) + if len(stopFuncs) == 0 { + return + } + wg := sync.WaitGroup{} + wg.Add(len(stopFuncs)) + for _, stopFunc := range stopFuncs { + go func(stop func()) { + stop() + }(stopFunc) + } + wg.Wait() + log.Printf("all tasks stopped, exit now.") + }() +} diff --git a/openapi/restful/webhook.yaml b/openapi/restful/webhook.yaml new file mode 100644 index 00000000..af50a34f --- /dev/null +++ b/openapi/restful/webhook.yaml @@ -0,0 +1,334 @@ +openapi: 3.0.1 +info: + title: Horizon-Webhook-Restful + description: Restful API About Webhook + version: 1.0.0 +servers: + - url: 'http://localhost:8080/' +paths: + /apis/core/v1/{resourceType}/{resourceID}/webhooks: + parameters: + - name: resourceType + in: path + description: resource type + required: true + enum: [ "application", "cluster" ] + post: + tags: + - webhook + operationId: createWebhook + summary: create a webhook + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/CreateOrUpdateWebhook' + responses: + "200": + description: Success + default: + description: Unexpected error + content: + application/json: + schema: + $ref: "common.yaml#/components/schemas/Error" + get: + tags: + - webhook + operationId: listWebhooks + summary: list webhooks + responses: + '200': + description: Succuss + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/Webhook' + default: + description: Unexpected error + content: + application/json: + schema: + $ref: "common.yaml#/components/schemas/Error" + /apis/core/v1/webhooks/{webhookID}: + parameters: + - name: webhookID + in: path + description: webhook id + required: true + put: + tags: + - webhook + operationId: updateWebhook + summary: update a webhook + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/CreateOrUpdateWebhook' + responses: + '200': + description: Success + content: + application/json: + schema: + $ref: '#/components/schemas/Webhook' + default: + description: Unexpected error + content: + application/json: + schema: + $ref: "common.yaml#/components/schemas/Error" + get: + tags: + - webhook + operationId: getWebhook + summary: get a webhook + responses: + '200': + description: Success + content: + application/json: + schema: + $ref: '#/components/schemas/Webhook' + default: + description: Unexpected error + content: + application/json: + schema: + $ref: "common.yaml#/components/schemas/Error" + delete: + tags: + - member + operationId: deleteWebhook + summary: delete a webhook + responses: + '200': + description: Success + default: + description: Unexpected error + content: + application/json: + schema: + $ref: "common.yaml#/components/schemas/Error" + /apis/core/v1/webhooks/{webhookID}/logs: + parameters: + - name: webhookID + in: path + description: webhook id + required: true + get: + tags: + - webhook + operationId: listWebhookLogs + summary: list logs of webhook + responses: + '200': + description: Success + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/WebhookLogSummary' + default: + description: Unexpected error + content: + application/json: + schema: + $ref: "common.yaml#/components/schemas/Error" + /apis/core/v1/webhooklogs/{webhookLogID}: + parameters: + - name: webhookLogID + in: path + description: webhook log id + required: true + get: + tags: + - webhook + operationId: getWebhookLog + summary: get a webhook log + responses: + '200': + description: Success + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/WebhookLog' + default: + description: Unexpected error + content: + application/json: + schema: + $ref: "common.yaml#/components/schemas/Error" + /apis/core/v1/webhooklogs/{webhookLogID}/retry: + parameters: + - name: webhookLogID + in: path + description: webhook log id + required: true + get: + tags: + - webhook + operationId: retryWebhookLog + summary: retry a webhook log + responses: + '200': + description: Success + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/WebhookLog' + default: + description: Unexpected error + content: + application/json: + schema: + $ref: "common.yaml#/components/schemas/Error" +components: + schemas: + CreateOrUpdateWebhook: + type: object + required: [ url, triggers ] + properties: + enabled: + $ref: '#/components/schemas/Enabled' + url: + $ref: '#/components/schemas/URL' + sslVerifyEnabled: + $ref: '#/components/schemas/SslVerifyEnabled' + description: + $ref: '#/components/schemas/Description' + secret: + $ref: '#/components/schemas/Secret' + triggers: + $ref: '#/components/schemas/Triggers' + + Webhook: + type: object + properties: + id: + $ref: '#/components/schemas/ID' + enabled: + $ref: '#/components/schemas/Enabled' + url: + $ref: '#/components/schemas/URL' + sslVerifyEnabled: + $ref: '#/components/schemas/SslVerifyEnabled' + description: + $ref: '#/components/schemas/Description' + secret: + $ref: '#/components/schemas/Secret' + trigger: + $ref: '#/components/schemas/Trigger' + createdAt: + $ref: '#/components/schemas/CreatedAt' + createdBy: + $ref: '#/components/schemas/User' + updatedAt: + $ref: '#/components/schemas/UpdatedAt' + updatedBy: + $ref: '#/components/schemas/User' + + WebhookLogSummary: + type: object + properties: + id: + $ref: '#/components/schemas/ID' + webhookID: + $ref: '#/components/schemas/ID' + url: + $ref: '#/components/schemas/URL' + status: + $ref: '#/components/schemas/Status' + ErrorMessage: + $ref: '#/components/schemas/ErrorMessage' + createdAt: + $ref: '#/components/schemas/CreatedAt' + createdBy: + $ref: '#/components/schemas/User' + updatedAt: + $ref: '#/components/schemas/UpdatedAt' + updatedBy: + $ref: '#/components/schemas/User' + + WebhookLog: + type: object + properties: + id: + $ref: '#/components/schemas/ID' + webhookID: + $ref: '#/components/schemas/ID' + url: + $ref: '#/components/schemas/URL' + status: + $ref: '#/components/schemas/Status' + ErrorMessage: + $ref: '#/components/schemas/ErrorMessage' + createdAt: + $ref: '#/components/schemas/CreatedAt' + createdBy: + $ref: '#/components/schemas/User' + updatedAt: + $ref: '#/components/schemas/UpdatedAt' + updatedBy: + $ref: '#/components/schemas/User' + requestHeaders: + type: string + description: "" + requestData: + type: string + description: "" + responseHeaders: + type: string + description: "" + responseBody: + type: string + description: "" + + Enabled: + type: boolean + description: if webhook enabled + URL: + type: string + description: + SslVerifyEnabled: + type: boolean + description: + Description: + type: string + description: + Secret: + type: string + description: + Triggers: + type: array + items: + type: string + enum: [ cluster_*, application_*, cluster_created, cluster_deployed, cluster_build, cluster_freed, cluster_deleted, application_created, application_deleted ] + description: "conditions to trigger this webhook" + CreatedAt: + type: string + description: + UpdatedAt: + type: string + description: + ErrorMessage: + type: string + description: + User: + type: object + properties: + id: + type: integer + name: + type: string + email: + type: string \ No newline at end of file diff --git a/pkg/event/dao/dao.go b/pkg/event/dao/dao.go new file mode 100644 index 00000000..a346f7b5 --- /dev/null +++ b/pkg/event/dao/dao.go @@ -0,0 +1,93 @@ +package dao + +import ( + "context" + + "gorm.io/gorm" + "gorm.io/gorm/clause" + + herrors "g.hz.netease.com/horizon/core/errors" + "g.hz.netease.com/horizon/pkg/event/models" +) + +type DAO interface { + CreateEvent(ctx context.Context, event *models.Event) (*models.Event, error) + ListEvents(ctx context.Context, offset, limit int) ([]*models.Event, error) + ListEventsByRange(ctx context.Context, start, end uint) ([]*models.Event, error) + CreateOrUpdateCursor(ctx context.Context, + eventIndex *models.EventCursor) (*models.EventCursor, error) + GetCursor(ctx context.Context) (*models.EventCursor, error) + GetEvent(ctx context.Context, id uint) (*models.Event, error) +} + +type dao struct{ db *gorm.DB } + +// NewDAO returns an instance of the default DAO +func NewDAO(db *gorm.DB) DAO { + return &dao{db: db} +} + +func (d *dao) CreateEvent(ctx context.Context, event *models.Event) (*models.Event, error) { + if result := d.db.WithContext(ctx).Create(event); result.Error != nil { + return nil, herrors.NewErrInsertFailed(herrors.EventInDB, result.Error.Error()) + } + return event, nil +} + +func (d *dao) ListEvents(ctx context.Context, offset, limit int) ([]*models.Event, error) { + var events []*models.Event + if result := d.db.WithContext(ctx).Order("id asc").Offset(offset).Limit(limit).Find(&events); result.Error != nil { + return nil, herrors.NewErrInsertFailed(herrors.EventInDB, result.Error.Error()) + } + return events, nil +} + +func (d *dao) ListEventsByRange(ctx context.Context, start, end uint) ([]*models.Event, error) { + var events []*models.Event + if result := d.db.WithContext(ctx).Order("id asc").Where("id >= ?", start). + Where("id <= ?", end).Find(&events); result.Error != nil { + return nil, herrors.NewErrInsertFailed(herrors.EventInDB, result.Error.Error()) + } + return events, nil +} + +func (d *dao) GetEvent(ctx context.Context, id uint) (*models.Event, error) { + var event *models.Event + if result := d.db.WithContext(ctx).Where("id = ?", id).Find(&event); result.Error != nil { + if result.Error == gorm.ErrRecordNotFound { + return nil, herrors.NewErrNotFound(herrors.EventInDB, + result.Error.Error()) + } + return nil, herrors.NewErrInsertFailed(herrors.EventInDB, result.Error.Error()) + } + return event, nil +} + +func (d *dao) CreateOrUpdateCursor(ctx context.Context, + eventCursor *models.EventCursor) (*models.EventCursor, error) { + if result := d.db.Clauses(clause.OnConflict{ + Columns: []clause.Column{ + { + Name: "id", + }, + }, + DoUpdates: clause.AssignmentColumns([]string{"position"}), + }).Create(eventCursor); result.Error != nil { + return nil, herrors.NewErrInsertFailed(herrors.EventInDB, result.Error.Error()) + } + return eventCursor, nil +} + +func (d *dao) GetCursor(ctx context.Context) (*models.EventCursor, error) { + var eventIndex models.EventCursor + if result := d.db.First(&eventIndex); result.Error != nil { + if result.Error == gorm.ErrRecordNotFound { + return nil, herrors.NewErrNotFound(herrors.EventCursorInDB, + result.Error.Error()) + } + return nil, herrors.NewErrInsertFailed(herrors.EventCursorInDB, result.Error.Error()) + } + return &eventIndex, nil +} + +// todo: must add gc diff --git a/pkg/event/manager/manager.go b/pkg/event/manager/manager.go new file mode 100644 index 00000000..0f5a5404 --- /dev/null +++ b/pkg/event/manager/manager.go @@ -0,0 +1,76 @@ +package manager + +import ( + "context" + + "gorm.io/gorm" + + herrors "g.hz.netease.com/horizon/core/errors" + "g.hz.netease.com/horizon/pkg/event/dao" + "g.hz.netease.com/horizon/pkg/event/models" + "g.hz.netease.com/horizon/pkg/util/wlog" +) + +type Manager interface { + CreateEvent(ctx context.Context, event *models.Event) (*models.Event, error) + ListEvents(ctx context.Context, offset, limit int) ([]*models.Event, error) + ListEventsByRange(ctx context.Context, start, end uint) ([]*models.Event, error) + CreateOrUpdateCursor(ctx context.Context, + eventIndex *models.EventCursor) (*models.EventCursor, error) + GetCursor(ctx context.Context) (*models.EventCursor, error) + GetEvent(ctx context.Context, id uint) (*models.Event, error) +} + +type manager struct { + dao dao.DAO +} + +func New(db *gorm.DB) Manager { + return &manager{ + dao: dao.NewDAO(db), + } +} + +func (m *manager) CreateEvent(ctx context.Context, + event *models.Event) (*models.Event, error) { + const op = "event manager: create event" + defer wlog.Start(ctx, op).StopPrint() + + e, err := m.dao.CreateEvent(ctx, event) + if err != nil { + return nil, herrors.NewErrCreateFailed(herrors.EventInDB, err.Error()) + } + + return e, nil +} + +func (m *manager) CreateOrUpdateCursor(ctx context.Context, + eventCursor *models.EventCursor) (*models.EventCursor, error) { + const op = "event manager: create or update cursor" + defer wlog.Start(ctx, op).StopPrint() + return m.dao.CreateOrUpdateCursor(ctx, eventCursor) +} + +func (m *manager) GetCursor(ctx context.Context) (*models.EventCursor, error) { + const op = "event manager: get cursor" + defer wlog.Start(ctx, op).StopPrint() + return m.dao.GetCursor(ctx) +} + +func (m *manager) ListEvents(ctx context.Context, offset, limit int) ([]*models.Event, error) { + const op = "event manager: list events" + defer wlog.Start(ctx, op).StopPrint() + return m.dao.ListEvents(ctx, offset, limit) +} + +func (m *manager) ListEventsByRange(ctx context.Context, start, end uint) ([]*models.Event, error) { + const op = "event manager: list events by range" + defer wlog.Start(ctx, op).StopPrint() + return m.dao.ListEventsByRange(ctx, start, end) +} + +func (m *manager) GetEvent(ctx context.Context, id uint) (*models.Event, error) { + const op = "event manager: get event" + defer wlog.Start(ctx, op).StopPrint() + return m.dao.GetEvent(ctx, id) +} diff --git a/pkg/event/manager/manager_test.go b/pkg/event/manager/manager_test.go new file mode 100644 index 00000000..2bd6831f --- /dev/null +++ b/pkg/event/manager/manager_test.go @@ -0,0 +1,66 @@ +package manager + +import ( + "context" + "testing" + + "g.hz.netease.com/horizon/core/common" + "g.hz.netease.com/horizon/lib/orm" + userauth "g.hz.netease.com/horizon/pkg/authentication/user" + eventmodels "g.hz.netease.com/horizon/pkg/event/models" + "github.com/stretchr/testify/assert" +) + +var ( + ctx context.Context + m Manager +) + +func createCtx() { + db, _ := orm.NewSqliteDB("file::memory:?cache=shared") + if err := db.AutoMigrate(&eventmodels.Event{}, + &eventmodels.EventCursor{}); err != nil { + panic(err) + } + if err := db.AutoMigrate(); err != nil { + panic(err) + } + ctx = context.Background() + // nolint + ctx = common.WithContext(ctx, &userauth.DefaultInfo{ + Name: "Jerry", + ID: 1, + Admin: true, + }) + m = New(db) +} + +func Test(t *testing.T) { + createCtx() + e := &eventmodels.Event{ + EventSummary: eventmodels.EventSummary{ + ResourceType: eventmodels.Cluster, + ResourceID: 1, + Action: eventmodels.Created, + }, + } + e, err := m.CreateEvent(ctx, e) + assert.Nil(t, err) + + e2, err := m.GetEvent(ctx, e.ID) + assert.Nil(t, err) + assert.Equal(t, e.ResourceID, e2.ResourceID) + + ec, err := m.CreateOrUpdateCursor(ctx, &eventmodels.EventCursor{ + Position: 1, + }) + assert.Nil(t, err) + + ec.Position = 2 + _, err = m.CreateOrUpdateCursor(ctx, ec) + assert.Nil(t, err) + + ec2, err := m.GetCursor(ctx) + assert.Nil(t, err) + assert.Equal(t, ec2.Position, ec2.Position) +} diff --git a/pkg/event/models/event.go b/pkg/event/models/event.go new file mode 100644 index 00000000..ff7eb846 --- /dev/null +++ b/pkg/event/models/event.go @@ -0,0 +1,53 @@ +package models + +import ( + "time" + + "g.hz.netease.com/horizon/core/common" +) + +type EventAction string +type EventResourceType string + +const ( + // resource type + AnyResource EventResourceType = "*" + Group EventResourceType = common.ResourceGroup + Application EventResourceType = common.ResourceApplication + Cluster EventResourceType = common.ResourceCluster + + // common actions + AnyAction EventAction = "*" + Created EventAction = "created" + Deleted EventAction = "deleted" + + // cluster actions + Transferred EventAction = "transferred" + + // cluster actions + Builded EventAction = "builded" + Deployed EventAction = "deployed" + Rollbacked EventAction = "rollbacked" + Freed EventAction = "freed" +) + +type EventSummary struct { + ResourceType EventResourceType + ResourceID uint + Action EventAction +} + +type Event struct { + EventSummary + ID uint + ReqID string + CreatedAt time.Time + CreatedBy uint +} + +type EventCursor struct { + ID uint + Position uint + CreatedAt time.Time + UpdatedAt time.Time +} diff --git a/pkg/eventhandler/eventhandler.go b/pkg/eventhandler/eventhandler.go new file mode 100644 index 00000000..a7becec1 --- /dev/null +++ b/pkg/eventhandler/eventhandler.go @@ -0,0 +1,172 @@ +package eventhandler + +import ( + "context" + "time" + + herrors "g.hz.netease.com/horizon/core/errors" + perror "g.hz.netease.com/horizon/pkg/errors" + eventmanager "g.hz.netease.com/horizon/pkg/event/manager" + "g.hz.netease.com/horizon/pkg/event/models" + "g.hz.netease.com/horizon/pkg/param/managerparam" + "g.hz.netease.com/horizon/pkg/util/log" + webhookmanager "g.hz.netease.com/horizon/pkg/webhook/manager" +) + +type Service interface { + RegisterEventHandler(name string, eh EventHandler) error + StopAndWait() + Start() +} + +type cursor struct { + ID uint + Position uint +} + +type eventHandlerService struct { + ctx context.Context + eventHandlers map[string]EventHandler + cursor *cursor + resume bool + quit chan bool + + eventMgr eventmanager.Manager + webhookMgr webhookmanager.Manager +} + +func NewService(ctx context.Context, manager *managerparam.Manager) Service { + return &eventHandlerService{ + ctx: ctx, + eventHandlers: map[string]EventHandler{}, + resume: true, + + eventMgr: manager.EventManager, + webhookMgr: manager.WebhookManager, + } +} + +// EventHandler can be registered to process events +type EventHandler interface { + Process(ctx context.Context, event []*models.Event, resume bool) error +} + +func (e *eventHandlerService) RegisterEventHandler(name string, eh EventHandler) error { + if _, ok := e.eventHandlers[name]; ok { + return perror.Wrapf(herrors.ErrEventHandlerAlreadyExist, "%s already exist", name) + } + e.eventHandlers[name] = eh + return nil +} + +func (e *eventHandlerService) StopAndWait() { + // 1. notify handlers to stop + e.quit <- true + // 2. wait for stop + <-e.quit + log.Info(e.ctx, "stop event handler queue") +} + +func (e *eventHandlerService) Start() { + go func() { + // 1. get cursor + for e.cursor == nil { + eventCursor, err := e.eventMgr.GetCursor(e.ctx) + if err != nil { + if _, ok := perror.Cause(err).(*herrors.HorizonErrNotFound); ok { + log.Infof(e.ctx, "index does not exist, start process directly") + e.cursor = &cursor{} + break + } else { + log.Errorf(e.ctx, "failed to get event cursor, error: %+v", err) + time.Sleep(time.Second * 3) + continue + } + } + e.cursor = &cursor{ + ID: eventCursor.ID, + Position: eventCursor.Position, + } + } + + // 2. process event + limit := uint(5) + saveCursorTicker := time.NewTicker(time.Second * 10) + waitInterval := time.Second * 3 + L: + for { + select { + case <-e.quit: + log.Infof(e.ctx, "save cursor(%d) and stop event handlers", e.cursor.Position) + e.saveCursor() + close(e.quit) + break L + case <-saveCursorTicker.C: + e.saveCursor() + default: + var ( + events []*models.Event + err error + ) + + if !e.resume { + events, err = e.eventMgr.ListEvents(e.ctx, int(e.cursor.Position), int(limit)) + if err != nil { + log.Errorf(e.ctx, "failed to list event by offset: %d, limit: %d", + e.cursor.Position, limit) + time.Sleep(waitInterval) + continue + } + if len(events) == 0 { + time.Sleep(waitInterval) + continue + } + } else { + // resume: continue to process the events that are halfway before restart + lastProcessingCursor, err := e.getLastProcessingCursor() + if err != nil { + log.Error(e.ctx, "failed to get last processing cursor") + time.Sleep(waitInterval) + continue + } + if lastProcessingCursor <= e.cursor.Position { + e.resume = false + continue + } + events, err = e.eventMgr.ListEventsByRange(e.ctx, e.cursor.Position, lastProcessingCursor) + if err != nil { + log.Errorf(e.ctx, "failed to list event by limit %d offset %d", e.cursor.Position, limit) + time.Sleep(waitInterval) + continue + } + } + for name, eh := range e.eventHandlers { + if err := eh.Process(e.ctx, events, e.resume); err != nil { + log.Errorf(e.ctx, "Failed to process event by handler %s, error: %s", + name, err.Error()) + continue + } + e.cursor.Position = events[len(events)-1].ID + } + if e.resume { + e.resume = false + } + } + } + }() +} + +// getLastProcessingCursor get the last processing event cursor to resume +func (e *eventHandlerService) getLastProcessingCursor() (uint, error) { + return e.webhookMgr.GetMaxEventIDOfLog(e.ctx) +} + +func (e *eventHandlerService) saveCursor() { + if _, err := e.eventMgr.CreateOrUpdateCursor(e.ctx, &models.EventCursor{ + ID: e.cursor.ID, + Position: e.cursor.Position, + }); err != nil { + log.Errorf(e.ctx, "failed to save cursor(%d), error: %+v", + e.cursor.Position, err) + } +} diff --git a/pkg/eventhandler/wlgenerator/wlgenerator.go b/pkg/eventhandler/wlgenerator/wlgenerator.go new file mode 100644 index 00000000..ca5e612f --- /dev/null +++ b/pkg/eventhandler/wlgenerator/wlgenerator.go @@ -0,0 +1,302 @@ +package wlgenerator + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + + "github.com/go-yaml/yaml" + + "g.hz.netease.com/horizon/core/common" + webhookctl "g.hz.netease.com/horizon/core/controller/webhook" + applicationmanager "g.hz.netease.com/horizon/pkg/application/manager" + applicationmodels "g.hz.netease.com/horizon/pkg/application/models" + clustermanager "g.hz.netease.com/horizon/pkg/cluster/manager" + clustermodels "g.hz.netease.com/horizon/pkg/cluster/models" + eventmanager "g.hz.netease.com/horizon/pkg/event/manager" + "g.hz.netease.com/horizon/pkg/event/models" + groupmanager "g.hz.netease.com/horizon/pkg/group/manager" + "g.hz.netease.com/horizon/pkg/param/managerparam" + usermanager "g.hz.netease.com/horizon/pkg/user/manager" + "g.hz.netease.com/horizon/pkg/util/log" + webhookmanager "g.hz.netease.com/horizon/pkg/webhook/manager" + webhookmodels "g.hz.netease.com/horizon/pkg/webhook/models" +) + +const ( + WebhookSecretHeader = "X-Horizon-Webhook-Secret" + WebhookContentTypeHeader = "Content-Type" + WebhookContentType = "application/json;charset=utf-8" +) + +type MessageContent struct { + ID uint `json:"id,omitempty"` + EventID uint `json:"eventID,omitempty"` + WebhookID uint `json:"webhookID,omitempty"` + ResourceType string `json:"resourceType,omitempty"` + Application *ApplicationInfo `json:"application,omitempty"` + Cluster *ClusterInfo `json:"cluster,omitempty"` + Action string `json:"action,omitempty"` + User *common.User `json:"user,omitempty"` +} + +type ResourceCommonInfo struct { + ID uint `json:"id"` + Name string `json:"name"` +} + +type ApplicationInfo struct { + ResourceCommonInfo + Priority string `json:"priority,omitempty"` +} + +type ClusterInfo struct { + ResourceCommonInfo + ApplicationName string `json:"applicationName,omitempty"` + Env string `json:"env,omitempty"` +} + +type WebhookLogGenerator struct { + webhookMgr webhookmanager.Manager + eventMgr eventmanager.Manager + groupMgr groupmanager.Manager + applicationMgr applicationmanager.Manager + clusterMgr clustermanager.Manager + userMgr usermanager.Manager +} + +func NewWebhookLogGenerator(manager *managerparam.Manager) *WebhookLogGenerator { + return &WebhookLogGenerator{ + webhookMgr: manager.WebhookManager, + eventMgr: manager.EventManager, + groupMgr: manager.GroupManager, + applicationMgr: manager.ApplicationManager, + clusterMgr: manager.ClusterMgr, + userMgr: manager.UserManager, + } +} + +func (w *WebhookLogGenerator) Process(ctx context.Context, events []*models.Event, + resume bool) error { + listSystemResources := func() map[string][]uint { + return map[string][]uint{ + string(models.Group): {0}, + } + } + + type messageDependency struct { + webhook *webhookmodels.Webhook + event *models.Event + application *applicationmodels.Application + cluster *clustermodels.Cluster + } + + listAssociatedResourcesOfApp := func(id uint) (*applicationmodels.Application, map[string][]uint) { + resources := listSystemResources() + app, err := w.applicationMgr.GetByID(ctx, id) + if err != nil { + log.Warningf(ctx, "application %d is not exist", id) + return nil, resources + } + resources[string(models.Application)] = []uint{app.ID} + + group, err := w.groupMgr.GetByID(ctx, app.GroupID) + if err != nil { + log.Warningf(ctx, "application %d is not exist", id) + return app, resources + } + groupIDs := groupmanager.FormatIDsFromTraversalIDs(group.TraversalIDs) + resources[string(models.Group)] = append(resources[string(models.Group)], groupIDs...) + return app, resources + } + + listAssociatedResourcesOfCluster := func(id uint) (*clustermodels.Cluster, + *applicationmodels.Application, map[string][]uint) { + cluster, err := w.clusterMgr.GetByID(ctx, id) + if err != nil { + log.Warningf(ctx, "cluster %d is not exist", + cluster.ID) + return nil, nil, nil + } + app, resources := listAssociatedResourcesOfApp(cluster.ApplicationID) + if resources == nil { + resources = map[string][]uint{} + } + resources[string(models.Cluster)] = []uint{cluster.ID} + return cluster, app, resources + } + + listAssociatedResources := func(e *models.Event) (*messageDependency, map[string][]uint) { + var ( + resources map[string][]uint + cluster *clustermodels.Cluster + application *applicationmodels.Application + dep = &messageDependency{} + ) + + switch e.ResourceType { + case models.Application: + application, resources = listAssociatedResourcesOfApp(e.ResourceID) + dep.application = application + case models.Cluster: + cluster, application, resources = listAssociatedResourcesOfCluster(e.ResourceID) + dep.application = application + dep.cluster = cluster + default: + log.Infof(ctx, "resource type %s is unsupported", + e.ResourceType) + } + return dep, resources + } + + makeHeaders := func(secret string) (string, error) { + header := http.Header{} + header.Add(WebhookSecretHeader, secret) + header.Add(WebhookContentTypeHeader, WebhookContentType) + headerByte, err := yaml.Marshal(header) + if err != nil { + return "", err + } + return string(headerByte), nil + } + + makeBody := func(dep *messageDependency) (string, error) { + user, err := w.userMgr.GetUserByID(ctx, dep.event.CreatedBy) + if err != nil { + return "", err + } + + message := MessageContent{ + EventID: dep.event.ID, + WebhookID: dep.webhook.ID, + ResourceType: string(dep.event.ResourceType), + Action: string(dep.event.Action), + User: common.ToUser(user), + } + + if dep.application != nil { + message.Application = &ApplicationInfo{ + ResourceCommonInfo: ResourceCommonInfo{ + ID: dep.application.ID, + Name: dep.application.Name, + }, + Priority: string(dep.application.Priority), + } + } + + if dep.cluster != nil { + message.Cluster = &ClusterInfo{ + ResourceCommonInfo: ResourceCommonInfo{ + ID: dep.application.ID, + Name: dep.application.Name, + }, + ApplicationName: dep.application.Name, + Env: dep.cluster.EnvironmentName, + } + } + + reqBody, err := json.Marshal(message) + if err != nil { + log.Errorf(ctx, fmt.Sprintf("failed to marshal message, error: %+v", err)) + return "", err + } + return string(reqBody), nil + } + + var ( + webhookLogs []*webhookmodels.WebhookLog + conditionsToCreate = map[uint]map[uint]messageDependency{} + conditionsToQuery = map[uint][]uint{} + ) + + for _, event := range events { + // 1. get associated resources according to event resource type + dependency, resources := listAssociatedResources(event) + if resources == nil { + continue + } + + // 2. list webhooks of all associated resources + webhooks, _, err := w.webhookMgr.ListWebhookOfResources(ctx, resources, nil) + if err != nil { + log.Errorf(ctx, "failed to list webhooks by condition %v, error: %s", resources, err.Error()) + continue + } + + // 3. assemble webhook list of all events, prepare to create + for _, webhook := range webhooks { + // 3.1 if event does not match webhook trigger, skip + ok, err := webhookctl.CheckIfEventMatch(webhook, event) + if err != nil { + log.Errorf(ctx, "failed to match triggers %s, error: %+v", webhook.Triggers, err) + continue + } else if !ok { + continue + } + // 3.2 add webhook to the list + if _, ok := conditionsToCreate[event.ID]; !ok { + conditionsToCreate[event.ID] = map[uint]messageDependency{} + } + conditionsToCreate[event.ID][webhook.ID] = messageDependency{ + webhook: webhook, + event: event, + application: dependency.application, + cluster: dependency.cluster, + } + conditionsToQuery[event.ID] = append(conditionsToQuery[event.ID], webhook.ID) + } + } + // 4. remove duplicate webhook log when resume + if resume { + if len(conditionsToQuery) == 0 { + return nil + } + existedWebhookLogs, err := w.webhookMgr.ListWebhookLogsByMap(ctx, conditionsToQuery) + if err != nil { + log.Errorf(ctx, "failed to list webhook logs by map, error: %+v", err) + } else { + for _, wl := range existedWebhookLogs { + delete(conditionsToCreate[wl.EventID], wl.WebhookID) + if len(conditionsToCreate[wl.EventID]) == 0 { + delete(conditionsToCreate, wl.EventID) + } + } + } + } + + // 5. assemble webhook logs to create + if len(conditionsToCreate) == 0 { + return nil + } + for _, dependencyMap := range conditionsToCreate { + for _, dependency := range dependencyMap { + headers, err := makeHeaders(dependency.webhook.Secret) + if err != nil { + log.Errorf(ctx, fmt.Sprintf("failed to make headers, error: %+v", err)) + continue + } + + body, err := makeBody(&dependency) + if err != nil { + log.Errorf(ctx, fmt.Sprintf("failed to make headers, error: %+v", err)) + continue + } + + webhookLogs = append(webhookLogs, &webhookmodels.WebhookLog{ + EventID: dependency.event.ID, + WebhookID: dependency.webhook.ID, + URL: dependency.webhook.URL, + RequestHeaders: headers, + RequestData: body, + Status: webhookmodels.StatusWaiting, + }) + } + } + + // 6. batch create webhook logs + if _, err := w.webhookMgr.CreateWebhookLogs(ctx, webhookLogs); err != nil { + log.Errorf(ctx, "failed to create webhooks, error: %s", err.Error()) + } + return nil +} diff --git a/pkg/member/service/service.go b/pkg/member/service/service.go index 63deda06..a96e2a9b 100644 --- a/pkg/member/service/service.go +++ b/pkg/member/service/service.go @@ -24,6 +24,7 @@ import ( usermanager "g.hz.netease.com/horizon/pkg/user/manager" usermodels "g.hz.netease.com/horizon/pkg/user/models" "g.hz.netease.com/horizon/pkg/util/log" + webhookmanager "g.hz.netease.com/horizon/pkg/webhook/manager" ) var ( @@ -64,6 +65,7 @@ type service struct { roleService roleservice.Service oauthManager oauthmanager.Manager userManager usermanager.Manager + webhookManager webhookmanager.Manager } func NewService(roleService roleservice.Service, oauthManager oauthmanager.Manager, @@ -79,6 +81,7 @@ func NewService(roleService roleservice.Service, oauthManager oauthmanager.Manag roleService: roleService, oauthManager: oauthManager, userManager: manager.UserManager, + webhookManager: manager.WebhookManager, } } @@ -189,6 +192,23 @@ func (s *service) listPipelinerunMember(ctx context.Context, pipelinerunID uint) pipeline.ClusterID) } +func (s *service) listWebhookMember(ctx context.Context, id uint) ([]models.Member, error) { + if id == 0 { + return nil, nil + } + webhook, err := s.webhookManager.GetWebhook(ctx, id) + if err != nil { + return nil, err + } + switch webhook.ResourceType { + case common.ResourceGroup, common.ResourceApplication, common.ResourceCluster: + return s.ListMember(ctx, webhook.ResourceType, + webhook.ResourceID) + default: + return nil, nil + } +} + func (s *service) GetMemberOfResource(ctx context.Context, resourceType string, resourceIDStr string) (*models.Member, error) { var currentUser userauth.User @@ -308,6 +328,8 @@ func (s *service) ListMember(ctx context.Context, resourceType string, resourceI allMembers, err = s.listReleaseMembers(ctx, resourceID) case common.ResourcePipelinerun: allMembers, err = s.listPipelinerunMember(ctx, resourceID) + case common.ResourceWebhook: + allMembers, err = s.listWebhookMember(ctx, resourceID) default: err = errors.New("unsupported resourceType") } diff --git a/pkg/param/managerparam/managerparam.go b/pkg/param/managerparam/managerparam.go index 4e8f9206..779d5189 100644 --- a/pkg/param/managerparam/managerparam.go +++ b/pkg/param/managerparam/managerparam.go @@ -1,12 +1,15 @@ package managerparam import ( + "gorm.io/gorm" + accesstokenmanager "g.hz.netease.com/horizon/pkg/accesstoken/manager" applicationmanager "g.hz.netease.com/horizon/pkg/application/manager" applicationregionmanager "g.hz.netease.com/horizon/pkg/applicationregion/manager" clustermanager "g.hz.netease.com/horizon/pkg/cluster/manager" envmanager "g.hz.netease.com/horizon/pkg/environment/manager" environmentregionmanager "g.hz.netease.com/horizon/pkg/environmentregion/manager" + eventManager "g.hz.netease.com/horizon/pkg/event/manager" groupmanager "g.hz.netease.com/horizon/pkg/group/manager" idpmanager "g.hz.netease.com/horizon/pkg/idp/manager" membermanager "g.hz.netease.com/horizon/pkg/member" @@ -21,7 +24,7 @@ import ( trtmanager "g.hz.netease.com/horizon/pkg/templateschematag/manager" usermanager "g.hz.netease.com/horizon/pkg/user/manager" linkmanager "g.hz.netease.com/horizon/pkg/userlink/manager" - "gorm.io/gorm" + webhookManager "g.hz.netease.com/horizon/pkg/webhook/manager" ) type Manager struct { @@ -46,6 +49,8 @@ type Manager struct { RegistryManager registrymanager.Manager IdpManager idpmanager.Manager AccessTokenManager accesstokenmanager.Manager + WebhookManager webhookManager.Manager + EventManager eventManager.Manager } func InitManager(db *gorm.DB) *Manager { @@ -71,5 +76,7 @@ func InitManager(db *gorm.DB) *Manager { RegistryManager: registrymanager.New(db), IdpManager: idpmanager.NewManager(db), AccessTokenManager: accesstokenmanager.New(db), + WebhookManager: webhookManager.New(db), + EventManager: eventManager.New(db), } } diff --git a/pkg/util/common/common.go b/pkg/util/common/common.go new file mode 100644 index 00000000..ebd57e31 --- /dev/null +++ b/pkg/util/common/common.go @@ -0,0 +1,13 @@ +package common + +func StringPtr(str string) *string { + return &str +} + +func IntPtr(i int) *int { + return &i +} + +func BoolPtr(b bool) *bool { + return &b +} diff --git a/pkg/webhook/dao/dao.go b/pkg/webhook/dao/dao.go new file mode 100644 index 00000000..3b53248e --- /dev/null +++ b/pkg/webhook/dao/dao.go @@ -0,0 +1,265 @@ +package dao + +import ( + "context" + "fmt" + + "gorm.io/gorm" + + "g.hz.netease.com/horizon/core/common" + herrors "g.hz.netease.com/horizon/core/errors" + "g.hz.netease.com/horizon/lib/q" + perror "g.hz.netease.com/horizon/pkg/errors" + "g.hz.netease.com/horizon/pkg/webhook/models" +) + +type DAO interface { + CreateWebhook(ctx context.Context, webhook *models.Webhook) (*models.Webhook, error) + GetWebhook(ctx context.Context, id uint) (*models.Webhook, error) + ListWebhookOfResources(ctx context.Context, + resources map[string][]uint, query *q.Query) ([]*models.Webhook, int64, error) + ListWebhooks(ctx context.Context) ([]*models.Webhook, error) + UpdateWebhook(ctx context.Context, id uint, w *models.Webhook) (*models.Webhook, error) + DeleteWebhook(ctx context.Context, id uint) error + CreateWebhookLog(ctx context.Context, wl *models.WebhookLog) (*models.WebhookLog, error) + CreateWebhookLogs(ctx context.Context, wls []*models.WebhookLog) ([]*models.WebhookLog, error) + ListWebhookLogs(ctx context.Context, wID uint, query *q.Query) ([]*models.WebhookLog, int64, error) + ListWebhookLogsByStatus(ctx context.Context, wID uint, + status string) ([]*models.WebhookLog, error) + ListWebhookLogsByMap(ctx context.Context, + webhookEventMap map[uint][]uint) ([]*models.WebhookLog, error) + UpdateWebhookLog(ctx context.Context, wl *models.WebhookLog) (*models.WebhookLog, error) + GetWebhookLog(ctx context.Context, id uint) (*models.WebhookLog, error) + GetWebhookLogByEventID(ctx context.Context, webhookID, eventID uint) (*models.WebhookLog, error) + GetMaxEventIDOfLog(ctx context.Context) (uint, error) +} + +type dao struct{ db *gorm.DB } + +// NewDAO returns an instance of the default DAO +func NewDAO(db *gorm.DB) DAO { + return &dao{db: db} +} + +func (d *dao) CreateWebhook(ctx context.Context, webhook *models.Webhook) (*models.Webhook, error) { + if result := d.db.WithContext(ctx).Create(webhook); result.Error != nil { + return nil, herrors.NewErrInsertFailed(herrors.WebhookInDB, result.Error.Error()) + } + return webhook, nil +} + +func (d *dao) GetWebhook(ctx context.Context, id uint) (*models.Webhook, error) { + var w models.Webhook + if result := d.db.WithContext(ctx).First(&w, id); result.Error != nil { + if result.Error == gorm.ErrRecordNotFound { + return nil, herrors.NewErrNotFound(herrors.WebhookInDB, result.Error.Error()) + } + return nil, herrors.NewErrGetFailed(herrors.WebhookInDB, result.Error.Error()) + } + return &w, nil +} + +func (d *dao) ListWebhooks(ctx context.Context) ([]*models.Webhook, error) { + var ws []*models.Webhook + if result := d.db.WithContext(ctx).Find(&ws); result.Error != nil { + return nil, herrors.NewErrGetFailed(herrors.WebhookInDB, result.Error.Error()) + } + return ws, nil +} + +func (d *dao) ListWebhookOfResources(ctx context.Context, + resources map[string][]uint, query *q.Query) ([]*models.Webhook, int64, error) { + var ws []*models.Webhook + var ( + condition *gorm.DB + pageSize = common.DefaultPageSize + pageNumber = common.DefaultPageNumber + limit int + offset int + count int64 + ) + // assemble condition + for resourceType, resourceIDs := range resources { + subCondition := d.db.Where("resource_type = ?", resourceType). + Where("resource_id in ?", resourceIDs) + if condition != nil { + condition.Or(subCondition) + } else { + condition = subCondition + } + } + + if query != nil { + if query.PageSize > 1 { + pageSize = query.PageSize + } + if query.PageNumber > 0 { + pageNumber = query.PageNumber + } + } + limit = pageSize + offset = (pageNumber - 1) * pageSize + + if result := d.db.WithContext(ctx).Where(condition).Limit(limit). + Offset(offset).Find(&ws).Offset(-1).Count(&count); result.Error != nil { + return nil, count, herrors.NewErrGetFailed(herrors.WebhookInDB, result.Error.Error()) + } + return ws, count, nil +} + +func (d *dao) UpdateWebhook(ctx context.Context, id uint, + w *models.Webhook) (*models.Webhook, error) { + if result := d.db.WithContext(ctx).Where("id = ?", id). + Select("enabled", "url", "enable_ssl_verify", "description", "secret", "triggers"). + Updates(w); result.Error != nil { + return nil, herrors.NewErrUpdateFailed(herrors.WebhookInDB, result.Error.Error()) + } + return w, nil +} + +func (d *dao) DeleteWebhook(ctx context.Context, id uint) error { + deleteFunc := func(tx *gorm.DB) error { + if result := d.db.WithContext(ctx).Where("webhook_id = ?", id). + Delete(&models.WebhookLog{}); result.Error != nil { + return herrors.NewErrDeleteFailed(herrors.WebhookInDB, result.Error.Error()) + } + + if result := d.db.WithContext(ctx).Delete(&models.Webhook{}, id); result.Error != nil { + return herrors.NewErrDeleteFailed(herrors.WebhookInDB, result.Error.Error()) + } + + return nil + } + return d.db.WithContext(ctx).Transaction(deleteFunc) +} + +func (d *dao) CreateWebhookLog(ctx context.Context, + wl *models.WebhookLog) (*models.WebhookLog, error) { + d.db.WithContext(ctx).Commit().Callback() + if result := d.db.WithContext(ctx).Create(wl); result.Error != nil { + return nil, herrors.NewErrInsertFailed(herrors.WebhookLogInDB, result.Error.Error()) + } + return wl, nil +} + +func (d *dao) CreateWebhookLogs(ctx context.Context, + wls []*models.WebhookLog) ([]*models.WebhookLog, error) { + d.db.WithContext(ctx).Commit().Callback() + if result := d.db.WithContext(ctx).Create(wls); result.Error != nil { + return nil, herrors.NewErrInsertFailed(herrors.WebhookLogInDB, result.Error.Error()) + } + return wls, nil +} + +func (d *dao) ListWebhookLogs(ctx context.Context, wID uint, + query *q.Query) ([]*models.WebhookLog, int64, error) { + var ( + ws []*models.WebhookLog + pageSize = common.DefaultPageSize + pageNumber = common.DefaultPageNumber + limit int + offset int + count int64 + ) + + if query != nil { + if query.PageSize > 1 { + pageSize = query.PageSize + } + if query.PageNumber > 0 { + pageNumber = query.PageNumber + } + } + limit = pageSize + offset = (pageNumber - 1) * pageSize + + if result := d.db.WithContext(ctx).Order("created_at desc").Limit(limit). + Offset(offset).Where("webhook_id = ?", wID). + Find(&ws).Limit(-1).Count(&count); result.Error != nil { + return nil, count, herrors.NewErrGetFailed(herrors.WebhookLogInDB, result.Error.Error()) + } + return ws, count, nil +} + +func (d *dao) ListWebhookLogsByMap(ctx context.Context, + webhookEventMap map[uint][]uint) ([]*models.WebhookLog, error) { + var ( + ws []*models.WebhookLog + condition *gorm.DB + ) + if len(webhookEventMap) == 0 { + return nil, nil + } + // assemble condition + for webhookID, eventIDs := range webhookEventMap { + subCondition := d.db.Where("webhook_id = ?", webhookID). + Where("event_id in ?", eventIDs) + if condition != nil { + condition.Or(subCondition) + } else { + condition = subCondition + } + } + if result := d.db.WithContext(ctx).Where(condition). + Find(&ws); result.Error != nil { + return nil, herrors.NewErrGetFailed(herrors.WebhookLogInDB, result.Error.Error()) + } + return ws, nil +} + +func (d *dao) ListWebhookLogsByStatus(ctx context.Context, wID uint, + status string) ([]*models.WebhookLog, error) { + var ws []*models.WebhookLog + if result := d.db.WithContext(ctx).Where("webhook_id = ?", wID).Where("status = ?", status). + Find(&ws); result.Error != nil { + return nil, herrors.NewErrGetFailed(herrors.WebhookLogInDB, result.Error.Error()) + } + return ws, nil +} + +func (d *dao) UpdateWebhookLog(ctx context.Context, wl *models.WebhookLog) (*models.WebhookLog, error) { + if result := d.db.WithContext(ctx).Where("id = ?", wl.ID). + Select("status", "response_headers", "response_body", + "status", "error_message"). + Updates(wl); result.Error != nil { + return nil, herrors.NewErrUpdateFailed(herrors.WebhookLogInDB, result.Error.Error()) + } + return wl, nil +} + +func (d *dao) GetWebhookLog(ctx context.Context, id uint) (*models.WebhookLog, error) { + var wl models.WebhookLog + if result := d.db.WithContext(ctx).Where("id = ?", id).First(&wl); result.Error != nil { + if result.Error == gorm.ErrRecordNotFound { + return nil, perror.Wrap(herrors.NewErrNotFound(herrors.WebhookLogInDB, result.Error.Error()), + fmt.Sprintf("failed to find webhook log by id: %d", id)) + } + return nil, herrors.NewErrGetFailed(herrors.WebhookLogInDB, result.Error.Error()) + } + return &wl, nil +} + +func (d *dao) GetWebhookLogByEventID(ctx context.Context, webhookID, eventID uint) (*models.WebhookLog, error) { + var wl models.WebhookLog + if result := d.db.WithContext(ctx). + Where("webhook_id = ?", webhookID). + Where("event_id = ?", eventID). + First(&wl); result.Error != nil { + if result.Error == gorm.ErrRecordNotFound { + return nil, perror.Wrap(herrors.NewErrNotFound(herrors.WebhookLogInDB, result.Error.Error()), + fmt.Sprintf("failed to find webhook log by webhook id: %d, event id: %d", + webhookID, eventID)) + } + return nil, herrors.NewErrGetFailed(herrors.WebhookLogInDB, result.Error.Error()) + } + return &wl, nil +} + +func (d *dao) GetMaxEventIDOfLog(ctx context.Context) (uint, error) { + var maxID uint + if result := d.db.WithContext(ctx).Model(&models.WebhookLog{}).Select("ifnull(max(event_id),0)"). + Scan(&maxID); result.Error != nil { + return maxID, herrors.NewErrGetFailed(herrors.WebhookLogInDB, result.Error.Error()) + } + return maxID, nil +} diff --git a/pkg/webhook/manager/manager.go b/pkg/webhook/manager/manager.go new file mode 100644 index 00000000..c15c3a69 --- /dev/null +++ b/pkg/webhook/manager/manager.go @@ -0,0 +1,163 @@ +package manager + +import ( + "context" + + "gorm.io/gorm" + + "g.hz.netease.com/horizon/lib/q" + "g.hz.netease.com/horizon/pkg/util/wlog" + "g.hz.netease.com/horizon/pkg/webhook/dao" + models "g.hz.netease.com/horizon/pkg/webhook/models" +) + +type Manager interface { + CreateWebhook(ctx context.Context, webhook *models.Webhook) (*models.Webhook, error) + GetWebhook(ctx context.Context, id uint) (*models.Webhook, error) + ListWebhookOfResources(ctx context.Context, + resources map[string][]uint, query *q.Query) ([]*models.Webhook, int64, error) + ListWebhooks(ctx context.Context) ([]*models.Webhook, error) + UpdateWebhook(ctx context.Context, id uint, w *models.Webhook) (*models.Webhook, error) + DeleteWebhook(ctx context.Context, id uint) error + CreateWebhookLog(ctx context.Context, wl *models.WebhookLog) (*models.WebhookLog, error) + CreateWebhookLogs(ctx context.Context, wls []*models.WebhookLog) ([]*models.WebhookLog, error) + ListWebhookLogs(ctx context.Context, wID uint, query *q.Query) ([]*models.WebhookLog, int64, error) + ListWebhookLogsByMap(ctx context.Context, + webhookEventMap map[uint][]uint) ([]*models.WebhookLog, error) + ListWebhookLogsByStatus(ctx context.Context, wID uint, + status string) ([]*models.WebhookLog, error) + UpdateWebhookLog(ctx context.Context, wl *models.WebhookLog) (*models.WebhookLog, error) + GetWebhookLog(ctx context.Context, id uint) (*models.WebhookLog, error) + RetryWebhookLog(ctx context.Context, id uint) (*models.WebhookLog, error) + GetWebhookLogByEventID(ctx context.Context, webhookID, eventID uint) (*models.WebhookLog, error) + GetMaxEventIDOfLog(ctx context.Context) (uint, error) +} + +type manager struct { + dao dao.DAO +} + +func New(db *gorm.DB) Manager { + return &manager{ + dao: dao.NewDAO(db), + } +} + +func (m *manager) CreateWebhook(ctx context.Context, w *models.Webhook) (*models.Webhook, error) { + const op = "webhook manager: create webhook" + defer wlog.Start(ctx, op).StopPrint() + return m.dao.CreateWebhook(ctx, w) +} + +func (m *manager) GetWebhook(ctx context.Context, id uint) (*models.Webhook, error) { + const op = "webhook manager: get webhook" + defer wlog.Start(ctx, op).StopPrint() + return m.dao.GetWebhook(ctx, id) +} + +func (m *manager) ListWebhookOfResources(ctx context.Context, + resources map[string][]uint, query *q.Query) ([]*models.Webhook, int64, error) { + const op = "webhook manager: list webhook of resources" + defer wlog.Start(ctx, op).StopPrint() + return m.dao.ListWebhookOfResources(ctx, resources, query) +} + +func (m *manager) ListWebhooks(ctx context.Context) ([]*models.Webhook, error) { + const op = "webhook manager: list webhooks" + defer wlog.Start(ctx, op).StopPrint() + return m.dao.ListWebhooks(ctx) +} + +func (m *manager) UpdateWebhook(ctx context.Context, id uint, + w *models.Webhook) (*models.Webhook, error) { + const op = "webhook manager: update webhook" + defer wlog.Start(ctx, op).StopPrint() + + return m.dao.UpdateWebhook(ctx, id, w) +} + +func (m *manager) DeleteWebhook(ctx context.Context, id uint) error { + const op = "webhook manager: delete webhook" + defer wlog.Start(ctx, op).StopPrint() + return m.dao.DeleteWebhook(ctx, id) +} + +func (m *manager) CreateWebhookLog(ctx context.Context, + wl *models.WebhookLog) (*models.WebhookLog, error) { + const op = "webhook manager: create webhook log" + defer wlog.Start(ctx, op).StopPrint() + return m.dao.CreateWebhookLog(ctx, wl) +} + +func (m *manager) ListWebhookLogs(ctx context.Context, wID uint, + query *q.Query) ([]*models.WebhookLog, int64, error) { + const op = "webhook manager: list webhook logs" + defer wlog.Start(ctx, op).StopPrint() + return m.dao.ListWebhookLogs(ctx, wID, query) +} + +func (m *manager) ListWebhookLogsByMap(ctx context.Context, + webhookEventMap map[uint][]uint) ([]*models.WebhookLog, error) { + const op = "webhook manager: list webhook logs by webhooks and events map" + defer wlog.Start(ctx, op).StopPrint() + return m.dao.ListWebhookLogsByMap(ctx, webhookEventMap) +} + +func (m *manager) CreateWebhookLogs(ctx context.Context, wls []*models.WebhookLog) ([]*models.WebhookLog, error) { + const op = "webhook manager: create webhook logs" + defer wlog.Start(ctx, op).StopPrint() + return m.dao.CreateWebhookLogs(ctx, wls) +} + +func (m *manager) ListWebhookLogsByStatus(ctx context.Context, wID uint, + status string) ([]*models.WebhookLog, error) { + const op = "webhook manager: list webhook logs by status" + defer wlog.Start(ctx, op).StopPrint() + return m.dao.ListWebhookLogsByStatus(ctx, wID, status) +} + +func (m *manager) UpdateWebhookLog(ctx context.Context, wl *models.WebhookLog) (*models.WebhookLog, error) { + const op = "webhook manager: update webhook log" + defer wlog.Start(ctx, op).StopPrint() + return m.dao.UpdateWebhookLog(ctx, wl) +} + +func (m *manager) GetWebhookLog(ctx context.Context, id uint) (*models.WebhookLog, error) { + const op = "webhook manager: get webhook log" + defer wlog.Start(ctx, op).StopPrint() + return m.dao.GetWebhookLog(ctx, id) +} + +func (m *manager) GetWebhookLogByEventID(ctx context.Context, webhookID, eventID uint) (*models.WebhookLog, error) { + const op = "webhook manager: get webhook log by event id" + defer wlog.Start(ctx, op).StopPrint() + return m.dao.GetWebhookLogByEventID(ctx, webhookID, eventID) +} + +func (m *manager) RetryWebhookLog(ctx context.Context, id uint) (*models.WebhookLog, error) { + const op = "webhook manager: retry webhook log" + defer wlog.Start(ctx, op).StopPrint() + + // 1. get webhook log + wl, err := m.dao.GetWebhookLog(ctx, id) + if err != nil { + return nil, err + } + + // 2. make a copy with waiting status + wlCopy := models.WebhookLog{ + WebhookID: wl.WebhookID, + EventID: wl.EventID, + URL: wl.URL, + RequestHeaders: wl.RequestHeaders, + RequestData: wl.RequestData, + ResponseHeaders: wl.ResponseHeaders, + ResponseBody: wl.ResponseBody, + Status: models.StatusWaiting, + } + return m.dao.CreateWebhookLog(ctx, &wlCopy) +} + +func (m *manager) GetMaxEventIDOfLog(ctx context.Context) (uint, error) { + return m.dao.GetMaxEventIDOfLog(ctx) +} diff --git a/pkg/webhook/models/webhook.go b/pkg/webhook/models/webhook.go new file mode 100644 index 00000000..5984c410 --- /dev/null +++ b/pkg/webhook/models/webhook.go @@ -0,0 +1,41 @@ +package models + +import "time" + +const ( + StatusWaiting = "waiting" + StatusSuccess = "success" + StatusFailed = "failed" +) + +type Webhook struct { + ID uint + Enabled bool + URL string + SslVerifyEnabled bool + Description string + Secret string + Triggers string + ResourceType string + ResourceID uint + CreatedAt time.Time + CreatedBy uint + UpdatedAt time.Time + UpdatedBy uint +} + +type WebhookLog struct { + ID uint + WebhookID uint + EventID uint + URL string + RequestHeaders string + RequestData string + ResponseHeaders string + ResponseBody string + Status string + ErrorMessage string + CreatedAt time.Time + CreatedBy uint + UpdatedAt time.Time +} diff --git a/pkg/webhook/service/service.go b/pkg/webhook/service/service.go new file mode 100644 index 00000000..9a705dee --- /dev/null +++ b/pkg/webhook/service/service.go @@ -0,0 +1,293 @@ +package service + +import ( + "bytes" + "context" + "crypto/tls" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "sync" + "time" + + eventmanager "g.hz.netease.com/horizon/pkg/event/manager" + "g.hz.netease.com/horizon/pkg/eventhandler/wlgenerator" + "g.hz.netease.com/horizon/pkg/param/managerparam" + usermanager "g.hz.netease.com/horizon/pkg/user/manager" + "g.hz.netease.com/horizon/pkg/util/log" + webhookmanager "g.hz.netease.com/horizon/pkg/webhook/manager" + "g.hz.netease.com/horizon/pkg/webhook/models" + webhookmodels "g.hz.netease.com/horizon/pkg/webhook/models" + "github.com/go-yaml/yaml" +) + +const ( + WebhookSecretHeader = "X-Horizon-Webhook-Secret" + WebhookContentTypeHeader = "Content-Type" + WebhookContentType = "application/json;charset=utf-8" +) + +type worker struct { + ctx context.Context + insecureClient http.Client + secureClient http.Client + quit chan bool + webhook *models.Webhook + + webhookManager webhookmanager.Manager + eventManager eventmanager.Manager + userManager usermanager.Manager +} + +type Service interface { + Start() + StopAndWait() +} + +type service struct { + ctx context.Context + quit chan bool + workers map[uint]*worker + insecureClient http.Client + secureClient http.Client + + webhookManager webhookmanager.Manager + eventManager eventmanager.Manager + userManager usermanager.Manager +} + +func NewService(ctx context.Context, manager *managerparam.Manager) Service { + return &service{ + ctx: ctx, + workers: make(map[uint]*worker), + insecureClient: http.Client{ + Timeout: time.Second * 30, + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + }, + }, + }, + secureClient: http.Client{ + Timeout: time.Second * 30, + }, + webhookManager: manager.WebhookManager, + eventManager: manager.EventManager, + userManager: manager.UserManager, + } +} + +func (s *service) stopWorkersAndWait() { + wg := sync.WaitGroup{} + wg.Add(len(s.workers)) + for _, w := range s.workers { + go func(wk *worker) { + wk.Stop().Wait() + wg.Done() + }(w) + } + wg.Wait() +} + +func (s *service) reconcileWorkers() { + // 1. get latest webhook list + webhooks, err := s.webhookManager.ListWebhooks(s.ctx) + if err != nil { + log.Errorf(s.ctx, "failed to list webhooks, error: %s", err.Error()) + return + } + // 2. compare and reconcile workers + reconciled := map[uint]bool{} + for _, webhook := range webhooks { + id := webhook.ID + if worker, ok := s.workers[id]; ok { + // 2.1 update workers + if worker.webhook.UpdatedAt.Before(webhook.UpdatedAt) { + worker.webhook = webhook + } + } else { + // 2.2 create workers + s.workers[id] = newWebhookWorker(s.webhookManager, s.eventManager, + s.userManager, s.secureClient, s.insecureClient, webhook) + } + reconciled[id] = true + } + + // 2.2 stop deleted workers + for id := range s.workers { + if _, ok := reconciled[id]; ok { + continue + } + s.workers[id].Stop() + delete(s.workers, id) + } +} + +func (s *service) Start() { + t := time.NewTicker(time.Second * 5) + go func() { + s.reconcileWorkers() + L: + for { + select { + case <-s.quit: + s.stopWorkersAndWait() + close(s.quit) + break L + case <-t.C: + s.reconcileWorkers() + } + } + }() +} + +func (s *service) StopAndWait() { + s.quit <- true + <-s.quit + log.Info(s.ctx, "webhook service stopped") +} + +func newWebhookWorker(webhookMgr webhookmanager.Manager, + eventMgr eventmanager.Manager, userMgr usermanager.Manager, + secureClient, insecureClient http.Client, webhook *models.Webhook) *worker { + ww := &worker{ + ctx: context.Background(), + webhook: webhook, + quit: make(chan bool, 1), + secureClient: secureClient, + insecureClient: insecureClient, + + webhookManager: webhookMgr, + eventManager: eventMgr, + userManager: userMgr, + } + go ww.start() + return ww +} + +func (w *worker) start() { + ctx := w.ctx + waitInterval := time.Second * 2 +L: + for { + select { + case <-w.quit: + close(w.quit) + break L + default: + // todo: set limit and find a way to avoid this invoke + wls, err := w.webhookManager.ListWebhookLogsByStatus(ctx, w.webhook.ID, + webhookmodels.StatusWaiting) + if err != nil { + log.Errorf(ctx, "failed to list webhook logs of %d, error: %s", w.webhook.ID, err.Error()) + continue + } + if len(wls) == 0 { + time.Sleep(waitInterval) + continue + } + for _, wl := range wls { + saveResult := func() { + if wl.ErrorMessage != "" { + wl.Status = webhookmodels.StatusFailed + } else { + wl.Status = webhookmodels.StatusSuccess + } + _, err = w.webhookManager.UpdateWebhookLog(ctx, wl) + if err != nil { + log.Errorf(ctx, "failed to update webhook log %d, error: %s", wl.ID, err.Error()) + } + } + + // 1. make request and set body + reqBody, err := addWebhookLogID([]byte(wl.RequestData), wl.ID) + if err != nil { + wl.ErrorMessage = fmt.Sprintf("failed to add id, error: %+v", err) + log.Errorf(ctx, wl.ErrorMessage) + saveResult() + continue + } + req, err := http.NewRequest(http.MethodPost, wl.URL, + bytes.NewBuffer(reqBody)) + if err != nil { + wl.ErrorMessage = fmt.Sprintf("failed to new request, error: %+v", err) + log.Errorf(ctx, wl.ErrorMessage) + saveResult() + continue + } + + // 2. set headers + headers := http.Header{} + if err := yaml.Unmarshal([]byte(wl.RequestHeaders), &headers); err != nil { + wl.ErrorMessage = fmt.Sprintf("failed to unmarshal header, error: %+v", err) + log.Errorf(ctx, wl.ErrorMessage) + saveResult() + continue + } + req.Header = headers + + // 3. send request + cli := w.secureClient + if !w.webhook.SslVerifyEnabled { + cli = w.insecureClient + } + resp, err := cli.Do(req) + if err != nil { + wl.ErrorMessage = fmt.Sprintf("failed to send req, error: %+v", err) + log.Errorf(ctx, wl.ErrorMessage) + saveResult() + continue + } + + // 4. update response body + respBody, err := ioutil.ReadAll(resp.Body) + if err != nil { + wl.ErrorMessage = fmt.Sprintf("failed to read response body, error: %+v", err) + log.Errorf(ctx, wl.ErrorMessage) + resp.Body.Close() + saveResult() + continue + } + wl.ResponseBody = string(respBody) + + // 5. update response headers + respHeader, err := yaml.Marshal(resp.Header) + if err != nil { + wl.ErrorMessage = fmt.Sprintf("failed to marshal, error: %+v", err) + log.Errorf(ctx, wl.ErrorMessage) + resp.Body.Close() + saveResult() + continue + } + if resp.StatusCode >= http.StatusBadRequest || resp.StatusCode < http.StatusOK { + wl.ErrorMessage = fmt.Sprintf("unexpected response code: %d", resp.StatusCode) + } + wl.ResponseHeaders = string(respHeader) + resp.Body.Close() + saveResult() + } + } + } +} + +func (w *worker) Stop() *worker { + w.quit <- true + return w +} + +func (w *worker) Wait() { + <-w.quit + log.Infof(w.ctx, "webhook worker %d stopped", w.webhook.ID) +} + +func addWebhookLogID(reqData []byte, id uint) ([]byte, error) { + var content wlgenerator.MessageContent + err := json.Unmarshal([]byte(reqData), &content) + if err != nil { + return nil, err + } + + content.ID = id + return json.Marshal(content) +} diff --git a/roles.yaml b/roles.yaml index c164a3b5..d6c762e3 100644 --- a/roles.yaml +++ b/roles.yaml @@ -15,6 +15,7 @@ Roles: - applications/transfer - applications/selectableregions - applications/subresourcetags + - applications/webhooks verbs: - "*" scopes: @@ -28,6 +29,7 @@ Roles: - groups/members - groups/groups - groups/transfer + - groups/webhooks verbs: - "*" scopes: @@ -78,6 +80,7 @@ Roles: - clusters/pause - clusters/resume - clusters/containers + - clusters/webhooks verbs: - "*" scopes: @@ -111,6 +114,19 @@ Roles: - "*" nonResourceURLs: - "*" + - apiGroups: + - core + resources: + - webhooks + - webhooks/logs + - webhooklogs + - webhooklogs/retry + verbs: + - "*" + scopes: + - "*" + nonResourceURLs: + - "*" - name: maintainer desc: maintainer为组/应用/集群的管理者,拥有除删除资源之外的其他权限,并且也可以进行成员管理 rules: