diff --git a/.gitignore b/.gitignore index d529c4f..830f89f 100644 --- a/.gitignore +++ b/.gitignore @@ -13,4 +13,10 @@ # Dependency directories (remove the comment below to include it) # vendor/ - /.idea \ No newline at end of file + /.idea +.idea +.scannerwork + +/.vscode +.nvim +.nvimlog \ No newline at end of file diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..13566b8 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/mercurius.iml b/.idea/mercurius.iml deleted file mode 100644 index 7ee078d..0000000 --- a/.idea/mercurius.iml +++ /dev/null @@ -1,4 +0,0 @@ - - - - \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml deleted file mode 100644 index 35eb1dd..0000000 --- a/.idea/vcs.xml +++ /dev/null @@ -1,6 +0,0 @@ - - - - - - \ No newline at end of file diff --git a/cmd/mercurius-client/publisher-client/mercurius-client.go b/cmd/mercurius-client/publisher-client/mercurius-client.go index 7211a9e..4c750ec 100644 --- a/cmd/mercurius-client/publisher-client/mercurius-client.go +++ b/cmd/mercurius-client/publisher-client/mercurius-client.go @@ -19,7 +19,8 @@ const CLIENT_NAME = "Sample Client" var logger = logger2.NewLogger() var messageCount = atomic.Uint64{} -var N = 100 +var N = 100 * 100 +var start time.Time func main() { c, err := client.NewClient(CLIENT_NAME, ADDR) @@ -33,20 +34,28 @@ func main() { wg.Add(N) for i := 0; i < N; i++ { go func(w *sync.WaitGroup) { - for j := 0; j < 100; j++ { - if err := c.Publish(TopicName, []byte(strconv.FormatUint(messageCount.Load(), 10)), context.Background()); err != nil { + for j := 0; j < 1; j++ { + x := messageCount.Add(1) + if err := c.Publish(TopicName, []byte(strconv.FormatUint(x, 10)), context.Background()); err != nil { logger.Error("Err", zap.Error(err)) } - fmt.Println(strconv.FormatUint(messageCount.Load(), 10)) - - messageCount.Add(1) + if x == 1 { + start = time.Now() + } + fmt.Println(x) + if x == 1000*1000 { + z := time.Since(start) + fmt.Println("Execution time: ", z) + } + fmt.Println(strconv.FormatUint(x, 10)) + //time.Sleep(time.Millisecond) } w.Done() }(&wg) + //time.Sleep(200 * time.Second) } wg.Wait() - time.Sleep(1 * time.Hour) } // package main diff --git a/cmd/mercurius-client/subscriber-client/mercurius-client-sub.go b/cmd/mercurius-client/subscriber-client/mercurius-client-sub.go index b75dcaf..6938ae0 100644 --- a/cmd/mercurius-client/subscriber-client/mercurius-client-sub.go +++ b/cmd/mercurius-client/subscriber-client/mercurius-client-sub.go @@ -18,9 +18,10 @@ const CLIENT_NAME = "Sample Client" const N = 100 * 100 * 100 var messageCount = atomic.Uint64{} -var start time.Time = time.Time{} +var start = time.Time{} var logger = k.NewLogger() -var ctx, cancel = context.WithCancel(context.Background()) +var ctx, _ = context.WithCancel(context.Background()) +var ch = make(chan struct{}) func main() { c, err := client.NewClient(CLIENT_NAME, ADDR) @@ -33,29 +34,22 @@ func main() { logger.Error("Err", zap.Error(err)) } }() - } - - timer := time.NewTimer(900 * time.Second) -ConsumerLoop: - for { - select { - case <-timer.C: - cancel() - break ConsumerLoop - } } + <-ch } func handler(e *proto.Event) error { - messageCount.Add(1) - if messageCount.Load() == 1 { + x := messageCount.Add(1) + if x == 1 { start = time.Now() } - if messageCount.Load() == N { + fmt.Println(x) + if x == N { z := time.Since(start) fmt.Println("Execution time: ", z) + ch <- struct{}{} } return nil } diff --git a/cmd/mercurius/mercurius.go b/cmd/mercurius/mercurius.go index 5f44717..7152126 100644 --- a/cmd/mercurius/mercurius.go +++ b/cmd/mercurius/mercurius.go @@ -4,11 +4,12 @@ import ( "net" "github.com/ispiroglu/mercurius/internal/logger" - "go.uber.org/zap" - sv "github.com/ispiroglu/mercurius/internal/server" "github.com/ispiroglu/mercurius/proto" + "github.com/prometheus/client_golang/prometheus/promhttp" + "go.uber.org/zap" "google.golang.org/grpc" + "net/http" ) const ADDR = "0.0.0.0:9000" @@ -28,11 +29,10 @@ func main() { proto.RegisterMercuriusServer(grpcServer, server) - // go func() { - // time.Sleep(10 * time.Second) - // grpcServer.GracefulStop() - // }() - + go func() { + http.Handle("/metrics", promhttp.Handler()) + _ = http.ListenAndServe(":8081", nil) + }() if err := grpcServer.Serve(list); err != nil { log.Fatal("Failed to serve", zap.Error(err)) } diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..a8ef9b9 --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,37 @@ +version: "3" +services: + +# tempo: +# image: grafana/tempo:latest +# command: [ "-config.file=/etc/tempo.yaml" ] +# volumes: +# - ./prometheus/tempo.yaml:/etc/tempo.yaml +# ports: +# - "3200:3200" +# - "4318:4318" +# loki: +# image: grafana/loki:latest +# ports: +# - "3100:3100" +# command: -config.file=/etc/loki/local-config.yaml + prometheus: + image: prom/prometheus + ports: + - 9090:9090 + command: + - --web.enable-remote-write-receiver + - --enable-feature=exemplar-storage + volumes: + - ./prometheus/prometheus.yml:/prometheus/prometheus.yml + + grafana: + image: grafana/grafana:latest + # volumes: + # - ./prometheus/grafana-datasources.yaml:/etc/grafana/provisioning/datasources/datasources.yaml + environment: + - GF_AUTH_ANONYMOUS_ENABLED=true + - GF_AUTH_ANONYMOUS_ORG_ROLE=Admin + - GF_AUTH_DISABLE_LOGIN_FORM=true + - GF_FEATURE_TOGGLES_ENABLE=traceqlEditor + ports: + - "3000:3000" \ No newline at end of file diff --git a/go.mod b/go.mod index cc9b943..d6c6eef 100644 --- a/go.mod +++ b/go.mod @@ -4,21 +4,30 @@ go 1.20 require ( github.com/google/uuid v1.3.0 + github.com/prometheus/client_golang v1.16.0 github.com/stretchr/testify v1.8.4 go.uber.org/zap v1.24.0 google.golang.org/grpc v1.53.0 - google.golang.org/protobuf v1.29.0 + google.golang.org/protobuf v1.30.0 ) require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect - github.com/golang/protobuf v1.5.2 // indirect + github.com/golang/protobuf v1.5.3 // indirect + github.com/kr/text v0.2.0 // indirect + github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/prometheus/client_model v0.3.0 // indirect + github.com/prometheus/common v0.42.0 // indirect + github.com/prometheus/procfs v0.10.1 // indirect + github.com/rogpeppe/go-internal v1.11.0 // indirect go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.6.0 // indirect - golang.org/x/net v0.5.0 // indirect - golang.org/x/sys v0.4.0 // indirect - golang.org/x/text v0.6.0 // indirect + golang.org/x/net v0.7.0 // indirect + golang.org/x/sys v0.8.0 // indirect + golang.org/x/text v0.7.0 // indirect google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 7b3655f..0d14f17 100644 --- a/go.sum +++ b/go.sum @@ -1,277 +1,40 @@ -cloud.google.com/go v0.105.0 h1:DNtEKRBAAzeS4KyIory52wWHuClNaXJ5x1F7xa4q+5Y= -cloud.google.com/go v0.105.0/go.mod h1:PrLgOJNe5nfE9UMxKxgXj4mD3voiP+YQ6gdt6KMFOKM= -cloud.google.com/go/accessapproval v1.5.0 h1:/nTivgnV/n1CaAeo+ekGexTYUsKEU9jUVkoY5359+3Q= -cloud.google.com/go/accessapproval v1.5.0/go.mod h1:HFy3tuiGvMdcd/u+Cu5b9NkO1pEICJ46IR82PoUdplw= -cloud.google.com/go/accesscontextmanager v1.4.0 h1:CFhNhU7pcD11cuDkQdrE6PQJgv0EXNKNv06jIzbLlCU= -cloud.google.com/go/accesscontextmanager v1.4.0/go.mod h1:/Kjh7BBu/Gh83sv+K60vN9QE5NJcd80sU33vIe2IFPE= -cloud.google.com/go/aiplatform v1.27.0 h1:DBi3Jk9XjCJ4pkkLM4NqKgj3ozUL1wq4l+d3/jTGXAI= -cloud.google.com/go/aiplatform v1.27.0/go.mod h1:Bvxqtl40l0WImSb04d0hXFU7gDOiq9jQmorivIiWcKg= -cloud.google.com/go/analytics v0.12.0 h1:NKw6PpQi6V1O+KsjuTd+bhip9d0REYu4NevC45vtGp8= -cloud.google.com/go/analytics v0.12.0/go.mod h1:gkfj9h6XRf9+TS4bmuhPEShsh3hH8PAZzm/41OOhQd4= -cloud.google.com/go/apigateway v1.4.0 h1:IIoXKR7FKrEAQhMTz5hK2wiDz2WNFHS7eVr/L1lE/rM= -cloud.google.com/go/apigateway v1.4.0/go.mod h1:pHVY9MKGaH9PQ3pJ4YLzoj6U5FUDeDFBllIz7WmzJoc= -cloud.google.com/go/apigeeconnect v1.4.0 h1:AONoTYJviyv1vS4IkvWzq69gEVdvHx35wKXc+e6wjZQ= -cloud.google.com/go/apigeeconnect v1.4.0/go.mod h1:kV4NwOKqjvt2JYR0AoIWo2QGfoRtn/pkS3QlHp0Ni04= -cloud.google.com/go/appengine v1.5.0 h1:lmG+O5oaR9xNwaRBwE2XoMhwQHsHql5IoiGr1ptdDwU= -cloud.google.com/go/appengine v1.5.0/go.mod h1:TfasSozdkFI0zeoxW3PTBLiNqRmzraodCWatWI9Dmak= -cloud.google.com/go/area120 v0.6.0 h1:TCMhwWEWhCn8d44/Zs7UCICTWje9j3HuV6nVGMjdpYw= -cloud.google.com/go/area120 v0.6.0/go.mod h1:39yFJqWVgm0UZqWTOdqkLhjoC7uFfgXRC8g/ZegeAh0= -cloud.google.com/go/artifactregistry v1.9.0 h1:3d0LRAU1K6vfqCahhl9fx2oGHcq+s5gftdix4v8Ibrc= -cloud.google.com/go/artifactregistry v1.9.0/go.mod h1:2K2RqvA2CYvAeARHRkLDhMDJ3OXy26h3XW+3/Jh2uYc= -cloud.google.com/go/asset v1.10.0 h1:aCrlaLGJWTODJX4G56ZYzJefITKEWNfbjjtHSzWpxW0= -cloud.google.com/go/asset v1.10.0/go.mod h1:pLz7uokL80qKhzKr4xXGvBQXnzHn5evJAEAtZiIb0wY= -cloud.google.com/go/assuredworkloads v1.9.0 h1:hhIdCOowsT1GG5eMCIA0OwK6USRuYTou/1ZeNxCSRtA= -cloud.google.com/go/assuredworkloads v1.9.0/go.mod h1:kFuI1P78bplYtT77Tb1hi0FMxM0vVpRC7VVoJC3ZoT0= -cloud.google.com/go/automl v1.8.0 h1:BMioyXSbg7d7xLibn47cs0elW6RT780IUWr42W8rp2Q= -cloud.google.com/go/automl v1.8.0/go.mod h1:xWx7G/aPEe/NP+qzYXktoBSDfjO+vnKMGgsApGJJquM= -cloud.google.com/go/baremetalsolution v0.4.0 h1:g9KO6SkakcYPcc/XjAzeuUrEOXlYPnMpuiaywYaGrmQ= -cloud.google.com/go/baremetalsolution v0.4.0/go.mod h1:BymplhAadOO/eBa7KewQ0Ppg4A4Wplbn+PsFKRLo0uI= -cloud.google.com/go/batch v0.4.0 h1:1jvEBY55OH4Sd2FxEXQfxGExFWov1A/IaRe+Z5Z71Fw= -cloud.google.com/go/batch v0.4.0/go.mod h1:WZkHnP43R/QCGQsZ+0JyG4i79ranE2u8xvjq/9+STPE= -cloud.google.com/go/beyondcorp v0.3.0 h1:w+4kThysgl0JiKshi2MKDCg2NZgOyqOI0wq2eBZyrzA= -cloud.google.com/go/beyondcorp v0.3.0/go.mod h1:E5U5lcrcXMsCuoDNyGrpyTm/hn7ne941Jz2vmksAxW8= -cloud.google.com/go/bigquery v1.44.0 h1:Wi4dITi+cf9VYp4VH2T9O41w0kCW0uQTELq2Z6tukN0= -cloud.google.com/go/bigquery v1.44.0/go.mod h1:0Y33VqXTEsbamHJvJHdFmtqHvMIY28aK1+dFsvaChGc= -cloud.google.com/go/billing v1.7.0 h1:Xkii76HWELHwBtkQVZvqmSo9GTr0O+tIbRNnMcGdlg4= -cloud.google.com/go/billing v1.7.0/go.mod h1:q457N3Hbj9lYwwRbnlD7vUpyjq6u5U1RAOArInEiD5Y= -cloud.google.com/go/binaryauthorization v1.4.0 h1:pL70vXWn9TitQYXBWTK2abHl2JHLwkFRjYw6VflRqEA= -cloud.google.com/go/binaryauthorization v1.4.0/go.mod h1:tsSPQrBd77VLplV70GUhBf/Zm3FsKmgSqgm4UmiDItk= -cloud.google.com/go/certificatemanager v1.4.0 h1:tzbR4UHBbgsewMWUD93JHi8EBi/gHBoSAcY1/sThFGk= -cloud.google.com/go/certificatemanager v1.4.0/go.mod h1:vowpercVFyqs8ABSmrdV+GiFf2H/ch3KyudYQEMM590= -cloud.google.com/go/channel v1.9.0 h1:pNuUlZx0Jb0Ts9P312bmNMuH5IiFWIR4RUtLb70Ke5s= -cloud.google.com/go/channel v1.9.0/go.mod h1:jcu05W0my9Vx4mt3/rEHpfxc9eKi9XwsdDL8yBMbKUk= -cloud.google.com/go/cloudbuild v1.4.0 h1:TAAmCmAlOJ4uNBu6zwAjwhyl/7fLHHxIEazVhr3QBbQ= -cloud.google.com/go/cloudbuild v1.4.0/go.mod h1:5Qwa40LHiOXmz3386FrjrYM93rM/hdRr7b53sySrTqA= -cloud.google.com/go/clouddms v1.4.0 h1:UhzHIlgFfMr6luVYVNydw/pl9/U5kgtjCMJHnSvoVws= -cloud.google.com/go/clouddms v1.4.0/go.mod h1:Eh7sUGCC+aKry14O1NRljhjyrr0NFC0G2cjwX0cByRk= -cloud.google.com/go/cloudtasks v1.8.0 h1:faUiUgXjW8yVZ7XMnKHKm1WE4OldPBUWWfIRN/3z1dc= -cloud.google.com/go/cloudtasks v1.8.0/go.mod h1:gQXUIwCSOI4yPVK7DgTVFiiP0ZW/eQkydWzwVMdHxrI= -cloud.google.com/go/compute v1.15.1 h1:7UGq3QknM33pw5xATlpzeoomNxsacIVvTqTTvbfajmE= -cloud.google.com/go/compute v1.15.1/go.mod h1:bjjoF/NtFUrkD/urWfdHaKuOPDR5nWIs63rR+SXhcpA= -cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY= -cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA= -cloud.google.com/go/contactcenterinsights v1.4.0 h1:tTQLI/ZvguUf9Hv+36BkG2+/PeC8Ol1q4pBW+tgCx0A= -cloud.google.com/go/contactcenterinsights v1.4.0/go.mod h1:L2YzkGbPsv+vMQMCADxJoT9YiTTnSEd6fEvCeHTYVck= -cloud.google.com/go/container v1.7.0 h1:nbEK/59GyDRKKlo1SqpohY1TK8LmJ2XNcvS9Gyom2A0= -cloud.google.com/go/container v1.7.0/go.mod h1:Dp5AHtmothHGX3DwwIHPgq45Y8KmNsgN3amoYfxVkLo= -cloud.google.com/go/containeranalysis v0.6.0 h1:2824iym832ljKdVpCBnpqm5K94YT/uHTVhNF+dRTXPI= -cloud.google.com/go/containeranalysis v0.6.0/go.mod h1:HEJoiEIu+lEXM+k7+qLCci0h33lX3ZqoYFdmPcoO7s4= -cloud.google.com/go/datacatalog v1.8.0 h1:6kZ4RIOW/uT7QWC5SfPfq/G8sYzr/v+UOmOAxy4Z1TE= -cloud.google.com/go/datacatalog v1.8.0/go.mod h1:KYuoVOv9BM8EYz/4eMFxrr4DUKhGIOXxZoKYF5wdISM= -cloud.google.com/go/dataflow v0.7.0 h1:CW3541Fm7KPTyZjJdnX6NtaGXYFn5XbFC5UcjgALKvU= -cloud.google.com/go/dataflow v0.7.0/go.mod h1:PX526vb4ijFMesO1o202EaUmouZKBpjHsTlCtB4parQ= -cloud.google.com/go/dataform v0.5.0 h1:vLwowLF2ZB5J5gqiZCzv076lDI/Rd7zYQQFu5XO1PSg= -cloud.google.com/go/dataform v0.5.0/go.mod h1:GFUYRe8IBa2hcomWplodVmUx/iTL0FrsauObOM3Ipr0= -cloud.google.com/go/datafusion v1.5.0 h1:j5m2hjWovTZDTQak4MJeXAR9yN7O+zMfULnjGw/OOLg= -cloud.google.com/go/datafusion v1.5.0/go.mod h1:Kz+l1FGHB0J+4XF2fud96WMmRiq/wj8N9u007vyXZ2w= -cloud.google.com/go/datalabeling v0.6.0 h1:dp8jOF21n/7jwgo/uuA0RN8hvLcKO4q6s/yvwevs2ZM= -cloud.google.com/go/datalabeling v0.6.0/go.mod h1:WqdISuk/+WIGeMkpw/1q7bK/tFEZxsrFJOJdY2bXvTQ= -cloud.google.com/go/dataplex v1.4.0 h1:cNxeA2DiWliQGi21kPRqnVeQ5xFhNoEjPRt1400Pm8Y= -cloud.google.com/go/dataplex v1.4.0/go.mod h1:X51GfLXEMVJ6UN47ESVqvlsRplbLhcsAt0kZCCKsU0A= -cloud.google.com/go/dataproc v1.8.0 h1:gVOqNmElfa6n/ccG/QDlfurMWwrK3ezvy2b2eDoCmS0= -cloud.google.com/go/dataproc v1.8.0/go.mod h1:5OW+zNAH0pMpw14JVrPONsxMQYMBqJuzORhIBfBn9uI= -cloud.google.com/go/dataqna v0.6.0 h1:gx9jr41ytcA3dXkbbd409euEaWtofCVXYBvJz3iYm18= -cloud.google.com/go/dataqna v0.6.0/go.mod h1:1lqNpM7rqNLVgWBJyk5NF6Uen2PHym0jtVJonplVsDA= -cloud.google.com/go/datastore v1.10.0 h1:4siQRf4zTiAVt/oeH4GureGkApgb2vtPQAtOmhpqQwE= -cloud.google.com/go/datastore v1.10.0/go.mod h1:PC5UzAmDEkAmkfaknstTYbNpgE49HAgW2J1gcgUfmdM= -cloud.google.com/go/datastream v1.5.0 h1:PgIgbhedBtYBU6POGXFMn2uSl9vpqubc3ewTNdcU8Mk= -cloud.google.com/go/datastream v1.5.0/go.mod h1:6TZMMNPwjUqZHBKPQ1wwXpb0d5VDVPl2/XoS5yi88q4= -cloud.google.com/go/deploy v1.5.0 h1:kI6dxt8Ml0is/x7YZjLveTvR7YPzXAUD/8wQZ2nH5zA= -cloud.google.com/go/deploy v1.5.0/go.mod h1:ffgdD0B89tToyW/U/D2eL0jN2+IEV/3EMuXHA0l4r+s= -cloud.google.com/go/dialogflow v1.19.0 h1:HYHVOkoxQ9bSfNIelSZYNAtUi4CeSrCnROyOsbOqPq8= -cloud.google.com/go/dialogflow v1.19.0/go.mod h1:JVmlG1TwykZDtxtTXujec4tQ+D8SBFMoosgy+6Gn0s0= -cloud.google.com/go/dlp v1.7.0 h1:9I4BYeJSVKoSKgjr70fLdRDumqcUeVmHV4fd5f9LR6Y= -cloud.google.com/go/dlp v1.7.0/go.mod h1:68ak9vCiMBjbasxeVD17hVPxDEck+ExiHavX8kiHG+Q= -cloud.google.com/go/documentai v1.10.0 h1:jfq09Fdjtnpnmt/MLyf6A3DM3ynb8B2na0K+vSXvpFM= -cloud.google.com/go/documentai v1.10.0/go.mod h1:vod47hKQIPeCfN2QS/jULIvQTugbmdc0ZvxxfQY1bg4= -cloud.google.com/go/domains v0.7.0 h1:pu3JIgC1rswIqi5romW0JgNO6CTUydLYX8zyjiAvO1c= -cloud.google.com/go/domains v0.7.0/go.mod h1:PtZeqS1xjnXuRPKE/88Iru/LdfoRyEHYA9nFQf4UKpg= -cloud.google.com/go/edgecontainer v0.2.0 h1:hd6J2n5dBBRuAqnNUEsKWrp6XNPKsaxwwIyzOPZTokk= -cloud.google.com/go/edgecontainer v0.2.0/go.mod h1:RTmLijy+lGpQ7BXuTDa4C4ssxyXT34NIuHIgKuP4s5w= -cloud.google.com/go/errorreporting v0.3.0 h1:kj1XEWMu8P0qlLhm3FwcaFsUvXChV/OraZwA70trRR0= -cloud.google.com/go/errorreporting v0.3.0/go.mod h1:xsP2yaAp+OAW4OIm60An2bbLpqIhKXdWR/tawvl7QzU= -cloud.google.com/go/essentialcontacts v1.4.0 h1:b6csrQXCHKQmfo9h3dG/pHyoEh+fQG1Yg78a53LAviY= -cloud.google.com/go/essentialcontacts v1.4.0/go.mod h1:8tRldvHYsmnBCHdFpvU+GL75oWiBKl80BiqlFh9tp+8= -cloud.google.com/go/eventarc v1.8.0 h1:AgCqrmMMIcel5WWKkzz5EkCUKC3Rl5LNMMYsS+LvsI0= -cloud.google.com/go/eventarc v1.8.0/go.mod h1:imbzxkyAU4ubfsaKYdQg04WS1NvncblHEup4kvF+4gw= -cloud.google.com/go/filestore v1.4.0 h1:yjKOpzvqtDmL5AXbKttLc8j0hL20kuC1qPdy5HPcxp0= -cloud.google.com/go/filestore v1.4.0/go.mod h1:PaG5oDfo9r224f8OYXURtAsY+Fbyq/bLYoINEK8XQAI= -cloud.google.com/go/firestore v1.9.0 h1:IBlRyxgGySXu5VuW0RgGFlTtLukSnNkpDiEOMkQkmpA= -cloud.google.com/go/firestore v1.9.0/go.mod h1:HMkjKHNTtRyZNiMzu7YAsLr9K3X2udY2AMwDaMEQiiE= -cloud.google.com/go/functions v1.9.0 h1:35tgv1fQOtvKqH/uxJMzX3w6usneJ0zXpsFr9KAVhNE= -cloud.google.com/go/functions v1.9.0/go.mod h1:Y+Dz8yGguzO3PpIjhLTbnqV1CWmgQ5UwtlpzoyquQ08= -cloud.google.com/go/gaming v1.8.0 h1:97OAEQtDazAJD7yh/kvQdSCQuTKdR0O+qWAJBZJ4xiA= -cloud.google.com/go/gaming v1.8.0/go.mod h1:xAqjS8b7jAVW0KFYeRUxngo9My3f33kFmua++Pi+ggM= -cloud.google.com/go/gkebackup v0.3.0 h1:4K+jiv4ocqt1niN8q5Imd8imRoXBHTrdnJVt/uFFxF4= -cloud.google.com/go/gkebackup v0.3.0/go.mod h1:n/E671i1aOQvUxT541aTkCwExO/bTer2HDlj4TsBRAo= -cloud.google.com/go/gkeconnect v0.6.0 h1:zAcvDa04tTnGdu6TEZewaLN2tdMtUOJJ7fEceULjguA= -cloud.google.com/go/gkeconnect v0.6.0/go.mod h1:Mln67KyU/sHJEBY8kFZ0xTeyPtzbq9StAVvEULYK16A= -cloud.google.com/go/gkehub v0.10.0 h1:JTcTaYQRGsVm+qkah7WzHb6e9sf1C0laYdRPn9aN+vg= -cloud.google.com/go/gkehub v0.10.0/go.mod h1:UIPwxI0DsrpsVoWpLB0stwKCP+WFVG9+y977wO+hBH0= -cloud.google.com/go/gkemulticloud v0.4.0 h1:8F1NhJj8ucNj7lK51UZMtAjSWTgP1zO18XF6vkfiPPU= -cloud.google.com/go/gkemulticloud v0.4.0/go.mod h1:E9gxVBnseLWCk24ch+P9+B2CoDFJZTyIgLKSalC7tuI= -cloud.google.com/go/gsuiteaddons v1.4.0 h1:TGT2oGmO5q3VH6SjcrlgPUWI0njhYv4kywLm6jag0to= -cloud.google.com/go/gsuiteaddons v1.4.0/go.mod h1:rZK5I8hht7u7HxFQcFei0+AtfS9uSushomRlg+3ua1o= -cloud.google.com/go/iam v0.8.0 h1:E2osAkZzxI/+8pZcxVLcDtAQx/u+hZXVryUaYQ5O0Kk= -cloud.google.com/go/iam v0.8.0/go.mod h1:lga0/y3iH6CX7sYqypWJ33hf7kkfXJag67naqGESjkE= -cloud.google.com/go/iap v1.5.0 h1:BGEXovwejOCt1zDk8hXq0bOhhRu9haXKWXXXp2B4wBM= -cloud.google.com/go/iap v1.5.0/go.mod h1:UH/CGgKd4KyohZL5Pt0jSKE4m3FR51qg6FKQ/z/Ix9A= -cloud.google.com/go/ids v1.2.0 h1:LncHK4HHucb5Du310X8XH9/ICtMwZ2PCfK0ScjWiJoY= -cloud.google.com/go/ids v1.2.0/go.mod h1:5WXvp4n25S0rA/mQWAg1YEEBBq6/s+7ml1RDCW1IrcY= -cloud.google.com/go/iot v1.4.0 h1:Y9+oZT9jD4GUZzORXTU45XsnQrhxmDT+TFbPil6pRVQ= -cloud.google.com/go/iot v1.4.0/go.mod h1:dIDxPOn0UvNDUMD8Ger7FIaTuvMkj+aGk94RPP0iV+g= -cloud.google.com/go/kms v1.6.0 h1:OWRZzrPmOZUzurjI2FBGtgY2mB1WaJkqhw6oIwSj0Yg= -cloud.google.com/go/kms v1.6.0/go.mod h1:Jjy850yySiasBUDi6KFUwUv2n1+o7QZFyuUJg6OgjA0= -cloud.google.com/go/language v1.8.0 h1:3Wa+IUMamL4JH3Zd3cDZUHpwyqplTACt6UZKRD2eCL4= -cloud.google.com/go/language v1.8.0/go.mod h1:qYPVHf7SPoNNiCL2Dr0FfEFNil1qi3pQEyygwpgVKB8= -cloud.google.com/go/lifesciences v0.6.0 h1:tIqhivE2LMVYkX0BLgG7xL64oNpDaFFI7teunglt1tI= -cloud.google.com/go/lifesciences v0.6.0/go.mod h1:ddj6tSX/7BOnhxCSd3ZcETvtNr8NZ6t/iPhY2Tyfu08= -cloud.google.com/go/logging v1.6.1 h1:ZBsZK+JG+oCDT+vaxwqF2egKNRjz8soXiS6Xv79benI= -cloud.google.com/go/logging v1.6.1/go.mod h1:5ZO0mHHbvm8gEmeEUHrmDlTDSu5imF6MUP9OfilNXBw= -cloud.google.com/go/longrunning v0.3.0 h1:NjljC+FYPV3uh5/OwWT6pVU+doBqMg2x/rZlE+CamDs= -cloud.google.com/go/longrunning v0.3.0/go.mod h1:qth9Y41RRSUE69rDcOn6DdK3HfQfsUI0YSmW3iIlLJc= -cloud.google.com/go/managedidentities v1.4.0 h1:3Kdajn6X25yWQFhFCErmKSYTSvkEd3chJROny//F1A0= -cloud.google.com/go/managedidentities v1.4.0/go.mod h1:NWSBYbEMgqmbZsLIyKvxrYbtqOsxY1ZrGM+9RgDqInM= -cloud.google.com/go/maps v0.1.0 h1:kLReRbclTgJefw2fcCbdLPLhPj0U6UUWN10ldG8sdOU= -cloud.google.com/go/maps v0.1.0/go.mod h1:BQM97WGyfw9FWEmQMpZ5T6cpovXXSd1cGmFma94eubI= -cloud.google.com/go/mediatranslation v0.6.0 h1:qAJzpxmEX+SeND10Y/4868L5wfZpo4Y3BIEnIieP4dk= -cloud.google.com/go/mediatranslation v0.6.0/go.mod h1:hHdBCTYNigsBxshbznuIMFNe5QXEowAuNmmC7h8pu5w= -cloud.google.com/go/memcache v1.7.0 h1:yLxUzJkZVSH2kPaHut7k+7sbIBFpvSh1LW9qjM2JDjA= -cloud.google.com/go/memcache v1.7.0/go.mod h1:ywMKfjWhNtkQTxrWxCkCFkoPjLHPW6A7WOTVI8xy3LY= -cloud.google.com/go/metastore v1.8.0 h1:3KcShzqWdqxrDEXIBWpYJpOOrgpDj+HlBi07Grot49Y= -cloud.google.com/go/metastore v1.8.0/go.mod h1:zHiMc4ZUpBiM7twCIFQmJ9JMEkDSyZS9U12uf7wHqSI= -cloud.google.com/go/monitoring v1.8.0 h1:c9riaGSPQ4dUKWB+M1Fl0N+iLxstMbCktdEwYSPGDvA= -cloud.google.com/go/monitoring v1.8.0/go.mod h1:E7PtoMJ1kQXWxPjB6mv2fhC5/15jInuulFdYYtlcvT4= -cloud.google.com/go/networkconnectivity v1.7.0 h1:BVdIKaI68bihnXGdCVL89Jsg9kq2kg+II30fjVqo62E= -cloud.google.com/go/networkconnectivity v1.7.0/go.mod h1:RMuSbkdbPwNMQjB5HBWD5MpTBnNm39iAVpC3TmsExt8= -cloud.google.com/go/networkmanagement v1.5.0 h1:mDHA3CDW00imTvC5RW6aMGsD1bH+FtKwZm/52BxaiMg= -cloud.google.com/go/networkmanagement v1.5.0/go.mod h1:ZnOeZ/evzUdUsnvRt792H0uYEnHQEMaz+REhhzJRcf4= -cloud.google.com/go/networksecurity v0.6.0 h1:qDEX/3sipg9dS5JYsAY+YvgTjPR63cozzAWop8oZS94= -cloud.google.com/go/networksecurity v0.6.0/go.mod h1:Q5fjhTr9WMI5mbpRYEbiexTzROf7ZbDzvzCrNl14nyU= -cloud.google.com/go/notebooks v1.5.0 h1:AC8RPjNvel3ExgXjO1YOAz+teg9+j+89TNxa7pIZfww= -cloud.google.com/go/notebooks v1.5.0/go.mod h1:q8mwhnP9aR8Hpfnrc5iN5IBhrXUy8S2vuYs+kBJ/gu0= -cloud.google.com/go/optimization v1.2.0 h1:7PxOq9VTT7TMib/6dMoWpMvWS2E4dJEvtYzjvBreaec= -cloud.google.com/go/optimization v1.2.0/go.mod h1:Lr7SOHdRDENsh+WXVmQhQTrzdu9ybg0NecjHidBq6xs= -cloud.google.com/go/orchestration v1.4.0 h1:39d6tqvNjd/wsSub1Bn4cEmrYcet5Ur6xpaN+SxOxtY= -cloud.google.com/go/orchestration v1.4.0/go.mod h1:6W5NLFWs2TlniBphAViZEVhrXRSMgUGDfW7vrWKvsBk= -cloud.google.com/go/orgpolicy v1.5.0 h1:erF5PHqDZb6FeFrUHiYj2JK2BMhsk8CyAg4V4amJ3rE= -cloud.google.com/go/orgpolicy v1.5.0/go.mod h1:hZEc5q3wzwXJaKrsx5+Ewg0u1LxJ51nNFlext7Tanwc= -cloud.google.com/go/osconfig v1.10.0 h1:NO0RouqCOM7M2S85Eal6urMSSipWwHU8evzwS+siqUI= -cloud.google.com/go/osconfig v1.10.0/go.mod h1:uMhCzqC5I8zfD9zDEAfvgVhDS8oIjySWh+l4WK6GnWw= -cloud.google.com/go/oslogin v1.7.0 h1:pKGDPfeZHDybtw48WsnVLjoIPMi9Kw62kUE5TXCLCN4= -cloud.google.com/go/oslogin v1.7.0/go.mod h1:e04SN0xO1UNJ1M5GP0vzVBFicIe4O53FOfcixIqTyXo= -cloud.google.com/go/phishingprotection v0.6.0 h1:OrwHLSRSZyaiOt3tnY33dsKSedxbMzsXvqB21okItNQ= -cloud.google.com/go/phishingprotection v0.6.0/go.mod h1:9Y3LBLgy0kDTcYET8ZH3bq/7qni15yVUoAxiFxnlSUA= -cloud.google.com/go/policytroubleshooter v1.4.0 h1:NQklJuOUoz1BPP+Epjw81COx7IISWslkZubz/1i0UN8= -cloud.google.com/go/policytroubleshooter v1.4.0/go.mod h1:DZT4BcRw3QoO8ota9xw/LKtPa8lKeCByYeKTIf/vxdE= -cloud.google.com/go/privatecatalog v0.6.0 h1:Vz86uiHCtNGm1DeC32HeG2VXmOq5JRYA3VRPf8ZEcSg= -cloud.google.com/go/privatecatalog v0.6.0/go.mod h1:i/fbkZR0hLN29eEWiiwue8Pb+GforiEIBnV9yrRUOKI= -cloud.google.com/go/pubsub v1.27.1 h1:q+J/Nfr6Qx4RQeu3rJcnN48SNC0qzlYzSeqkPq93VHs= -cloud.google.com/go/pubsub v1.27.1/go.mod h1:hQN39ymbV9geqBnfQq6Xf63yNhUAhv9CZhzp5O6qsW0= -cloud.google.com/go/pubsublite v1.5.0 h1:iqrD8vp3giTb7hI1q4TQQGj77cj8zzgmMPsTZtLnprM= -cloud.google.com/go/pubsublite v1.5.0/go.mod h1:xapqNQ1CuLfGi23Yda/9l4bBCKz/wC3KIJ5gKcxveZg= -cloud.google.com/go/recaptchaenterprise/v2 v2.5.0 h1:UqzFfb/WvhwXGDF1eQtdHLrmni+iByZXY4h3w9Kdyv8= -cloud.google.com/go/recaptchaenterprise/v2 v2.5.0/go.mod h1:O8LzcHXN3rz0j+LBC91jrwI3R+1ZSZEWrfL7XHgNo9U= -cloud.google.com/go/recommendationengine v0.6.0 h1:6w+WxPf2LmUEqX0YyvfCoYb8aBYOcbIV25Vg6R0FLGw= -cloud.google.com/go/recommendationengine v0.6.0/go.mod h1:08mq2umu9oIqc7tDy8sx+MNJdLG0fUi3vaSVbztHgJ4= -cloud.google.com/go/recommender v1.8.0 h1:9kMZQGeYfcOD/RtZfcNKGKtoex3DdoB4zRgYU/WaIwE= -cloud.google.com/go/recommender v1.8.0/go.mod h1:PkjXrTT05BFKwxaUxQmtIlrtj0kph108r02ZZQ5FE70= -cloud.google.com/go/redis v1.10.0 h1:/zTwwBKIAD2DEWTrXZp8WD9yD/gntReF/HkPssVYd0U= -cloud.google.com/go/redis v1.10.0/go.mod h1:ThJf3mMBQtW18JzGgh41/Wld6vnDDc/F/F35UolRZPM= -cloud.google.com/go/resourcemanager v1.4.0 h1:NDao6CHMwEZIaNsdWy+tuvHaavNeGP06o1tgrR0kLvU= -cloud.google.com/go/resourcemanager v1.4.0/go.mod h1:MwxuzkumyTX7/a3n37gmsT3py7LIXwrShilPh3P1tR0= -cloud.google.com/go/resourcesettings v1.4.0 h1:eTzOwB13WrfF0kuzG2ZXCfB3TLunSHBur4s+HFU6uSM= -cloud.google.com/go/resourcesettings v1.4.0/go.mod h1:ldiH9IJpcrlC3VSuCGvjR5of/ezRrOxFtpJoJo5SmXg= -cloud.google.com/go/retail v1.11.0 h1:N9fa//ecFUOEPsW/6mJHfcapPV0wBSwIUwpVZB7MQ3o= -cloud.google.com/go/retail v1.11.0/go.mod h1:MBLk1NaWPmh6iVFSz9MeKG/Psyd7TAgm6y/9L2B4x9Y= -cloud.google.com/go/run v0.3.0 h1:AWPuzU7Xtaj3Jf+QarDWIs6AJ5hM1VFQ+F6Q+VZ6OT4= -cloud.google.com/go/run v0.3.0/go.mod h1:TuyY1+taHxTjrD0ZFk2iAR+xyOXEA0ztb7U3UNA0zBo= -cloud.google.com/go/scheduler v1.7.0 h1:K/mxOewgHGeKuATUJNGylT75Mhtjmx1TOkKukATqMT8= -cloud.google.com/go/scheduler v1.7.0/go.mod h1:jyCiBqWW956uBjjPMMuX09n3x37mtyPJegEWKxRsn44= -cloud.google.com/go/secretmanager v1.9.0 h1:xE6uXljAC1kCR8iadt9+/blg1fvSbmenlsDN4fT9gqw= -cloud.google.com/go/secretmanager v1.9.0/go.mod h1:b71qH2l1yHmWQHt9LC80akm86mX8AL6X1MA01dW8ht4= -cloud.google.com/go/security v1.10.0 h1:KSKzzJMyUoMRQzcz7azIgqAUqxo7rmQ5rYvimMhikqg= -cloud.google.com/go/security v1.10.0/go.mod h1:QtOMZByJVlibUT2h9afNDWRZ1G96gVywH8T5GUSb9IA= -cloud.google.com/go/securitycenter v1.16.0 h1:QTVtk/Reqnx2bVIZtJKm1+mpfmwRwymmNvlaFez7fQY= -cloud.google.com/go/securitycenter v1.16.0/go.mod h1:Q9GMaLQFUD+5ZTabrbujNWLtSLZIZF7SAR0wWECrjdk= -cloud.google.com/go/servicecontrol v1.5.0 h1:ImIzbOu6y4jL6ob65I++QzvqgFaoAKgHOG+RU9/c4y8= -cloud.google.com/go/servicecontrol v1.5.0/go.mod h1:qM0CnXHhyqKVuiZnGKrIurvVImCs8gmqWsDoqe9sU1s= -cloud.google.com/go/servicedirectory v1.7.0 h1:f7M8IMcVzO3T425AqlZbP3yLzeipsBHtRza8vVFYMhQ= -cloud.google.com/go/servicedirectory v1.7.0/go.mod h1:5p/U5oyvgYGYejufvxhgwjL8UVXjkuw7q5XcG10wx1U= -cloud.google.com/go/servicemanagement v1.5.0 h1:TpkCO5M7dhKSy1bKUD9o/sSEW/U1Gtx7opA1fsiMx0c= -cloud.google.com/go/servicemanagement v1.5.0/go.mod h1:XGaCRe57kfqu4+lRxaFEAuqmjzF0r+gWHjWqKqBvKFo= -cloud.google.com/go/serviceusage v1.4.0 h1:b0EwJxPJLpavSljMQh0RcdHsUrr5DQ+Nelt/3BAs5ro= -cloud.google.com/go/serviceusage v1.4.0/go.mod h1:SB4yxXSaYVuUBYUml6qklyONXNLt83U0Rb+CXyhjEeU= -cloud.google.com/go/shell v1.4.0 h1:b1LFhFBgKsG252inyhtmsUUZwchqSz3WTvAIf3JFo4g= -cloud.google.com/go/shell v1.4.0/go.mod h1:HDxPzZf3GkDdhExzD/gs8Grqk+dmYcEjGShZgYa9URw= -cloud.google.com/go/spanner v1.41.0 h1:NvdTpRwf7DTegbfFdPjAWyD7bOVu0VeMqcvR9aCQCAc= -cloud.google.com/go/spanner v1.41.0/go.mod h1:MLYDBJR/dY4Wt7ZaMIQ7rXOTLjYrmxLE/5ve9vFfWos= -cloud.google.com/go/speech v1.9.0 h1:yK0ocnFH4Wsf0cMdUyndJQ/hPv02oTJOxzi6AgpBy4s= -cloud.google.com/go/speech v1.9.0/go.mod h1:xQ0jTcmnRFFM2RfX/U+rk6FQNUF6DQlydUSyoooSpco= -cloud.google.com/go/storagetransfer v1.6.0 h1:fUe3OydbbvHcAYp07xY+2UpH4AermGbmnm7qdEj3tGE= -cloud.google.com/go/storagetransfer v1.6.0/go.mod h1:y77xm4CQV/ZhFZH75PLEXY0ROiS7Gh6pSKrM8dJyg6I= -cloud.google.com/go/talent v1.4.0 h1:MrekAGxLqAeAol4Sc0allOVqUGO8j+Iim8NMvpiD7tM= -cloud.google.com/go/talent v1.4.0/go.mod h1:ezFtAgVuRf8jRsvyE6EwmbTK5LKciD4KVnHuDEFmOOA= -cloud.google.com/go/texttospeech v1.5.0 h1:ccPiHgTewxgyAeCWgQWvZvrLmbfQSFABTMAfrSPLPyY= -cloud.google.com/go/texttospeech v1.5.0/go.mod h1:oKPLhR4n4ZdQqWKURdwxMy0uiTS1xU161C8W57Wkea4= -cloud.google.com/go/tpu v1.4.0 h1:ztIdKoma1Xob2qm6QwNh4Xi9/e7N3IfvtwG5AcNsj1g= -cloud.google.com/go/tpu v1.4.0/go.mod h1:mjZaX8p0VBgllCzF6wcU2ovUXN9TONFLd7iz227X2Xg= -cloud.google.com/go/trace v1.4.0 h1:qO9eLn2esajC9sxpqp1YKX37nXC3L4BfGnPS0Cx9dYo= -cloud.google.com/go/trace v1.4.0/go.mod h1:UG0v8UBqzusp+z63o7FK74SdFE+AXpCLdFb1rshXG+Y= -cloud.google.com/go/translate v1.4.0 h1:AOYOH3MspzJ/bH1YXzB+xTE8fMpn3mwhLjugwGXvMPI= -cloud.google.com/go/translate v1.4.0/go.mod h1:06Dn/ppvLD6WvA5Rhdp029IX2Mi3Mn7fpMRLPvXT5Wg= -cloud.google.com/go/video v1.9.0 h1:ttlvO4J5c1VGq6FkHqWPD/aH6PfdxujHt+muTJlW1Zk= -cloud.google.com/go/video v1.9.0/go.mod h1:0RhNKFRF5v92f8dQt0yhaHrEuH95m068JYOvLZYnJSw= -cloud.google.com/go/videointelligence v1.9.0 h1:RPFgVVXbI2b5vnrciZjtsUgpNKVtHO/WIyXUhEfuMhA= -cloud.google.com/go/videointelligence v1.9.0/go.mod h1:29lVRMPDYHikk3v8EdPSaL8Ku+eMzDljjuvRs105XoU= -cloud.google.com/go/vision/v2 v2.5.0 h1:TQHxRqvLMi19azwm3qYuDbEzZWmiKJNTpGbkNsfRCik= -cloud.google.com/go/vision/v2 v2.5.0/go.mod h1:MmaezXOOE+IWa+cS7OhRRLK2cNv1ZL98zhqFFZaaH2E= -cloud.google.com/go/vmmigration v1.3.0 h1:A2Tl2ZmwMRpvEmhV2ibISY85fmQR+Y5w9a0PlRz5P3s= -cloud.google.com/go/vmmigration v1.3.0/go.mod h1:oGJ6ZgGPQOFdjHuocGcLqX4lc98YQ7Ygq8YQwHh9A7g= -cloud.google.com/go/vmwareengine v0.1.0 h1:JMPZaOT/gIUxVlTqSl/QQ32Y2k+r0stNeM1NSqhVP9o= -cloud.google.com/go/vmwareengine v0.1.0/go.mod h1:RsdNEf/8UDvKllXhMz5J40XxDrNJNN4sagiox+OI208= -cloud.google.com/go/vpcaccess v1.5.0 h1:woHXXtnW8b9gLFdWO9HLPalAddBQ9V4LT+1vjKwR3W8= -cloud.google.com/go/vpcaccess v1.5.0/go.mod h1:drmg4HLk9NkZpGfCmZ3Tz0Bwnm2+DKqViEpeEpOq0m8= -cloud.google.com/go/webrisk v1.7.0 h1:ypSnpGlJnZSXbN9a13PDmAYvVekBLnGKxQ3Q9SMwnYY= -cloud.google.com/go/webrisk v1.7.0/go.mod h1:mVMHgEYH0r337nmt1JyLthzMr6YxwN1aAIEc2fTcq7A= -cloud.google.com/go/websecurityscanner v1.4.0 h1:y7yIFg/h/mO+5Y5aCOtVAnpGUOgqCH5rXQ2Oc8Oq2+g= -cloud.google.com/go/websecurityscanner v1.4.0/go.mod h1:ebit/Fp0a+FWu5j4JOmJEV8S8CzdTkAS77oDsiSqYWQ= -cloud.google.com/go/workflows v1.9.0 h1:7Chpin9p50NTU8Tb7qk+I11U/IwVXmDhEoSsdccvInE= -cloud.google.com/go/workflows v1.9.0/go.mod h1:ZGkj1aFIOd9c8Gerkjjq7OW7I5+l6cSvT3ujaO/WwSA= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= -github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= -github.com/census-instrumentation/opencensus-proto v0.4.1 h1:iKLQ0xPNFxR/2hzXZMrBo8f1j86j5WHzznCCQxV/b8g= -github.com/census-instrumentation/opencensus-proto v0.4.1/go.mod h1:4T9NM4+4Vw91VeyqjLS6ao50K5bOcLKN6Q42XnYaRYw= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/cncf/udpa/go v0.0.0-20220112060539-c52dc94e7fbe h1:QQ3GSy+MqSHxm/d8nCtnAiZdYFd45cYZPs8vOOIYKfk= -github.com/cncf/udpa/go v0.0.0-20220112060539-c52dc94e7fbe/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI= -github.com/cncf/xds/go v0.0.0-20230105202645-06c439db220b h1:ACGZRIr7HsgBKHsueQ1yM4WaVaXh21ynwqsF8M8tXhA= -github.com/cncf/xds/go v0.0.0-20230105202645-06c439db220b/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/envoyproxy/go-control-plane v0.10.3 h1:xdCVXxEe0Y3FQith+0cj2irwZudqGYvecuLB1HtdexY= -github.com/envoyproxy/go-control-plane v0.10.3/go.mod h1:fJJn/j26vwOu972OllsvAgJJM//w9BV6Fxbg2LuVd34= -github.com/envoyproxy/protoc-gen-validate v0.9.1 h1:PS7VIOgmSVhWUEeZwTe7z7zouA22Cr590PzXKbZHOVY= -github.com/envoyproxy/protoc-gen-validate v0.9.1/go.mod h1:OKNgG7TCp5pF4d6XftA0++PMirau2/yoOwVac3AbF2w= -github.com/golang/glog v1.0.0 h1:nfP3RFugxnNRyKgeWd4oI1nYvXpxrx8ck8ZrcizshdQ= -github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= -github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= -github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= -github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= +github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= -github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.16.0 h1:yk/hx9hDbrGHovbci4BY+pRMfSuuat626eFsHb7tmT8= +github.com/prometheus/client_golang v1.16.0/go.mod h1:Zsulrv/L9oM40tJ7T815tM89lFEugiJ9HzIqaAx4LKc= +github.com/prometheus/client_model v0.3.0 h1:UBgGFHqYdG/TPFD1B1ogZywDqEkwp3fBMvqdiQ7Xew4= +github.com/prometheus/client_model v0.3.0/go.mod h1:LDGWKZIo7rky3hgvBe+caln+Dr3dPggB5dvjtD7w9+w= +github.com/prometheus/common v0.42.0 h1:EKsfXEYo4JpWMHH5cg+KOUWeuJSov1Id8zGR8eeI1YM= +github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc= +github.com/prometheus/procfs v0.10.1 h1:kYK1Va/YMlutzCGazswoHKo//tZVlFpKYh+PymziUAg= +github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM= +github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= +github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= -github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= @@ -279,38 +42,27 @@ go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= -go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= -golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 h1:6zppjxzCulZykYSLyVDYbneBfbaBIQPYMevg0bEwv2s= -golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= -golang.org/x/net v0.5.0 h1:GyT4nK/YDHSqa1c4753ouYCDajOYKTja9Xb/OHtgvSw= -golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= -golang.org/x/oauth2 v0.4.0 h1:NF0gk8LVPg1Ml7SSbGyySuoxdsXitj7TvgvuRxIMc/M= -golang.org/x/oauth2 v0.4.0/go.mod h1:RznEsdpjGAINPTOF0UH/t+xJ75L18YO3Ho6Pyn+uRec= -golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18= -golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/term v0.4.0 h1:O7UWfv5+A2qiuulQk30kVinPoMtoIPeVaKLEgLpVkvg= -golang.org/x/term v0.4.0/go.mod h1:9P2UbLfCdcvo3p/nzKvsmas4TnlujnuoV9hGgYzW1lQ= -golang.org/x/text v0.6.0 h1:3XmdazWV+ubf7QgHSTWeykHOci5oeekaGJBLkrkaw4k= -golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU= -golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c= -google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f h1:BWUVssLB0HVOSY78gIdvk1dTVYtT1y8SBWtPYuTJ/6w= google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f/go.mod h1:RGgjbofJ8xD9Sq1VVhDM1Vok1vRONV+rg+CjzG4SZKM= google.golang.org/grpc v1.53.0 h1:LAv2ds7cmFV/XTS3XG1NneeENYrXGmorPxsBbptIjNc= google.golang.org/grpc v1.53.0/go.mod h1:OnIrk0ipVdj4N5d9IUoFUx72/VlD7+jUsHwZgwSMQpw= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.29.0 h1:44S3JjaKmLEE4YIkjzexaP+NzZsudE3Zin5Njn/pYX0= -google.golang.org/protobuf v1.29.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= +google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/broker/broker.go b/internal/broker/broker.go index 0d6c0a7..44e97b0 100644 --- a/internal/broker/broker.go +++ b/internal/broker/broker.go @@ -30,14 +30,14 @@ func NewBroker() *Broker { } func (b *Broker) Publish(event *pb.Event) (*pb.ACK, error) { - b.logger.Info("Broker received event for publishing", zap.String("Topic", event.Topic)) + //b.logger.Info("Broker received event for publishing", zap.String("Topic", event.Topic)) t, err := b.findOrInsertTopic(event.Topic) if err != nil { return nil, err } - t.PublishEvent(event) + go t.PublishEvent(event) return &pb.ACK{}, nil } diff --git a/internal/broker/retry.go b/internal/broker/retry.go index d4915e4..5d3b456 100644 --- a/internal/broker/retry.go +++ b/internal/broker/retry.go @@ -1,13 +1,10 @@ package broker import ( - "strconv" - "sync" - "time" - "github.com/ispiroglu/mercurius/internal/logger" "github.com/ispiroglu/mercurius/proto" "go.uber.org/zap" + "sync" ) /* @@ -18,9 +15,10 @@ import ( // What should these values be? // This values should be configurable from yml. const retryBufferSize = 5000000 -const retryCount = 5 -const retryTime = 100 -const retryTimeType = time.Millisecond + +// const retryCount = 5 +// const retryTime = 100 +// const retryTimeType = time.Millisecond var SubscriberRetryHandler = NewRetryHandler() @@ -60,7 +58,7 @@ func (rh *RetryHandler) RemoveRetryQueue(subId string) { } func (rh *RetryHandler) CreateRetryQueue(subId string, eq chan *proto.Event) chan *proto.Event { - rq := make(chan *proto.Event, retryBufferSize) + rq := make(chan *proto.Event) rh.repository.addChannel(subId, rq) go rh.HandleRetryQueue(rq, eq) return rq @@ -72,24 +70,24 @@ func GetRetryQueue(subId string) chan *proto.Event { // TODO remove entry from map func (rh *RetryHandler) HandleRetryQueue(rq chan *proto.Event, eq chan *proto.Event) { - eventRetryCount := make(map[string]int) - for { - event := <-rq - eventRetryCount[event.Id]++ - if eventRetryCount[event.Id] == -1 { - delete(eventRetryCount, event.Id) - rh.logger.Info("Discarded event " + event.Id + " retry limit reached") - } else { - rh.logger.Info("Retrying for event " + event.Id + " [" + strconv.Itoa(eventRetryCount[event.Id]) + "]") - go func() { - if eventRetryCount[event.Id] == retryCount { - eventRetryCount[event.Id] = -2 - } - time.Sleep(retryTime * retryTimeType) - eq <- event - }() - } - } + // eventRetryCount := make(map[string]int) + // for { + // event := <-rq + // eventRetryCount[event.Id]++ + // if eventRetryCount[event.Id] == -1 { + // delete(eventRetryCount, event.Id) + // rh.logger.Info("Discarded event " + event.Id + " retry limit reached") + // } else { + // rh.logger.Info("Retrying for event " + event.Id + " [" + strconv.Itoa(eventRetryCount[event.Id]) + "]") + // go func() { + // if eventRetryCount[event.Id] == retryCount { + // eventRetryCount[event.Id] = -2 + // } + // time.Sleep(retryTime * retryTimeType) + // eq <- event + // }() + // } + // } } func (r *RetryMapRepository) addChannel(sId string, c chan *proto.Event) { diff --git a/internal/broker/subscriber.go b/internal/broker/subscriber.go index 4e2c428..17840b5 100644 --- a/internal/broker/subscriber.go +++ b/internal/broker/subscriber.go @@ -41,7 +41,7 @@ func (r *SubscriberRepository) Unsubscribe(subscriber *Subscriber) error { } func NewSubscriber(ctx context.Context, sId string, sName string, topicName string) *Subscriber { - eq := make(chan *proto.Event, 5000) + eq := make(chan *proto.Event) return &Subscriber{ logger: logger.NewLogger(), Id: sId, diff --git a/internal/broker/topic.go b/internal/broker/topic.go index fb151de..6e19cab 100644 --- a/internal/broker/topic.go +++ b/internal/broker/topic.go @@ -16,8 +16,8 @@ import ( type Topic struct { sync.RWMutex logger *zap.Logger - Name string - SubscriberRepository *SubscriberRepository + Name string + SubscriberRepository *SubscriberRepository EventChan chan *proto.Event } @@ -75,10 +75,10 @@ func (t *Topic) PublishEvent(event *proto.Event) { t.SubscriberRepository.Lock() defer t.SubscriberRepository.Unlock() for _, s := range t.SubscriberRepository.Subscribers { - go func(s *Subscriber, event *proto.Event) { - s.logger.Info("Sending event to subscriber", zap.String("Topic", event.Topic), zap.String("SubscriberID", s.Id), zap.String("Subscriber name", s.Name)) - s.EventChannel <- event - }(s, event) + //go func(s *Subscriber, event *proto.Event) { + //s.logger.Info("Sending event to subscriber", zap.String("Topic", event.Topic), zap.String("SubscriberID", s.Id), zap.String("Subscriber name", s.Name)) + s.EventChannel <- event + //}(s, event) } } } @@ -94,14 +94,14 @@ func (t *Topic) AddSubscriber(ctx context.Context, id string, name string) (*Sub t.logger.Info("Added subscriber", zap.String("Topic", t.Name), zap.String("sId", id), zap.String("sName", name)) if len(t.SubscriberRepository.Subscribers) == 1 { - go func() { - if len(t.EventChan) != 0 { - for event := range t.EventChan { - s.logger.Info("Sending event to subscriber", zap.String("TopicName", event.Topic), zap.String("SubscriberID", s.Id), zap.String("Subscriber name", s.Name)) - s.EventChannel <- event - } + // go func() { + if len(t.EventChan) != 0 { + for event := range t.EventChan { + s.logger.Info("Sending event to subscriber", zap.String("TopicName", event.Topic), zap.String("SubscriberID", s.Id), zap.String("Subscriber name", s.Name)) + s.EventChannel <- event } - }() + } + //}() } return s, nil @@ -121,6 +121,6 @@ func newTopic(name string) *Topic { logger: logger.NewLogger(), Name: name, SubscriberRepository: NewSubscriberRepository(), - EventChan: make(chan *proto.Event, 5000), + EventChan: make(chan *proto.Event), } } diff --git a/internal/server/server.go b/internal/server/server.go index 4704046..b226b62 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -2,13 +2,18 @@ package server import ( "context" + "fmt" + "sync/atomic" + "time" + "github.com/ispiroglu/mercurius/internal/broker" "github.com/ispiroglu/mercurius/internal/logger" "github.com/ispiroglu/mercurius/proto" "go.uber.org/zap" - "runtime" ) +const xz = 100 * 100 * 100 + type Server struct { logger *zap.Logger broker *broker.Broker @@ -23,12 +28,16 @@ func NewMercuriusServer() *Server { } } +var y = atomic.Uint64{} + func (s *Server) Publish(_ context.Context, event *proto.Event) (*proto.ACK, error) { + if y.Add(1) == 1 { + start = time.Now() + } return s.broker.Publish(event) } func (s *Server) Subscribe(req *proto.SubscribeRequest, stream proto.Mercurius_SubscribeServer) error { - s.logger.Info("Received subscribe request", zap.String("Topic", req.Topic)) ctx := stream.Context() sub, err := s.broker.Subscribe(ctx, req.Topic, req.SubscriberID, req.SubscriberName) if err != nil { @@ -36,28 +45,48 @@ func (s *Server) Subscribe(req *proto.SubscribeRequest, stream proto.Mercurius_S return err } - for { + for w := 0; w < 1; w++ { + go consumerTask(stream, sub, s.logger) + } + + cc := make(chan struct{}) + <-cc + + /* for { select { case <-sub.Ctx.Done(): broker.SubscriberRetryHandler.RemoveRetryQueue(sub.Id) go func(sub *broker.Subscriber) { - s.broker.Unsubscribe(sub) + // s.broker.Unsubscribe(sub) runtime.GC() }(sub) return nil case event := <-sub.EventChannel: + + // time.Sleep(4 * time.Second) go func() { + // Send isleminde kalan var if err := stream.Send(event); err != nil { - s.logger.Error("Error on sending event", zap.String("TopicName", event.Topic), zap.String("SubscriberID", req.SubscriberID), zap.String("Subscriber Name", req.SubscriberName)) //, zap.Error(err)) + panic("") + s.logger.Error("Error on sending event", zap.String("TopicName", event.Topic), zap.String("SubscriberID", sub.Id), zap.String("Subscriber Name", sub.Name)) //, zap.Error(err)) s.logger.Info("Sending event to retry queue") sub.RetryQueue <- event } + + x := messageCount.Add(1) + + if x == 100*100*100 { + z := time.Since(start) + fmt.Println("Execution time: ", z) + } }() } - } + } */ + + return nil } func (s *Server) Retry(_ context.Context, req *proto.RetryRequest) (*proto.ACK, error) { @@ -65,3 +94,36 @@ func (s *Server) Retry(_ context.Context, req *proto.RetryRequest) (*proto.ACK, s.broker.SubscriberRepository.Subscribers[req.SubscriberID].RetryQueue <- req.Event return &proto.ACK{}, nil } + +var messageCount = atomic.Uint64{} +var start time.Time + +func consumerTask(stream proto.Mercurius_SubscribeServer, sub *broker.Subscriber, logger *zap.Logger) { + for { + + select { + case <-sub.Ctx.Done(): + + broker.SubscriberRetryHandler.RemoveRetryQueue(sub.Id) + go func(sub *broker.Subscriber) { + // s.broker.Unsubscribe(sub) + }(sub) + + return + case event := <-sub.EventChannel: + go func() { + if err := stream.Send(event); err != nil { + logger.Error("Error on sending event", zap.String("TopicName", event.Topic), zap.String("SubscriberID", sub.Id), zap.String("Subscriber Name", sub.Name)) //, zap.Error(err)) + logger.Info("Sending event to retry queue") + sub.RetryQueue <- event + } + x := messageCount.Add(1) + + if x == xz { + z := time.Since(start) + fmt.Println("Execution time: ", z) + } + }() + } + } +} diff --git a/pkg/client/client.go b/pkg/client/client.go index 890d481..21dd892 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -53,10 +53,9 @@ func (client *Client) Subscribe(topicName string, ctx context.Context, fn func(e if err != nil { // TODO: What if cannot receive? l.Error("", zap.Error(err)) - continue + panic(err) } - l.Info("Received Event", zap.String("Client", client.Name), zap.String("Topic", e.Topic)) err = fn(e) if err != nil { _ = client.retry(ctx, e, r.SubscriberID) @@ -67,14 +66,15 @@ func (client *Client) Subscribe(topicName string, ctx context.Context, fn func(e return nil } +// Publish This function needs to be sync in order to be able to handle error on publish. func (client *Client) Publish(topicName string, body []byte, ctx context.Context) error { e, err := client.createEvent(topicName, body) if err != nil { return err } - go client.c.Publish(ctx, e) - return nil + _, err = client.c.Publish(ctx, e) + return err } func (client *Client) retry(ctx context.Context, e *proto.Event, subId string) error { diff --git a/pkg/client/grpc_dial.go b/pkg/client/grpc_dial.go index 51779fb..55973df 100644 --- a/pkg/client/grpc_dial.go +++ b/pkg/client/grpc_dial.go @@ -13,7 +13,8 @@ var ( func getConnection(addr string) *grpc.ClientConn { o.Do(func() { - c, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + c, err := grpc.Dial(addr, + grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { conn = nil } else { diff --git a/prometheus/datasource.yaml b/prometheus/datasource.yaml new file mode 100644 index 0000000..57f7e17 --- /dev/null +++ b/prometheus/datasource.yaml @@ -0,0 +1,20 @@ +apiVersion: 1 +datasources: +- name: Prometheus + type: prometheus + url: http://prometheus:9090 + isDefault: true + access: proxy + editable: true +- name: Loki + type: loki + url: http://loki:3100 + isDefault: false + access: proxy + editable: true +- name: Tempo + type: tempo + url: http://tempo:3200 + isDefault: false + access: proxy + editable: true \ No newline at end of file diff --git a/prometheus/prometheus.yml b/prometheus/prometheus.yml new file mode 100644 index 0000000..9332dc9 --- /dev/null +++ b/prometheus/prometheus.yml @@ -0,0 +1,8 @@ +scrape_configs: + - job_name: 'Mercurius' + metrics_path: '/metrics' + scrape_interval: 1ms + static_configs: + - targets: ['host.docker.internal:8081'] + labels: + application: 'Mercurius' diff --git a/prometheus/tempo.yaml b/prometheus/tempo.yaml new file mode 100644 index 0000000..654c967 --- /dev/null +++ b/prometheus/tempo.yaml @@ -0,0 +1,39 @@ +server: + http_listen_port: 3200 + +query_frontend: + search: + duration_slo: 5s + throughput_bytes_slo: 1.073741824e+09 + trace_by_id: + duration_slo: 5s + +distributor: + receivers: + otlp: + protocols: + http: + +metrics_generator: + registry: + external_labels: + source: tempo + cluster: docker-compose + storage: + path: /tmp/tempo/generator/wal + remote_write: + - url: http://prometheus:9090/api/v1/write + send_exemplars: true + +storage: + trace: + backend: local + wal: + path: /tmp/tempo/wal + local: + path: /tmp/tempo/blocks + +overrides: + defaults: + metrics_generator: + processors: [service-graphs, span-metrics] \ No newline at end of file diff --git a/test/broker_test.go b/test/broker_test.go index ca65e93..fdaa660 100644 --- a/test/broker_test.go +++ b/test/broker_test.go @@ -42,10 +42,10 @@ func TestOneOneMessageReliability(t *testing.T) { t.Run("Messages sent and received should be the same", func(t *testing.T) { - cSub.Subscribe(TopicName, ctx, authenticityHandlerOneOne) + _ = cSub.Subscribe(TopicName, ctx, authenticityHandlerOneOne) for i := 0; i < MessageCount; i++ { - cPub.Publish(TopicName, []byte(fmt.Sprintf("%d", i)), context.Background()) + _ = cPub.Publish(TopicName, []byte(fmt.Sprintf("%d", i)), context.Background()) } <-doneOneOne @@ -67,14 +67,14 @@ func TestNOneMessageReliability(t *testing.T) { cSub, _ := client.NewClient("sub", ADDR) - cSub.Subscribe(TopicName, context.Background(), authenticityHandlerNOne) + _ = cSub.Subscribe(TopicName, context.Background(), authenticityHandlerNOne) for i := 0; i < MessageCount; i++ { if i%(MessageCount/n) == 0 { cPub, _ = client.NewClient("pub", ADDR) fmt.Println("Changed publisher") } - cPub.Publish(TopicName, []byte(fmt.Sprintf("%d", i)), context.Background()) + _ = cPub.Publish(TopicName, []byte(fmt.Sprintf("%d", i)), context.Background()) } time.Sleep(3 * time.Second) @@ -90,8 +90,8 @@ func TestMessageResendRequest(t *testing.T) { cSub, _ := client.NewClient("sub", ADDR) - cSub.Subscribe(TopicName, context.Background(), resendHandler) - cPub.Publish(TopicName, []byte("message"), context.Background()) + _ = cSub.Subscribe(TopicName, context.Background(), resendHandler) + _ = cPub.Publish(TopicName, []byte("message"), context.Background()) time.Sleep(1 * time.Second) assert.Equal(t, true, gotSecondTime) })