From 9b5a4c69a1011d45fac0577238b9407a5c569e69 Mon Sep 17 00:00:00 2001 From: EpicMo <1982742309@qq.com> Date: Sun, 3 Sep 2023 15:45:18 +0800 Subject: [PATCH 1/7] fix: log yaml --- manifests-endymx/deployment-auth-service.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/manifests-endymx/deployment-auth-service.yaml b/manifests-endymx/deployment-auth-service.yaml index 1024b7c..30127c1 100644 --- a/manifests-endymx/deployment-auth-service.yaml +++ b/manifests-endymx/deployment-auth-service.yaml @@ -65,13 +65,13 @@ spec: memory: 200Mi volumeMounts: - mountPath: /fluent-bit/etc - name: gugotik-log-config + name: config - mountPath: /var/log/gugotik name: log-volume volumes: - name: config configMap: - name: fluentbit-config + name: gugotik-log-config - name: log-volume emptyDir: { } terminationGracePeriodSeconds: 30 \ No newline at end of file From 66358c9206c9711faadc97fb7f452d9e884bf555 Mon Sep 17 00:00:00 2001 From: EpicMo <1982742309@qq.com> Date: Sun, 3 Sep 2023 16:39:45 +0800 Subject: [PATCH 2/7] fix: log yaml --- src/utils/logging/logging.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/utils/logging/logging.go b/src/utils/logging/logging.go index 9431a8d..99230f7 100644 --- a/src/utils/logging/logging.go +++ b/src/utils/logging/logging.go @@ -32,7 +32,7 @@ func init() { log.SetLevel(log.TraceLevel) } - filePath := path.Join("var", "log", "gugotik", "gugotik.log") + filePath := path.Join("/var", "log", "gugotik", "gugotik.log") dir := path.Dir(filePath) if err := os.MkdirAll(dir, os.FileMode(0755)); err != nil { panic(err) From e7c67b235d728d533c236ac4a45bc482654cc5f0 Mon Sep 17 00:00:00 2001 From: EpicMo <1982742309@qq.com> Date: Sun, 3 Sep 2023 17:35:02 +0800 Subject: [PATCH 3/7] fix: log yaml --- config/logs/fluent-bit.conf | 2 +- .../deployment-comment-service.yaml | 24 +++++++++++++++++++ .../deployment-event-service.yaml | 24 +++++++++++++++++++ .../deployment-favorite-service.yaml | 24 +++++++++++++++++++ manifests-endymx/deployment-feed-service.yaml | 24 +++++++++++++++++++ manifests-endymx/deployment-http-service.yaml | 24 +++++++++++++++++++ .../deployment-message-service.yaml | 24 +++++++++++++++++++ .../deployment-msg-consumer-service.yaml | 24 +++++++++++++++++++ .../deployment-publish-service.yaml | 22 +++++++++++++++++ .../deployment-recommend-service.yaml | 24 +++++++++++++++++++ .../deployment-relation-service.yaml | 24 +++++++++++++++++++ manifests-endymx/deployment-user-service.yaml | 24 +++++++++++++++++++ .../deployment-video-processor-service.yaml | 22 +++++++++++++++++ 13 files changed, 285 insertions(+), 1 deletion(-) diff --git a/config/logs/fluent-bit.conf b/config/logs/fluent-bit.conf index 40239e4..b45b81a 100644 --- a/config/logs/fluent-bit.conf +++ b/config/logs/fluent-bit.conf @@ -29,4 +29,4 @@ Logstash_Format On Retry_Limit False Time_Key @timestamp - Logstash_Prefix gugotik \ No newline at end of file + Logstash_Prefix gugotik-log \ No newline at end of file diff --git a/manifests-endymx/deployment-comment-service.yaml b/manifests-endymx/deployment-comment-service.yaml index 66a2965..ac61113 100644 --- a/manifests-endymx/deployment-comment-service.yaml +++ b/manifests-endymx/deployment-comment-service.yaml @@ -36,6 +36,9 @@ spec: name: gugotik-env - secretRef: name: gugotik-secret + volumeMounts: + - mountPath: /var/log/gugotik + name: log-volume ports: - name: grpc-37003 containerPort: 37003 @@ -50,4 +53,25 @@ spec: requests: cpu: 100m memory: 128Mi + - name: logger + image: fluent/fluent-bit:1.8.4 + imagePullPolicy: IfNotPresent + resources: + requests: + cpu: 20m + memory: 100Mi + limits: + cpu: 100m + memory: 200Mi + volumeMounts: + - mountPath: /fluent-bit/etc + name: config + - mountPath: /var/log/gugotik + name: log-volume + volumes: + - name: config + configMap: + name: gugotik-log-config + - name: log-volume + emptyDir: { } terminationGracePeriodSeconds: 30 \ No newline at end of file diff --git a/manifests-endymx/deployment-event-service.yaml b/manifests-endymx/deployment-event-service.yaml index 22e1e0e..987f042 100644 --- a/manifests-endymx/deployment-event-service.yaml +++ b/manifests-endymx/deployment-event-service.yaml @@ -36,6 +36,9 @@ spec: name: gugotik-env - secretRef: name: gugotik-secret + volumeMounts: + - mountPath: /var/log/gugotik + name: log-volume ports: - name: metrics-37099 containerPort: 37099 @@ -47,4 +50,25 @@ spec: requests: cpu: 100m memory: 128Mi + - name: logger + image: fluent/fluent-bit:1.8.4 + imagePullPolicy: IfNotPresent + resources: + requests: + cpu: 20m + memory: 100Mi + limits: + cpu: 100m + memory: 200Mi + volumeMounts: + - mountPath: /fluent-bit/etc + name: config + - mountPath: /var/log/gugotik + name: log-volume + volumes: + - name: config + configMap: + name: gugotik-log-config + - name: log-volume + emptyDir: { } terminationGracePeriodSeconds: 30 \ No newline at end of file diff --git a/manifests-endymx/deployment-favorite-service.yaml b/manifests-endymx/deployment-favorite-service.yaml index ce4f3ac..daef9b3 100644 --- a/manifests-endymx/deployment-favorite-service.yaml +++ b/manifests-endymx/deployment-favorite-service.yaml @@ -36,6 +36,9 @@ spec: name: gugotik-env - secretRef: name: gugotik-secret + volumeMounts: + - mountPath: /var/log/gugotik + name: log-volume ports: - name: grpc-37006 containerPort: 37006 @@ -50,4 +53,25 @@ spec: requests: cpu: 100m memory: 128Mi + - name: logger + image: fluent/fluent-bit:1.8.4 + imagePullPolicy: IfNotPresent + resources: + requests: + cpu: 20m + memory: 100Mi + limits: + cpu: 100m + memory: 200Mi + volumeMounts: + - mountPath: /fluent-bit/etc + name: config + - mountPath: /var/log/gugotik + name: log-volume + volumes: + - name: config + configMap: + name: gugotik-log-config + - name: log-volume + emptyDir: { } terminationGracePeriodSeconds: 30 \ No newline at end of file diff --git a/manifests-endymx/deployment-feed-service.yaml b/manifests-endymx/deployment-feed-service.yaml index 391af56..957d02e 100644 --- a/manifests-endymx/deployment-feed-service.yaml +++ b/manifests-endymx/deployment-feed-service.yaml @@ -36,6 +36,9 @@ spec: name: gugotik-env - secretRef: name: gugotik-secret + volumeMounts: + - mountPath: /var/log/gugotik + name: log-volume ports: - name: grpc-37004 containerPort: 37004 @@ -50,4 +53,25 @@ spec: requests: cpu: 100m memory: 128Mi + - name: logger + image: fluent/fluent-bit:1.8.4 + imagePullPolicy: IfNotPresent + resources: + requests: + cpu: 20m + memory: 100Mi + limits: + cpu: 100m + memory: 200Mi + volumeMounts: + - mountPath: /fluent-bit/etc + name: config + - mountPath: /var/log/gugotik + name: log-volume + volumes: + - name: config + configMap: + name: gugotik-log-config + - name: log-volume + emptyDir: { } terminationGracePeriodSeconds: 30 \ No newline at end of file diff --git a/manifests-endymx/deployment-http-service.yaml b/manifests-endymx/deployment-http-service.yaml index 8492e04..9e1e98f 100644 --- a/manifests-endymx/deployment-http-service.yaml +++ b/manifests-endymx/deployment-http-service.yaml @@ -36,6 +36,9 @@ spec: name: gugotik-env - secretRef: name: gugotik-secret + volumeMounts: + - mountPath: /var/log/gugotik + name: log-volume ports: - name: http-37000 containerPort: 37000 @@ -47,4 +50,25 @@ spec: requests: cpu: 100m memory: 128Mi + - name: logger + image: fluent/fluent-bit:1.8.4 + imagePullPolicy: IfNotPresent + resources: + requests: + cpu: 20m + memory: 100Mi + limits: + cpu: 100m + memory: 200Mi + volumeMounts: + - mountPath: /fluent-bit/etc + name: config + - mountPath: /var/log/gugotik + name: log-volume + volumes: + - name: config + configMap: + name: gugotik-log-config + - name: log-volume + emptyDir: { } terminationGracePeriodSeconds: 30 \ No newline at end of file diff --git a/manifests-endymx/deployment-message-service.yaml b/manifests-endymx/deployment-message-service.yaml index b45f20d..0db0e26 100644 --- a/manifests-endymx/deployment-message-service.yaml +++ b/manifests-endymx/deployment-message-service.yaml @@ -36,6 +36,9 @@ spec: name: gugotik-env - secretRef: name: gugotik-secret + volumeMounts: + - mountPath: /var/log/gugotik + name: log-volume ports: - name: grpc-37007 containerPort: 37007 @@ -50,4 +53,25 @@ spec: requests: cpu: 100m memory: 128Mi + - name: logger + image: fluent/fluent-bit:1.8.4 + imagePullPolicy: IfNotPresent + resources: + requests: + cpu: 20m + memory: 100Mi + limits: + cpu: 100m + memory: 200Mi + volumeMounts: + - mountPath: /fluent-bit/etc + name: config + - mountPath: /var/log/gugotik + name: log-volume + volumes: + - name: config + configMap: + name: gugotik-log-config + - name: log-volume + emptyDir: { } terminationGracePeriodSeconds: 30 \ No newline at end of file diff --git a/manifests-endymx/deployment-msg-consumer-service.yaml b/manifests-endymx/deployment-msg-consumer-service.yaml index 1170259..5259a32 100644 --- a/manifests-endymx/deployment-msg-consumer-service.yaml +++ b/manifests-endymx/deployment-msg-consumer-service.yaml @@ -36,6 +36,9 @@ spec: name: gugotik-env - secretRef: name: gugotik-secret + volumeMounts: + - mountPath: /var/log/gugotik + name: log-volume ports: - name: metrics-37099 containerPort: 37099 @@ -47,4 +50,25 @@ spec: requests: cpu: 100m memory: 128Mi + - name: logger + image: fluent/fluent-bit:1.8.4 + imagePullPolicy: IfNotPresent + resources: + requests: + cpu: 20m + memory: 100Mi + limits: + cpu: 100m + memory: 200Mi + volumeMounts: + - mountPath: /fluent-bit/etc + name: config + - mountPath: /var/log/gugotik + name: log-volume + volumes: + - name: config + configMap: + name: gugotik-log-config + - name: log-volume + emptyDir: { } terminationGracePeriodSeconds: 30 \ No newline at end of file diff --git a/manifests-endymx/deployment-publish-service.yaml b/manifests-endymx/deployment-publish-service.yaml index 7a22a3d..781c3b6 100644 --- a/manifests-endymx/deployment-publish-service.yaml +++ b/manifests-endymx/deployment-publish-service.yaml @@ -25,6 +25,11 @@ spec: - name: volume persistentVolumeClaim: claimName: storage + - name: config + configMap: + name: gugotik-log-config + - name: log-volume + emptyDir: { } imagePullSecrets: - name: regcred containers: @@ -57,4 +62,21 @@ spec: volumeMounts: - mountPath: /data/apps/gugotik-service-bundle/data name: volume + - mountPath: /var/log/gugotik + name: log-volume + - name: logger + image: fluent/fluent-bit:1.8.4 + imagePullPolicy: IfNotPresent + resources: + requests: + cpu: 20m + memory: 100Mi + limits: + cpu: 100m + memory: 200Mi + volumeMounts: + - mountPath: /fluent-bit/etc + name: config + - mountPath: /var/log/gugotik + name: log-volume terminationGracePeriodSeconds: 30 \ No newline at end of file diff --git a/manifests-endymx/deployment-recommend-service.yaml b/manifests-endymx/deployment-recommend-service.yaml index d21a0cf..30a4ebf 100644 --- a/manifests-endymx/deployment-recommend-service.yaml +++ b/manifests-endymx/deployment-recommend-service.yaml @@ -36,6 +36,9 @@ spec: name: gugotik-env - secretRef: name: gugotik-secret + volumeMounts: + - mountPath: /var/log/gugotik + name: log-volume ports: - name: grpc-37009 containerPort: 37009 @@ -50,4 +53,25 @@ spec: requests: cpu: 100m memory: 128Mi + - name: logger + image: fluent/fluent-bit:1.8.4 + imagePullPolicy: IfNotPresent + resources: + requests: + cpu: 20m + memory: 100Mi + limits: + cpu: 100m + memory: 200Mi + volumeMounts: + - mountPath: /fluent-bit/etc + name: config + - mountPath: /var/log/gugotik + name: log-volume + volumes: + - name: config + configMap: + name: gugotik-log-config + - name: log-volume + emptyDir: { } terminationGracePeriodSeconds: 30 \ No newline at end of file diff --git a/manifests-endymx/deployment-relation-service.yaml b/manifests-endymx/deployment-relation-service.yaml index 4cdc733..41534cb 100644 --- a/manifests-endymx/deployment-relation-service.yaml +++ b/manifests-endymx/deployment-relation-service.yaml @@ -36,6 +36,9 @@ spec: name: gugotik-env - secretRef: name: gugotik-secret + volumeMounts: + - mountPath: /var/log/gugotik + name: log-volume ports: - name: grpc-37008 containerPort: 37008 @@ -50,4 +53,25 @@ spec: requests: cpu: 100m memory: 128Mi + - name: logger + image: fluent/fluent-bit:1.8.4 + imagePullPolicy: IfNotPresent + resources: + requests: + cpu: 20m + memory: 100Mi + limits: + cpu: 100m + memory: 200Mi + volumeMounts: + - mountPath: /fluent-bit/etc + name: config + - mountPath: /var/log/gugotik + name: log-volume + volumes: + - name: config + configMap: + name: gugotik-log-config + - name: log-volume + emptyDir: { } terminationGracePeriodSeconds: 30 \ No newline at end of file diff --git a/manifests-endymx/deployment-user-service.yaml b/manifests-endymx/deployment-user-service.yaml index 74b7ff8..38fe0b9 100644 --- a/manifests-endymx/deployment-user-service.yaml +++ b/manifests-endymx/deployment-user-service.yaml @@ -36,6 +36,9 @@ spec: name: gugotik-env - secretRef: name: gugotik-secret + volumeMounts: + - mountPath: /var/log/gugotik + name: log-volume ports: - name: grpc-37002 containerPort: 37002 @@ -50,4 +53,25 @@ spec: requests: cpu: 100m memory: 128Mi + - name: logger + image: fluent/fluent-bit:1.8.4 + imagePullPolicy: IfNotPresent + resources: + requests: + cpu: 20m + memory: 100Mi + limits: + cpu: 100m + memory: 200Mi + volumeMounts: + - mountPath: /fluent-bit/etc + name: config + - mountPath: /var/log/gugotik + name: log-volume + volumes: + - name: config + configMap: + name: gugotik-log-config + - name: log-volume + emptyDir: { } terminationGracePeriodSeconds: 30 \ No newline at end of file diff --git a/manifests-endymx/deployment-video-processor-service.yaml b/manifests-endymx/deployment-video-processor-service.yaml index b5f830f..fba356b 100644 --- a/manifests-endymx/deployment-video-processor-service.yaml +++ b/manifests-endymx/deployment-video-processor-service.yaml @@ -25,6 +25,11 @@ spec: - name: volume persistentVolumeClaim: claimName: storage + - name: config + configMap: + name: gugotik-log-config + - name: log-volume + emptyDir: { } imagePullSecrets: - name: regcred containers: @@ -54,4 +59,21 @@ spec: volumeMounts: - mountPath: /data/apps/gugotik-service-bundle/data name: volume + - mountPath: /var/log/gugotik + name: log-volume + - name: logger + image: fluent/fluent-bit:1.8.4 + imagePullPolicy: IfNotPresent + resources: + requests: + cpu: 20m + memory: 100Mi + limits: + cpu: 100m + memory: 200Mi + volumeMounts: + - mountPath: /fluent-bit/etc + name: config + - mountPath: /var/log/gugotik + name: log-volume terminationGracePeriodSeconds: 30 \ No newline at end of file From af7bdbea4a2ba9fd998dc1c9fa0331e299513f99 Mon Sep 17 00:00:00 2001 From: EpicMo <1982742309@qq.com> Date: Sun, 3 Sep 2023 18:28:53 +0800 Subject: [PATCH 4/7] feat: action --- src/constant/strings/service.go | 8 ++++++++ src/models/action.go | 19 +++++++++++++++++++ 2 files changed, 27 insertions(+) create mode 100644 src/models/action.go diff --git a/src/constant/strings/service.go b/src/constant/strings/service.go index 2bfd757..cd0f314 100644 --- a/src/constant/strings/service.go +++ b/src/constant/strings/service.go @@ -20,4 +20,12 @@ const ( MessageActionEvent = "message.common" MessageGptActionEvent = "message.gpt" + + // Action Id + FavoriteIdActionLog = 1 // 用户点赞相关操作 + + // Action Name + FavoriteNameActionLog = "favorite.action" // 用户点赞操作名称 + FavoriteUpActionSubLog = "up" + FavoriteDownActionSubLog = "down" ) diff --git a/src/models/action.go b/src/models/action.go new file mode 100644 index 0000000..58c17d6 --- /dev/null +++ b/src/models/action.go @@ -0,0 +1,19 @@ +package models + +import "gorm.io/gorm" + +type Action struct { + Type uint // 用户操作的行为类型,如:1表示点赞相关 + Name string // 用户操作的动作名称,如:FavoriteNameActionLog 表示点赞相关操作 + SubName string // 用户操作动作的子名称,如:FavoriteUpActionLog 表示给视频增加赞操作 + ServiceName string // 服务来源,添加服务的名称,如 FavoriteService + Attached string // 附带信息,当 Name - SubName 无法说明时,添加一个额外的信息 + ActorId uint32 // 操作者 Id + VideoId uint32 // 附属的视频 Id,没有填写为0 + AffectAction uint // 操作的类型,如:1. 自增/自减某个数据,2. 直接修改某个数据 + AffectedData string // 操作的数值是什么,如果是自增,填 1,如果是修改为某个数据,那么填这个数据的值 + EventId string // 如果这个操作是一个大操作的子类型,那么需要具有相同的 UUID + TraceId string // 这个操作的 TraceId + SpanId string // 这个操作的 SpanId + gorm.Model //数据库模型 +} From 31ec4b77883a2fa0f427a80ab262567c5d76b17f Mon Sep 17 00:00:00 2001 From: XFFFCCCC Date: Sun, 3 Sep 2023 19:20:22 +0800 Subject: [PATCH 5/7] add message to es --- go.mod | 2 + go.sum | 7 ++ src/constant/config/.env.example | 3 +- src/constant/config/env.go | 1 + src/constant/strings/service.go | 1 + src/models/message.go | 10 +++ src/services/message/handler.go | 57 ++++++++++++++- src/services/msgconsumer/esexchange.go | 97 ++++++++++++++++++++++++++ src/services/msgconsumer/main.go | 34 +++++++-- src/storage/es/Elasticsearch.go | 35 ++++++++++ 10 files changed, 238 insertions(+), 9 deletions(-) create mode 100644 src/services/msgconsumer/esexchange.go create mode 100644 src/storage/es/Elasticsearch.go diff --git a/go.mod b/go.mod index 66cec14..3d80f7b 100644 --- a/go.mod +++ b/go.mod @@ -83,6 +83,8 @@ require ( github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/denis-tingaikin/go-header v0.4.3 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/elastic/go-elasticsearch v0.0.0 + github.com/elastic/go-elasticsearch/v7 v7.17.10 github.com/esimonov/ifshort v1.0.4 // indirect github.com/ettle/strcase v0.1.1 // indirect github.com/fatih/color v1.15.0 // indirect diff --git a/go.sum b/go.sum index fc702a4..2b87c39 100644 --- a/go.sum +++ b/go.sum @@ -75,6 +75,7 @@ github.com/alexkohler/prealloc v1.0.0 h1:Hbq0/3fJPQhNkN0dR95AVrr6R7tou91y0uHG5pO github.com/alexkohler/prealloc v1.0.0/go.mod h1:VetnK3dIgFBBKmg0YnD9F9x6Icjd+9cvfHR56wJVlKE= github.com/alingse/asasalint v0.0.11 h1:SFwnQXJ49Kx/1GghOFz1XGqHYKp21Kq1nHad/0WQRnw= github.com/alingse/asasalint v0.0.11/go.mod h1:nCaoMhw7a9kSJObvQyVzNTPBDbNpdocqrSP7t/cW5+I= +github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= github.com/armon/go-metrics v0.4.1 h1:hR91U9KYmb6bLBYLQjyM+3j+rcd/UhE+G78SFnF8gJA= @@ -158,6 +159,10 @@ github.com/denis-tingaikin/go-header v0.4.3 h1:tEaZKAlqql6SKCY++utLmkPLd6K8IBM20 github.com/denis-tingaikin/go-header v0.4.3/go.mod h1:0wOCWuN71D5qIgE2nz9KrKmuYBAC2Mra5RassOIQ2/c= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/elastic/go-elasticsearch v0.0.0 h1:Pd5fqOuBxKxv83b0+xOAJDAkziWYwFinWnBO0y+TZaA= +github.com/elastic/go-elasticsearch v0.0.0/go.mod h1:TkBSJBuTyFdBnrNqoPc54FN0vKf5c04IdM4zuStJ7xg= +github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxxUhhltAs3Gyu1yo= +github.com/elastic/go-elasticsearch/v7 v7.17.10/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= @@ -206,6 +211,7 @@ github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vb github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= +github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= @@ -647,6 +653,7 @@ github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis= github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= github.com/robfig/cron/v3 v3.0.0 h1:kQ6Cb7aHOHTSzNVNEhmp8EcWKLb4CbiMW9h9VyIhO4E= github.com/robfig/cron/v3 v3.0.0/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= +github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= diff --git a/src/constant/config/.env.example b/src/constant/config/.env.example index 3ff3a6d..9f9d034 100644 --- a/src/constant/config/.env.example +++ b/src/constant/config/.env.example @@ -83,4 +83,5 @@ MAGIC_USER_ID= # Configure your clash proxy CHATGPT_PROXY= # Default anonymity user -ANONYMITY_USER= \ No newline at end of file +ANONYMITY_USER= +ElasticSearchUrl= diff --git a/src/constant/config/env.go b/src/constant/config/env.go index 029a60d..a04fa04 100644 --- a/src/constant/config/env.go +++ b/src/constant/config/env.go @@ -50,6 +50,7 @@ type envConfig struct { OtelState string `env:"TRACING_STATE" envDefault:"enable"` OtelSampler float64 `env:"TRACING_SAMPLER" envDefault:"0.01"` AnonymityUser string `env:"ANONYMITY_USER" envDefault:"114514"` + ElasticsearchUrl string `env:"ElasticSearchUrl"` } func init() { diff --git a/src/constant/strings/service.go b/src/constant/strings/service.go index 2bfd757..61ff351 100644 --- a/src/constant/strings/service.go +++ b/src/constant/strings/service.go @@ -11,6 +11,7 @@ const ( VideoSummary = "video_summary" MessageCommon = "message_common" MessageGPT = "message_gpt" + MessageES = "message_es" // Routing key FavoriteActionEvent = "video.favorite.action" diff --git a/src/models/message.go b/src/models/message.go index 815b3cf..2c761f2 100644 --- a/src/models/message.go +++ b/src/models/message.go @@ -2,6 +2,7 @@ package models import ( "GuGoTik/src/storage/database" + "time" "gorm.io/gorm" ) @@ -18,6 +19,15 @@ type Message struct { gorm.Model } +// es 使用 +type EsMessage struct { + ToUserId uint32 `json:"toUserid"` + FromUserId uint32 `json:"fromUserId"` + ConversationId string `json:"conversationId"` + Content string `json:"content"` + CreateTime time.Time `json:"createTime"` +} + func init() { if err := database.Client.AutoMigrate(&Message{}); err != nil { panic(err) diff --git a/src/services/message/handler.go b/src/services/message/handler.go index 38f6c31..609f08b 100644 --- a/src/services/message/handler.go +++ b/src/services/message/handler.go @@ -12,19 +12,20 @@ import ( "GuGoTik/src/rpc/user" "GuGoTik/src/storage/database" "GuGoTik/src/storage/redis" + grpc2 "GuGoTik/src/utils/grpc" "GuGoTik/src/utils/logging" "GuGoTik/src/utils/ptr" "GuGoTik/src/utils/rabbitmq" "context" "encoding/json" "fmt" + amqp "github.com/rabbitmq/amqp091-go" + "github.com/robfig/cron/v3" - grpc2 "GuGoTik/src/utils/grpc" "time" "github.com/go-redis/redis_rate/v10" - "github.com/robfig/cron/v3" "gorm.io/gorm" "github.com/sirupsen/logrus" @@ -72,9 +73,56 @@ func (c MessageServiceImpl) New() { }, ) failOnError(err, "Failed to get exchange") + _, err = channel.QueueDeclare( + strings.MessageCommon, + true, false, false, false, + nil, + ) + failOnError(err, "Failed to define queue") - userRpcConn := grpc2.Connect(config.UserRpcServerName) + _, err = channel.QueueDeclare( + strings.MessageGPT, + true, false, false, false, + nil, + ) + + failOnError(err, "Failed to define queue") + _, err = channel.QueueDeclare( + strings.MessageES, + true, false, false, false, + nil, + ) + failOnError(err, "Failed to define queue") + + err = channel.QueueBind( + strings.MessageCommon, + "message.#", + strings.MessageExchange, + false, + nil, + ) + failOnError(err, "Failed to bind queue to exchange") + + err = channel.QueueBind( + strings.MessageES, + "message.#", + strings.MessageExchange, + false, + nil, + ) + failOnError(err, "Failed to bind queue to exchange") + + err = channel.QueueBind( + strings.MessageGPT, + strings.MessageGptActionEvent, + strings.MessageExchange, + false, + nil, + ) + failOnError(err, "Failed to bind queue to exchange") + + userRpcConn := grpc2.Connect(config.UserRpcServerName) userClient = user.NewUserServiceClient(userRpcConn) recommendRpcConn := grpc2.Connect(config.RecommendRpcServiceName) @@ -346,6 +394,9 @@ func addMessage(ctx context.Context, fromUserId uint32, toUserId uint32, Context Content: Context, ConversationId: conversationId, } + message.Model = gorm.Model{ + CreatedAt: time.Now(), + } body, err := json.Marshal(message) if err != nil { diff --git a/src/services/msgconsumer/esexchange.go b/src/services/msgconsumer/esexchange.go new file mode 100644 index 0000000..024bf6d --- /dev/null +++ b/src/services/msgconsumer/esexchange.go @@ -0,0 +1,97 @@ +package main + +import ( + "GuGoTik/src/constant/strings" + "GuGoTik/src/extra/tracing" + "GuGoTik/src/models" + "GuGoTik/src/storage/es" + "GuGoTik/src/utils/logging" + "GuGoTik/src/utils/rabbitmq" + "bytes" + "context" + "encoding/json" + + "github.com/elastic/go-elasticsearch/esapi" + amqp "github.com/rabbitmq/amqp091-go" + "github.com/sirupsen/logrus" +) + +func esSaveMessage(channel *amqp.Channel) { + + msg, err := channel.Consume(strings.MessageES, "", + false, false, false, false, nil, + ) + failOnError(err, "Failed to Consume") + + var message models.Message + for body := range msg { + ctx := rabbitmq.ExtractAMQPHeaders(context.Background(), body.Headers) + ctx, span := tracing.Tracer.Start(ctx, "MessageSendService") + logger := logging.LogService("MessageSend").WithContext(ctx) + + if err := json.Unmarshal(body.Body, &message); err != nil { + logger.WithFields(logrus.Fields{ + "from_id": message.FromUserId, + "to_id": message.ToUserId, + "content": message.Content, + "err": err, + }).Errorf("Error when unmarshaling the prepare json body.") + logging.SetSpanError(span, err) + err = body.Nack(false, true) + if err != nil { + logger.WithFields( + logrus.Fields{ + "from_id": message.FromUserId, + "to_id": message.ToUserId, + "content": message.Content, + "err": err, + }, + ).Errorf("Error when nack the message") + logging.SetSpanError(span, err) + } + span.End() + continue + } + + EsMessage := models.EsMessage{ + ToUserId: message.ToUserId, + FromUserId: message.FromUserId, + ConversationId: message.ConversationId, + Content: message.Content, + CreateTime: message.CreatedAt, + } + data, _ := json.Marshal(EsMessage) + + req := esapi.IndexRequest{ + Index: "message", + Refresh: "true", + Body: bytes.NewReader(data), + } + //返回值close + res, err := req.Do(ctx, es.EsClient) + + if err != nil { + logger.WithFields(logrus.Fields{ + "from_id": message.FromUserId, + "to_id": message.ToUserId, + "content": message.Content, + "err": err, + }).Errorf("Error when insert message to database.") + logging.SetSpanError(span, err) + + span.End() + continue + } + res.Body.Close() + + err = body.Ack(false) + + if err != nil { + logger.WithFields(logrus.Fields{ + "err": err, + }).Errorf("Error when dealing with the message...3") + logging.SetSpanError(span, err) + } + + } +} diff --git a/src/services/msgconsumer/main.go b/src/services/msgconsumer/main.go index 50e95e4..d5572b2 100644 --- a/src/services/msgconsumer/main.go +++ b/src/services/msgconsumer/main.go @@ -13,13 +13,14 @@ import ( "context" "encoding/json" "errors" + "net/http" + url2 "net/url" + "sync" + amqp "github.com/rabbitmq/amqp091-go" "github.com/sashabaranov/go-openai" "github.com/sirupsen/logrus" "go.opentelemetry.io/otel/trace" - "net/http" - url2 "net/url" - "sync" ) var chatClient chat.ChatServiceClient @@ -113,6 +114,14 @@ func main() { true, false, false, false, nil, ) + + failOnError(err, "Failed to define queue") + _, err = channel.QueueDeclare( + strings.MessageES, + true, false, false, false, + nil, + ) + failOnError(err, "Failed to define queue") err = channel.QueueBind( @@ -124,6 +133,15 @@ func main() { ) failOnError(err, "Failed to bind queue to exchange") + err = channel.QueueBind( + strings.MessageES, + "message.#", + strings.MessageExchange, + false, + nil, + ) + failOnError(err, "Failed to bind queue to exchange") + err = channel.QueueBind( strings.MessageGPT, strings.MessageGptActionEvent, @@ -141,6 +159,10 @@ func main() { logger = logging.LogService("MessageGPTSend") logger.Infof(strings.MessageGptActionEvent + " is running now") + go esSaveMessage(channel) + logger = logging.LogService("esSaveMessage") + logger.Infof(strings.VideoPicker + " is running now") + defer CloseMQConn() wg := sync.WaitGroup{} @@ -198,7 +220,10 @@ func saveMessage(channel *amqp.Channel) { }).Debugf("Receive message event") //可能会重新插入数据 开启事务 晚点改 - result := database.Client.WithContext(context.Background()).Create(&pmessage) + //写入数据库 + + result := database.Client.WithContext(ctx).Create(&pmessage) + if result.Error != nil { logger.WithFields(logrus.Fields{ "from_id": message.FromUserId, @@ -228,7 +253,6 @@ func saveMessage(channel *amqp.Channel) { "err": err, }).Errorf("Error when dealing with the message...3") logging.SetSpanError(span, err) - } } } diff --git a/src/storage/es/Elasticsearch.go b/src/storage/es/Elasticsearch.go new file mode 100644 index 0000000..1cd6210 --- /dev/null +++ b/src/storage/es/Elasticsearch.go @@ -0,0 +1,35 @@ +package es + +import ( + "GuGoTik/src/constant/config" + "log" + + es "github.com/elastic/go-elasticsearch/v7" +) + +var EsClient *es.Client + +func init() { + cfg := es.Config{ + Addresses: []string{ + config.EnvCfg.ElasticsearchUrl, + }, + } + var err error + EsClient, err = es.NewClient(cfg) + if err != nil { + log.Fatalf("elasticsearch.NewClient: %v", err) + } + + _, err = EsClient.Info() + if err != nil { + log.Fatalf("Error getting response: %s", err) + } + + _, err = EsClient.API.Indices.Create("Message") + + if err != nil { + log.Fatalf("create index error: %s", err) + } + +} From 2dc502a60cb712b47f4d23d1e3df9739b07d2fc6 Mon Sep 17 00:00:00 2001 From: EpicMo <1982742309@qq.com> Date: Sun, 3 Sep 2023 22:33:09 +0800 Subject: [PATCH 6/7] fix: es config contract --- src/constant/config/.env.example | 3 ++- src/constant/config/env.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/constant/config/.env.example b/src/constant/config/.env.example index 9f9d034..07bd224 100644 --- a/src/constant/config/.env.example +++ b/src/constant/config/.env.example @@ -84,4 +84,5 @@ MAGIC_USER_ID= CHATGPT_PROXY= # Default anonymity user ANONYMITY_USER= -ElasticSearchUrl= +# Configure your Elastic Search Address +ES_ADDR= diff --git a/src/constant/config/env.go b/src/constant/config/env.go index a04fa04..2565731 100644 --- a/src/constant/config/env.go +++ b/src/constant/config/env.go @@ -50,7 +50,7 @@ type envConfig struct { OtelState string `env:"TRACING_STATE" envDefault:"enable"` OtelSampler float64 `env:"TRACING_SAMPLER" envDefault:"0.01"` AnonymityUser string `env:"ANONYMITY_USER" envDefault:"114514"` - ElasticsearchUrl string `env:"ElasticSearchUrl"` + ElasticsearchUrl string `env:"ES_ADDR"` } func init() { From 1179a05fa66fbfc09a4a11866586298e4f62dbdf Mon Sep 17 00:00:00 2001 From: yangfeng <1719957182@qq.com> Date: Mon, 4 Sep 2023 01:59:30 +0800 Subject: [PATCH 7/7] feat(audit): add favorite and follow action to action table by mq --- src/constant/strings/service.go | 32 ++++++-- src/models/action.go | 12 ++- src/models/comment.go | 2 +- src/services/favorite/handler.go | 46 +++++++++++- src/services/favorite/main.go | 5 ++ src/services/msgconsumer/main.go | 121 +++++++++++++++++++++++++++++++ src/services/relation/handler.go | 76 +++++++++++++++++++ src/services/relation/main.go | 10 +++ src/utils/audit/publish.go | 71 ++++++++++++++++++ 9 files changed, 367 insertions(+), 8 deletions(-) create mode 100644 src/utils/audit/publish.go diff --git a/src/constant/strings/service.go b/src/constant/strings/service.go index cd0f314..330255b 100644 --- a/src/constant/strings/service.go +++ b/src/constant/strings/service.go @@ -1,18 +1,24 @@ package strings +// Exchange name const ( - // Exchange name VideoExchange = "video_exchange" EventExchange = "event" MessageExchange = "message_exchange" + AuditExchange = "audit_exchange" +) - // Queue name +// Queue name +const ( VideoPicker = "video_picker" VideoSummary = "video_summary" MessageCommon = "message_common" MessageGPT = "message_gpt" + AuditPicker = "audit_picker" +) - // Routing key +// Routing key +const ( FavoriteActionEvent = "video.favorite.action" VideoGetEvent = "video.get.action" VideoCommentEvent = "video.comment.action" @@ -20,12 +26,28 @@ const ( MessageActionEvent = "message.common" MessageGptActionEvent = "message.gpt" + AuditPublishEvent = "audit" +) - // Action Id +// Action Type +const ( FavoriteIdActionLog = 1 // 用户点赞相关操作 + FollowIdActionLog = 2 // 用户关注相关操作 +) - // Action Name +// Action Name +const ( FavoriteNameActionLog = "favorite.action" // 用户点赞操作名称 FavoriteUpActionSubLog = "up" FavoriteDownActionSubLog = "down" + + FollowNameActionLog = "follow.action" // 用户关注操作名称 + FollowUpActionSubLog = "up" + FollowDownActionSubLog = "down" +) + +// Action Service Name +const ( + FavoriteServiceName = "FavoriteService" + FollowServiceName = "FollowService" ) diff --git a/src/models/action.go b/src/models/action.go index 58c17d6..33e6d85 100644 --- a/src/models/action.go +++ b/src/models/action.go @@ -1,6 +1,9 @@ package models -import "gorm.io/gorm" +import ( + "GuGoTik/src/storage/database" + "gorm.io/gorm" +) type Action struct { Type uint // 用户操作的行为类型,如:1表示点赞相关 @@ -10,6 +13,7 @@ type Action struct { Attached string // 附带信息,当 Name - SubName 无法说明时,添加一个额外的信息 ActorId uint32 // 操作者 Id VideoId uint32 // 附属的视频 Id,没有填写为0 + AffectUserId uint32 // 操作的用户 Id,如:被关注的用户 Id AffectAction uint // 操作的类型,如:1. 自增/自减某个数据,2. 直接修改某个数据 AffectedData string // 操作的数值是什么,如果是自增,填 1,如果是修改为某个数据,那么填这个数据的值 EventId string // 如果这个操作是一个大操作的子类型,那么需要具有相同的 UUID @@ -17,3 +21,9 @@ type Action struct { SpanId string // 这个操作的 SpanId gorm.Model //数据库模型 } + +func init() { + if err := database.Client.AutoMigrate(&Action{}); err != nil { + panic(err) + } +} diff --git a/src/models/comment.go b/src/models/comment.go index b3b9262..2e929d2 100644 --- a/src/models/comment.go +++ b/src/models/comment.go @@ -6,7 +6,7 @@ import ( ) type Comment struct { - ID uint32 `gorm:"not null;primarykey;autoIncrement"` // 评论 ID + ID uint32 `gorm:"not null;primaryKey;autoIncrement"` // 评论 ID VideoId uint32 `json:"video_id" column:"video_id" gorm:"not null;index:comment_video"` // 视频 ID UserId uint32 `json:"user_id" column:"user_id" gorm:"not null"` // 用户 ID Content string `json:"content" column:"content"` // 评论内容 diff --git a/src/services/favorite/handler.go b/src/services/favorite/handler.go index 17d54f2..11e5420 100644 --- a/src/services/favorite/handler.go +++ b/src/services/favorite/handler.go @@ -9,13 +9,16 @@ import ( "GuGoTik/src/rpc/feed" "GuGoTik/src/rpc/user" redis2 "GuGoTik/src/storage/redis" + "GuGoTik/src/utils/audit" grpc2 "GuGoTik/src/utils/grpc" "GuGoTik/src/utils/logging" "GuGoTik/src/utils/rabbitmq" "context" "encoding/json" "fmt" + "github.com/google/uuid" amqp "github.com/rabbitmq/amqp091-go" + "go.opentelemetry.io/otel/trace" "strconv" "sync" "time" @@ -215,8 +218,9 @@ func (c FavoriteServiceServerImpl) FavoriteAction(ctx context.Context, req *favo pipe.ZAdd(ctx, user_like_Id, redis.Z{Score: float64(time.Now().Unix()), Member: req.VideoId}) return nil }) + // Publish event to event_exchange and audit_exchange wg := sync.WaitGroup{} - wg.Add(1) + wg.Add(2) go func() { defer wg.Done() produceFavorite(ctx, models.RecommendEvent{ @@ -226,6 +230,23 @@ func (c FavoriteServiceServerImpl) FavoriteAction(ctx context.Context, req *favo Source: config.FavoriteRpcServerName, }) }() + go func() { + defer wg.Done() + action := &models.Action{ + Type: strings.FavoriteIdActionLog, + Name: strings.FavoriteNameActionLog, + SubName: strings.FavoriteUpActionSubLog, + ServiceName: strings.FavoriteServiceName, + ActorId: req.ActorId, + VideoId: req.VideoId, + AffectAction: 1, + AffectedData: "1", + EventId: uuid.New().String(), + TraceId: trace.SpanContextFromContext(ctx).TraceID().String(), + SpanId: trace.SpanContextFromContext(ctx).SpanID().String(), + } + audit.PublishAuditEvent(ctx, action, channel) + }() wg.Wait() if err == redis.Nil { err = nil @@ -255,6 +276,29 @@ func (c FavoriteServiceServerImpl) FavoriteAction(ctx context.Context, req *favo return nil }) + + // Publish event to event_exchange and audit_exchange + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + action := &models.Action{ + Type: strings.FavoriteIdActionLog, + Name: strings.FavoriteNameActionLog, + SubName: strings.FavoriteDownActionSubLog, + ServiceName: strings.FavoriteServiceName, + ActorId: req.ActorId, + VideoId: req.VideoId, + AffectAction: 1, + AffectedData: "-1", + EventId: uuid.New().String(), + TraceId: trace.SpanContextFromContext(ctx).TraceID().String(), + SpanId: trace.SpanContextFromContext(ctx).SpanID().String(), + } + audit.PublishAuditEvent(ctx, action, channel) + }() + wg.Wait() + if err == redis.Nil { err = nil } diff --git a/src/services/favorite/main.go b/src/services/favorite/main.go index c0a516b..c054428 100644 --- a/src/services/favorite/main.go +++ b/src/services/favorite/main.go @@ -5,6 +5,7 @@ import ( "GuGoTik/src/extra/profiling" "GuGoTik/src/extra/tracing" "GuGoTik/src/rpc/favorite" + "GuGoTik/src/utils/audit" "GuGoTik/src/utils/consul" "GuGoTik/src/utils/logging" "GuGoTik/src/utils/prom" @@ -77,6 +78,10 @@ func main() { log.Panicf("Rpc %s register consul happens error for: %v", config.FavoriteRpcServerName, err) } srv.New() + + // Initialize the audit_exchange + audit.DeclareAuditExchange(channel) + srvMetrics.InitializeMetrics(s) g := &run.Group{} diff --git a/src/services/msgconsumer/main.go b/src/services/msgconsumer/main.go index 50e95e4..bdd3046 100644 --- a/src/services/msgconsumer/main.go +++ b/src/services/msgconsumer/main.go @@ -13,6 +13,7 @@ import ( "context" "encoding/json" "errors" + "fmt" amqp "github.com/rabbitmq/amqp091-go" "github.com/sashabaranov/go-openai" "github.com/sirupsen/logrus" @@ -101,6 +102,14 @@ func main() { ) failOnError(err, "Failed to get exchange") + err = channel.ExchangeDeclare( + strings.AuditExchange, + "direct", + true, false, false, false, + nil, + ) + failOnError(err, fmt.Sprintf("Failed to get %s exchange", strings.AuditExchange)) + _, err = channel.QueueDeclare( strings.MessageCommon, true, false, false, false, @@ -115,6 +124,13 @@ func main() { ) failOnError(err, "Failed to define queue") + _, err = channel.QueueDeclare( + strings.AuditPicker, + true, false, false, false, + nil, + ) + failOnError(err, fmt.Sprintf("Failed to define %s queue", strings.AuditPicker)) + err = channel.QueueBind( strings.MessageCommon, "message.#", @@ -133,6 +149,15 @@ func main() { ) failOnError(err, "Failed to bind queue to exchange") + err = channel.QueueBind( + strings.AuditPicker, + strings.AuditPublishEvent, + strings.AuditExchange, + false, + nil, + ) + failOnError(err, fmt.Sprintf("Failed to bind %s queue to %s exchange", strings.AuditPicker, strings.AuditExchange)) + go saveMessage(channel) logger := logging.LogService("MessageSend") logger.Infof(strings.MessageActionEvent + " is running now") @@ -141,6 +166,10 @@ func main() { logger = logging.LogService("MessageGPTSend") logger.Infof(strings.MessageGptActionEvent + " is running now") + go saveAuditAction(channel) + logger = logging.LogService("AuditPublish") + logger.Infof(strings.AuditPublishEvent + " is running now") + defer CloseMQConn() wg := sync.WaitGroup{} @@ -327,6 +356,98 @@ func chatWithGPT(channel *amqp.Channel) { } } +func saveAuditAction(channel *amqp.Channel) { + msg, err := channel.Consume( + strings.AuditPicker, + "", + false, false, false, false, + nil, + ) + failOnError(err, "Failed to Consume") + + var action models.Action + for body := range msg { + ctx := rabbitmq.ExtractAMQPHeaders(context.Background(), body.Headers) + + ctx, span := tracing.Tracer.Start(ctx, "AuditPublishService") + logger := logging.LogService("AuditPublish").WithContext(ctx) + + if err := json.Unmarshal(body.Body, &action); err != nil { + logger.WithFields(logrus.Fields{ + "err": err, + }).Errorf("Error when unmarshaling the prepare json body.") + logging.SetSpanError(span, err) + err = body.Nack(false, true) + if err != nil { + logger.WithFields( + logrus.Fields{ + "err": err, + "Type": action.Type, + "SubName": action.SubName, + "ServiceName": action.ServiceName, + }, + ).Errorf("Error when nack the message") + logging.SetSpanError(span, err) + } + span.End() + continue + } + + pAction := models.Action{ + Type: action.Type, + Name: action.Name, + SubName: action.SubName, + ServiceName: action.ServiceName, + Attached: action.Attached, + ActorId: action.ActorId, + VideoId: action.VideoId, + AffectAction: action.AffectAction, + AffectedData: action.AffectedData, + EventId: action.EventId, + TraceId: action.TraceId, + SpanId: action.SpanId, + } + logger.WithFields(logrus.Fields{ + "action": pAction, + }).Debugf("Recevie action event") + + result := database.Client.WithContext(ctx).Create(&pAction) + if result.Error != nil { + logger.WithFields( + logrus.Fields{ + "err": err, + "Type": action.Type, + "SubName": action.SubName, + "ServiceName": action.ServiceName, + }, + ).Errorf("Error when nack the message") + logging.SetSpanError(span, err) + err = body.Nack(false, true) + if err != nil { + logger.WithFields( + logrus.Fields{ + "err": err, + "Type": action.Type, + "SubName": action.SubName, + "ServiceName": action.ServiceName, + }, + ).Errorf("Error when nack the message") + logging.SetSpanError(span, err) + } + span.End() + continue + } + err = body.Ack(false) + + if err != nil { + logger.WithFields(logrus.Fields{ + "err": err, + }).Errorf("Error when dealing with the action...") + logging.SetSpanError(span, err) + } + } +} + func errorHandler(channel *amqp.Channel, d amqp.Delivery, requeue bool, logger *logrus.Entry, span *trace.Span) { if !requeue { // Nack the message err := d.Nack(false, false) diff --git a/src/services/relation/handler.go b/src/services/relation/handler.go index 7e7918d..683620f 100644 --- a/src/services/relation/handler.go +++ b/src/services/relation/handler.go @@ -10,12 +10,16 @@ import ( "GuGoTik/src/storage/cached" "GuGoTik/src/storage/database" redis2 "GuGoTik/src/storage/redis" + "GuGoTik/src/utils/audit" grpc2 "GuGoTik/src/utils/grpc" "GuGoTik/src/utils/logging" + "GuGoTik/src/utils/rabbitmq" "context" "errors" "fmt" "github.com/go-redis/redis_rate/v10" + "github.com/google/uuid" + amqp "github.com/rabbitmq/amqp091-go" "github.com/sirupsen/logrus" "go.opentelemetry.io/otel/trace" "gorm.io/gorm" @@ -38,9 +42,33 @@ func actionRelationLimitKey(userId uint32) string { return fmt.Sprintf("%s-%d", actionRelationLimitKeyPrefix, userId) } +func exitOnError(err error) { + if err != nil { + panic(err) + } +} + +func CloseMQConn() { + if err := conn.Close(); err != nil { + panic(err) + } + + if err := channel.Close(); err != nil { + panic(err) + } +} + func (r RelationServiceImpl) New() { userRPCConn := grpc2.Connect(config.UserRpcServerName) userClient = user.NewUserServiceClient(userRPCConn) + + var err error + + conn, err = amqp.Dial(rabbitmq.BuildMQConnAddr()) + exitOnError(err) + + channel, err = conn.Channel() + exitOnError(err) } func (r RelationServiceImpl) Follow(ctx context.Context, request *relation.RelationActionRequest) (resp *relation.RelationActionResponse, err error) { @@ -213,6 +241,30 @@ func (r RelationServiceImpl) Follow(ctx context.Context, request *relation.Relat StatusCode: strings.ServiceOKCode, StatusMsg: strings.ServiceOK, } + + // Publish event to event_exchange and audit_exchange + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + action := &models.Action{ + Type: strings.FollowIdActionLog, + Name: strings.FollowNameActionLog, + SubName: strings.FollowUpActionSubLog, + ServiceName: strings.FollowServiceName, + ActorId: request.ActorId, + VideoId: 0, + AffectUserId: request.UserId, + AffectAction: 1, + AffectedData: "1", + EventId: uuid.New().String(), + TraceId: trace.SpanContextFromContext(ctx).TraceID().String(), + SpanId: trace.SpanContextFromContext(ctx).SpanID().String(), + } + audit.PublishAuditEvent(ctx, action, channel) + }() + wg.Wait() + return } @@ -351,6 +403,30 @@ func (r RelationServiceImpl) Unfollow(ctx context.Context, request *relation.Rel StatusCode: strings.ServiceOKCode, StatusMsg: strings.ServiceOK, } + + // Publish event to event_exchange and audit_exchange + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + action := &models.Action{ + Type: strings.FollowIdActionLog, + Name: strings.FollowNameActionLog, + SubName: strings.FollowDownActionSubLog, + ServiceName: strings.FollowServiceName, + ActorId: request.ActorId, + VideoId: 0, + AffectUserId: request.UserId, + AffectAction: 1, + AffectedData: "-1", + EventId: uuid.New().String(), + TraceId: trace.SpanContextFromContext(ctx).TraceID().String(), + SpanId: trace.SpanContextFromContext(ctx).SpanID().String(), + } + audit.PublishAuditEvent(ctx, action, channel) + }() + wg.Wait() + return } diff --git a/src/services/relation/main.go b/src/services/relation/main.go index 0951e62..2fbf2d3 100644 --- a/src/services/relation/main.go +++ b/src/services/relation/main.go @@ -5,6 +5,7 @@ import ( "GuGoTik/src/extra/profiling" "GuGoTik/src/extra/tracing" "GuGoTik/src/rpc/relation" + "GuGoTik/src/utils/audit" "GuGoTik/src/utils/consul" "GuGoTik/src/utils/logging" "GuGoTik/src/utils/prom" @@ -12,6 +13,7 @@ import ( grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" "github.com/oklog/run" "github.com/prometheus/client_golang/prometheus/promhttp" + amqp "github.com/rabbitmq/amqp091-go" "github.com/sirupsen/logrus" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" @@ -23,6 +25,9 @@ import ( "syscall" ) +var conn = &amqp.Connection{} +var channel = &amqp.Channel{} + func main() { tp, err := tracing.SetTraceProvider(config.RelationRpcServerName) @@ -74,6 +79,11 @@ func main() { grpc_health_v1.RegisterHealthServer(s, health.NewServer()) srv.New() + + // Initialize the audit_exchange + audit.DeclareAuditExchange(channel) + defer CloseMQConn() + srvMetrics.InitializeMetrics(s) g := &run.Group{} diff --git a/src/utils/audit/publish.go b/src/utils/audit/publish.go new file mode 100644 index 0000000..2e8acd1 --- /dev/null +++ b/src/utils/audit/publish.go @@ -0,0 +1,71 @@ +package audit + +import ( + "GuGoTik/src/constant/strings" + "GuGoTik/src/extra/tracing" + models2 "GuGoTik/src/models" + "GuGoTik/src/utils/logging" + "GuGoTik/src/utils/rabbitmq" + "context" + "encoding/json" + amqp "github.com/rabbitmq/amqp091-go" + "github.com/sirupsen/logrus" +) + +func exitOnError(err error) { + if err != nil { + panic(err) + } +} + +func DeclareAuditExchange(channel *amqp.Channel) { + err := channel.ExchangeDeclare( + strings.AuditExchange, + "direct", + true, + false, + false, + false, + nil, + ) + exitOnError(err) +} + +func PublishAuditEvent(ctx context.Context, action *models2.Action, channel *amqp.Channel) { + ctx, span := tracing.Tracer.Start(ctx, "AuditEventPublisher") + defer span.End() + logging.SetSpanWithHostname(span) + logger := logging.LogService("AuditEventPublisher").WithContext(ctx) + + data, err := json.Marshal(action) + if err != nil { + logger.WithFields(logrus.Fields{ + "err": err, + }).Errorf("Error when marshal the action model") + logging.SetSpanError(span, err) + return + } + + headers := rabbitmq.InjectAMQPHeaders(ctx) + + err = channel.PublishWithContext(ctx, + strings.AuditExchange, + strings.AuditPublishEvent, + false, + false, + amqp.Publishing{ + ContentType: "text/plain", + Body: data, + Headers: headers, + }, + ) + + if err != nil { + logger.WithFields(logrus.Fields{ + "err": err, + }).Errorf("Error when publishing the action model") + logging.SetSpanError(span, err) + return + } + +}