Skip to content

Commit

Permalink
Merge pull request #249 from DrowYue/fix/webhook-cleaner
Browse files Browse the repository at this point in the history
fix: update webhook cleaner sql
  • Loading branch information
xuzhu-591 authored Aug 26, 2024
2 parents d24ec30 + 75ad7fb commit 6cdb919
Show file tree
Hide file tree
Showing 4 changed files with 221 additions and 2 deletions.
3 changes: 1 addition & 2 deletions pkg/jobs/clean/cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,9 @@ func (c *Cleaner) webhookLogClean(ctx context.Context, current time.Time) {
Keywords: q.KeyWords{
common.StartID: c.webhookLogCursor,
common.Limit: c.Batch,
common.OrderBy: "l.id",
},
}
webhooklogs, _, err := c.mgr.WebhookMgr.ListWebhookLogs(ctx, query, nil)
webhooklogs, err := c.mgr.WebhookMgr.ListWebhookLogsForClean(ctx, query)
if err != nil {
log.Errorf(ctx, "failed to list webhooklogs: %v", err)
time.Sleep(5 * time.Second)
Expand Down
26 changes: 26 additions & 0 deletions pkg/webhook/dao/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type DAO interface {
GetWebhookLogByEventID(ctx context.Context, webhookID, eventID uint) (*models.WebhookLog, error)
GetMaxEventIDOfLog(ctx context.Context) (uint, error)
DeleteWebhookLogs(ctx context.Context, id ...uint) (int64, error)
ListWebhookLogsForClean(ctx context.Context, query *q.Query) ([]*models.WebhookLogWithEventInfo, error)
}

type dao struct{ db *gorm.DB }
Expand Down Expand Up @@ -322,3 +323,28 @@ func (d *dao) GetMaxEventIDOfLog(ctx context.Context) (uint, error) {
}
return maxID, nil
}

func (d *dao) ListWebhookLogsForClean(ctx context.Context, query *q.Query) ([]*models.WebhookLogWithEventInfo, error) {
var logs []*models.WebhookLogWithEventInfo

statement := d.db.WithContext(ctx).Table("tb_webhook_log l").
Joins("left join tb_event e on l.event_id=e.id").
Select("l.id, l.updated_at, e.resource_type, e.resource_id, e.event_type")

if query != nil {
if v, ok := query.Keywords[common.StartID]; ok {
statement = statement.Where("l.id > ?", v)
}
if v, ok := query.Keywords[common.Limit]; ok {
if limit, ok := v.(int); ok {
statement = statement.Limit(limit)
}
}
}

if result := statement.Find(&logs); result.Error != nil {
return nil, herrors.NewErrInsertFailed(herrors.WebhookLogInDB, result.Error.Error())
}

return logs, nil
}
10 changes: 10 additions & 0 deletions pkg/webhook/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type Manager interface {
GetWebhookLogByEventID(ctx context.Context, webhookID, eventID uint) (*models.WebhookLog, error)
GetMaxEventIDOfLog(ctx context.Context) (uint, error)
DeleteWebhookLogs(ctx context.Context, id ...uint) (int64, error)
ListWebhookLogsForClean(ctx context.Context, query *q.Query) ([]*models.WebhookLogWithEventInfo, error)
}

type manager struct {
Expand Down Expand Up @@ -182,3 +183,12 @@ func (m *manager) ResendWebhook(ctx context.Context, id uint) (*models.WebhookLo
func (m *manager) GetMaxEventIDOfLog(ctx context.Context) (uint, error) {
return m.dao.GetMaxEventIDOfLog(ctx)
}

func (m *manager) ListWebhookLogsForClean(
ctx context.Context,
query *q.Query,
) ([]*models.WebhookLogWithEventInfo, error) {
const op = "webhook manager: list webhook logs for clean"
defer wlog.Start(ctx, op).StopPrint()
return m.dao.ListWebhookLogsForClean(ctx, query)
}
184 changes: 184 additions & 0 deletions pkg/webhook/manager/manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
// Copyright © 2023 Horizoncd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package manager

import (
"context"
"github.com/horizoncd/horizon/core/common"
"github.com/horizoncd/horizon/lib/orm"
"github.com/horizoncd/horizon/lib/q"
eventmanageer "github.com/horizoncd/horizon/pkg/event/manager"
eventmodels "github.com/horizoncd/horizon/pkg/event/models"
webhookmodels "github.com/horizoncd/horizon/pkg/webhook/models"
"github.com/stretchr/testify/assert"
"os"
"testing"
)

var (
db, _ = orm.NewSqliteDB("")
ctx context.Context
mgr = New(db)
eventMgr = eventmanageer.New(db)
)

func Test(t *testing.T) {
clustersCreated := "clustersCreated"
webhook := &webhookmodels.Webhook{
Enabled: true,
URL: "https://horizon.org",
SSLVerifyEnabled: false,
Triggers: clustersCreated,
ResourceType: "clusters",
ResourceID: 0,
}

webhook, err := mgr.CreateWebhook(ctx, webhook)
assert.Nil(t, err)

retrieveWebhook, err := mgr.GetWebhook(ctx, webhook.ID)
assert.Nil(t, err)
assert.NotNil(t, retrieveWebhook)
assert.Equal(t, retrieveWebhook.ID, webhook.ID)

resources := map[string][]uint{}
resources[common.ResourceCluster] = []uint{0}
_, count, err := mgr.ListWebhookOfResources(ctx, resources, q.New(q.KeyWords{
common.Enabled: true,
}))
assert.Nil(t, err)
assert.Equal(t, int64(1), count)

retrieveWebhooks, err := mgr.ListWebhooks(ctx)
assert.Nil(t, err)
assert.Equal(t, 1, len(retrieveWebhooks))

webhook.URL = "https://horizon.com"
retrieveWebhook, err = mgr.UpdateWebhook(ctx, webhook.ID, webhook)
assert.Nil(t, err)
assert.Equal(t, retrieveWebhook.ID, webhook.ID)
assert.Equal(t, retrieveWebhook.URL, "https://horizon.com")

events := []*eventmodels.Event{
{
EventSummary: eventmodels.EventSummary{
ResourceType: common.ResourceCluster,
ResourceID: 1,
EventType: eventmodels.ClusterCreated,
},
ReqID: "xxx",
},
{
EventSummary: eventmodels.EventSummary{
ResourceType: common.ResourceCluster,
ResourceID: 2,
EventType: eventmodels.ClusterCreated,
},
ReqID: "xxx",
},
}
for _, e := range events {
_, err := eventMgr.CreateEvent(ctx, e)
assert.Nil(t, err)
}

webhookLogs := []*webhookmodels.WebhookLog{
{
WebhookID: 1,
EventID: 1,
URL: "http://example.com",
RequestHeaders: "Content-Type: application/json",
RequestData: `{"key": "value"}`,
ResponseHeaders: "Content-Type: application/json",
ResponseBody: `{"status": "ok"}`,
Status: webhookmodels.StatusWaiting,
ErrorMessage: "",
},
{
WebhookID: 2,
EventID: 2,
URL: "http://example.com",
RequestHeaders: "Content-Type: application/json",
RequestData: `{"key": "value"}`,
ResponseHeaders: "Content-Type: application/json",
ResponseBody: `{"status": "ok"}`,
Status: webhookmodels.StatusSuccess,
ErrorMessage: "",
},
}

for _, log := range webhookLogs {
_, err := mgr.CreateWebhookLog(ctx, log)
assert.Nil(t, err)
}

retrievedLog, err := mgr.GetWebhookLog(ctx, webhookLogs[0].ID)
assert.Nil(t, err)
assert.NotNil(t, retrievedLog)
assert.Equal(t, retrievedLog.ID, webhookLogs[0].ID)

retrievedLog.Status = webhookmodels.StatusSuccess
updatedLog, err := mgr.UpdateWebhookLog(ctx, retrievedLog)
assert.Nil(t, err)
assert.NotNil(t, updatedLog)
assert.Equal(t, updatedLog.Status, webhookmodels.StatusSuccess)

query := &q.Query{}
logs, _, err := mgr.ListWebhookLogs(ctx, query, nil)
assert.Nil(t, err)
assert.NotNil(t, logs)
assert.GreaterOrEqual(t, len(logs), 2)

query = &q.Query{
Keywords: q.KeyWords{
common.StartID: 0,
common.Limit: 10,
},
}
cleanLogs, err := mgr.ListWebhookLogsForClean(ctx, query)
assert.Nil(t, err)
assert.NotNil(t, cleanLogs)
assert.GreaterOrEqual(t, len(cleanLogs), 2)
for _, log := range cleanLogs {
assert.Contains(t, []uint{1, 2}, log.ID)
}

_, err = mgr.ResendWebhook(ctx, 1)
assert.Nil(t, err)

_, err = mgr.GetMaxEventIDOfLog(ctx)
assert.Nil(t, err)

retrievedLogs, err := mgr.GetWebhookLogByEventID(ctx, 1, 1)
assert.Nil(t, err)
assert.Equal(t, uint(1), retrievedLogs.ID)

for _, log := range webhookLogs {
_, err = mgr.DeleteWebhookLogs(ctx, log.ID)
assert.Nil(t, err)
}
}

func TestMain(m *testing.M) {
if err := db.AutoMigrate(&webhookmodels.WebhookLog{},
&webhookmodels.Webhook{}); err != nil {
panic(err)
}
if err := db.AutoMigrate(&eventmodels.Event{}); err != nil {
panic(err)
}
ctx = context.TODO()
os.Exit(m.Run())
}

0 comments on commit 6cdb919

Please sign in to comment.