diff --git a/module_2/goka_golang/project/.idea/.gitignore b/module_2/goka_golang/project/.idea/.gitignore new file mode 100644 index 0000000..13566b8 --- /dev/null +++ b/module_2/goka_golang/project/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/module_2/goka_golang/project/.idea/modules.xml b/module_2/goka_golang/project/.idea/modules.xml new file mode 100644 index 0000000..a0733a5 --- /dev/null +++ b/module_2/goka_golang/project/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/module_2/goka_golang/project/.idea/project.iml b/module_2/goka_golang/project/.idea/project.iml new file mode 100644 index 0000000..5e764c4 --- /dev/null +++ b/module_2/goka_golang/project/.idea/project.iml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/module_2/goka_golang/project/.idea/vcs.xml b/module_2/goka_golang/project/.idea/vcs.xml new file mode 100644 index 0000000..c2365ab --- /dev/null +++ b/module_2/goka_golang/project/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/module_2/goka_golang/project/blocker/blocker.go b/module_2/goka_golang/project/blocker/blocker.go new file mode 100644 index 0000000..3cc0602 --- /dev/null +++ b/module_2/goka_golang/project/blocker/blocker.go @@ -0,0 +1,74 @@ +package blocker + +import ( + "context" + "encoding/json" + "log" + + "github.com/lovoo/goka" +) + +var ( + Group goka.Group = "blocker" +) + +type BlockEvent struct { + Block bool + Name string +} + +type BlockEventCodec struct{} + +func (c *BlockEventCodec) Encode(value interface{}) ([]byte, error) { + return json.Marshal(value) +} + +func (c *BlockEventCodec) Decode(data []byte) (interface{}, error) { + var m BlockEvent + return &m, json.Unmarshal(data, &m) +} + +type BlockValue struct { + Blocked map[string]bool +} +type BlockValueCodec struct{} + +func (c *BlockValueCodec) Encode(value interface{}) ([]byte, error) { + return json.Marshal(value) +} + +func (c *BlockValueCodec) Decode(data []byte) (interface{}, error) { + var m BlockValue + return &m, json.Unmarshal(data, &m) +} + +func block(ctx goka.Context, msg interface{}) { + var s *BlockValue + if v := ctx.Value(); v == nil { + s = &BlockValue{make(map[string]bool)} + } else { + s = v.(*BlockValue) + } + msgBlockEvent, ok := msg.(*BlockEvent) + if !ok { + return + } + s.Blocked[msgBlockEvent.Name] = msgBlockEvent.Block + ctx.SetValue(s) +} + +func RunBlocker(brokers []string, inputTopic goka.Stream) { + g := goka.DefineGroup(Group, + goka.Input(inputTopic, new(BlockEventCodec), block), + goka.Persist(new(BlockValueCodec)), + ) + p, err := goka.NewProcessor(brokers, g) + + if err != nil { + log.Fatal(err) + } + err = p.Run(context.Background()) + if err != nil { + log.Fatal(err) + } +} diff --git a/module_2/goka_golang/project/censure/censure.go b/module_2/goka_golang/project/censure/censure.go new file mode 100644 index 0000000..35c9a09 --- /dev/null +++ b/module_2/goka_golang/project/censure/censure.go @@ -0,0 +1,36 @@ +package censure + +import ( + "context" + "log" + + "github.com/lovoo/goka" + "github.com/lovoo/goka/codec" +) + +var ( + Group goka.Group = "censor" +) + +type ValueCodec struct { + codec.String +} + +func replaceWord(ctx goka.Context, msg interface{}) { + ctx.SetValue(msg.(string)) +} + +func RunCensore(broker []string, inputStream goka.Stream) { + g := goka.DefineGroup(Group, + goka.Input(inputStream, new(ValueCodec), replaceWord), + goka.Persist(new(ValueCodec)), + ) + p, err := goka.NewProcessor(broker, g) + if err != nil { + log.Fatal(err) + } + err = p.Run(context.Background()) + if err != nil { + log.Fatal(err) + } +} diff --git a/module_2/goka_golang/project/cmd/block-user/main.go b/module_2/goka_golang/project/cmd/block-user/main.go new file mode 100644 index 0000000..c0b3e34 --- /dev/null +++ b/module_2/goka_golang/project/cmd/block-user/main.go @@ -0,0 +1,33 @@ +package main + +import ( + "flag" + "log" + + "github.com/lovoo/goka" + "project/blocker" +) + +var ( + user = flag.String("user", "", "user to block") + block = flag.Bool("block", true, "block user") + name = flag.String("name", "", "name of user to block") + broker = flag.String("broker", "localhost:29092", "boostrap Kafka broker") + stream = flag.String("stream", "", "stream name") +) + +func main() { + flag.Parse() + if *user == "" { + log.Fatal("невозможно заблокировать пользователя ''") + } + emitter, err := goka.NewEmitter([]string{*broker}, goka.Stream(*stream), new(blocker.BlockEventCodec)) + if err != nil { + log.Fatal(err) + } + defer emitter.Finish() + err = emitter.EmitSync(*user, &blocker.BlockEvent{Block: *block, Name: *name}) + if err != nil { + log.Fatal(err) + } +} diff --git a/module_2/goka_golang/project/cmd/censore/main.go b/module_2/goka_golang/project/cmd/censore/main.go new file mode 100644 index 0000000..7dd4a9e --- /dev/null +++ b/module_2/goka_golang/project/cmd/censore/main.go @@ -0,0 +1,32 @@ +package main + +import ( + "flag" + "log" + + "github.com/lovoo/goka" + "github.com/lovoo/goka/codec" +) + +var ( + word = flag.String("word", "", "word censored") + with = flag.String("with", "", "new word") + broker = flag.String("broker", "localhost:29092", "boostrap Kafka broker") +) + +func main() { + flag.Parse() + if *word == "" { + log.Fatalln("cannot censor word ''") + } + emitter, err := goka.NewEmitter([]string{*broker}, "censor-stream", new(codec.String)) + if err != nil { + panic(err) + } + defer emitter.Finish() + + err = emitter.EmitSync(*word, *with) + if err != nil { + panic(err) + } +} diff --git a/module_2/goka_golang/project/docs/messages.png b/module_2/goka_golang/project/docs/messages.png new file mode 100644 index 0000000..8947d79 Binary files /dev/null and b/module_2/goka_golang/project/docs/messages.png differ diff --git a/module_2/goka_golang/project/emitter/emitter.go b/module_2/goka_golang/project/emitter/emitter.go new file mode 100644 index 0000000..2f3e3fe --- /dev/null +++ b/module_2/goka_golang/project/emitter/emitter.go @@ -0,0 +1,58 @@ +package emitter + +import ( + "log" + "math/rand" + "time" + + "github.com/lovoo/goka" + "project/message" +) + +const sendTimeInterval = time.Second + +var ( + users = []string{ + "Alex", + "Dian", + "Xenia", + } + + contents = []string{ + "Hi how are you doing", + "Hello let's have lunch together", + "i'm sad", + "i'm happy", + } +) + +func RunEmitter(brokers []string, outputTopic goka.Stream) { + var emitter, err = goka.NewEmitter(brokers, outputTopic, new(message.Codec)) + if err != nil { + log.Fatal(err) + } + defer emitter.Finish() + + t := time.NewTicker(sendTimeInterval) + defer t.Stop() + + for range t.C { + sender := users[rand.Intn(len(users))] + receiver := users[rand.Intn(len(users))] + for receiver == sender { + sender = users[rand.Intn(len(users))] + } + content := contents[rand.Intn(len(contents))] + + fakeUserMessage := &message.Message{ + From: sender, + To: receiver, + Content: content, + } + + err = emitter.EmitSync(receiver, fakeUserMessage) + if err != nil { + log.Fatal(err) + } + } +} diff --git a/module_2/goka_golang/project/filter/filter.go b/module_2/goka_golang/project/filter/filter.go new file mode 100644 index 0000000..7e3840f --- /dev/null +++ b/module_2/goka_golang/project/filter/filter.go @@ -0,0 +1,63 @@ +package filter + +import ( + "context" + "log" + "strings" + + "github.com/lovoo/goka" + "project/blocker" + "project/censure" + "project/message" +) + +var ( + filterGroup goka.Group = "filter" +) + +func shouldDrop(ctx goka.Context, msg interface{}) bool { + v := ctx.Join(goka.GroupTable(blocker.Group)) + msgSend, ok := msg.(*message.Message) + if !ok { + return false + } + return v != nil && v.(*blocker.BlockValue).Blocked[msgSend.From] +} + +func censor(ctx goka.Context, m *message.Message) *message.Message { + words := strings.Split(m.Content, " ") + for i, w := range words { + if tw := ctx.Lookup(goka.GroupTable(censure.Group), w); tw != nil { + words[i] = tw.(string) + } + } + return &message.Message{ + From: m.From, + To: m.To, + Content: strings.Join(words, " "), + } +} + +func RunFilter(brokers []string, inputTopic goka.Stream, outputTopic goka.Stream) { + g := goka.DefineGroup(filterGroup, + goka.Input(inputTopic, new(message.Codec), func(ctx goka.Context, msg interface{}) { + if shouldDrop(ctx, msg) { + return + } + m := censor(ctx, msg.(*message.Message)) + ctx.Emit(outputTopic, ctx.Key(), m) + }), + goka.Output(outputTopic, new(message.Codec)), + goka.Join(goka.GroupTable(blocker.Group), new(blocker.BlockValueCodec)), + goka.Lookup(goka.GroupTable(censure.Group), new(censure.ValueCodec)), + ) + + p, err := goka.NewProcessor(brokers, g) + if err != nil { + log.Fatal(err) + } + err = p.Run(context.Background()) + if err != nil { + log.Fatal(err) + } +} diff --git a/module_2/goka_golang/project/go.mod b/module_2/goka_golang/project/go.mod new file mode 100644 index 0000000..8ee030f --- /dev/null +++ b/module_2/goka_golang/project/go.mod @@ -0,0 +1,34 @@ +module project + +go 1.23.4 + +require ( + github.com/gorilla/mux v1.8.1 + github.com/lovoo/goka v1.1.12 +) + +require ( + github.com/IBM/sarama v1.41.3 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/eapache/go-resiliency v1.4.0 // indirect + github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect + github.com/eapache/queue v1.1.0 // indirect + github.com/go-stack/stack v1.8.1 // indirect + github.com/golang/mock v1.6.0 // indirect + github.com/golang/snappy v0.0.4 // indirect + github.com/hashicorp/errwrap v1.1.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/hashicorp/go-uuid v1.0.3 // indirect + github.com/jcmturner/aescts/v2 v2.0.0 // indirect + github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect + github.com/jcmturner/gofork v1.7.6 // indirect + github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect + github.com/jcmturner/rpc/v2 v2.0.3 // indirect + github.com/klauspost/compress v1.16.7 // indirect + github.com/pierrec/lz4/v4 v4.1.18 // indirect + github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect + github.com/syndtr/goleveldb v1.0.0 // indirect + golang.org/x/crypto v0.21.0 // indirect + golang.org/x/net v0.23.0 // indirect + golang.org/x/sync v0.4.0 // indirect +) diff --git a/module_2/goka_golang/project/go.sum b/module_2/goka_golang/project/go.sum new file mode 100644 index 0000000..58312d7 --- /dev/null +++ b/module_2/goka_golang/project/go.sum @@ -0,0 +1,141 @@ +github.com/IBM/sarama v1.41.3 h1:MWBEJ12vHC8coMjdEXFq/6ftO6DUZnQlFYcxtOJFa7c= +github.com/IBM/sarama v1.41.3/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eapache/go-resiliency v1.4.0 h1:3OK9bWpPk5q6pbFAaYSEwD9CLUSHG8bnZuqX2yMt3B0= +github.com/eapache/go-resiliency v1.4.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/go-stack/stack v1.8.1 h1:ntEHSVwIt7PNXNpgPmVfMrNhLtgjlmnZha2kOpuRiDw= +github.com/go-stack/stack v1.8.1/go.mod h1:dcoOX6HbPZSZptuspn9bctJ+N/CnF5gGygcUP3XYfe4= +github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= +github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= +github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= +github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= +github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8= +github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= +github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= +github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg= +github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo= +github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= +github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= +github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh687T8= +github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= +github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= +github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= +github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= +github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/lovoo/goka v1.1.12 h1:DtE1MYc/T9FjgvAvzSo6kaiBAQsGMbeFhSwwcuG/pzw= +github.com/lovoo/goka v1.1.12/go.mod h1:VftsNJ0Qqd5XV+YBvWNtbJqq+X/Vq5/qXCs8iVz4OJU= +github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs= +github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU= +github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ= +github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE= +github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= +github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= +golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= +golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= +golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= +golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ= +golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= +golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/module_2/goka_golang/project/infra/docker-compose.yaml b/module_2/goka_golang/project/infra/docker-compose.yaml new file mode 100644 index 0000000..846c1d3 --- /dev/null +++ b/module_2/goka_golang/project/infra/docker-compose.yaml @@ -0,0 +1,106 @@ +version: '2' +services: + zookeeper: + ports: + - "127.0.0.1:22181:2181" + container_name: zookeeper + image: confluentinc/cp-zookeeper:7.4.4 + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + networks: + - kafka-network + + kafka-1: + image: confluentinc/cp-kafka:7.4.4 + container_name: kafka-1 + ports: + - "127.0.0.1:29092:29092" + depends_on: + - zookeeper + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:9092,PLAINTEXT_HOST://localhost:29092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + networks: + - kafka-network + + kafka-2: + image: confluentinc/cp-kafka:7.4.4 + container_name: kafka-2 + ports: + - "127.0.0.1:39092:39092" + depends_on: + - zookeeper + environment: + KAFKA_BROKER_ID: 2 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:9092,PLAINTEXT_HOST://localhost:39092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + networks: + - kafka-network + + kafka-3: + image: confluentinc/cp-kafka:7.4.4 + container_name: kafka-3 + ports: + - "127.0.0.1:49092:49092" + depends_on: + - zookeeper + environment: + KAFKA_BROKER_ID: 3 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-3:9092,PLAINTEXT_HOST://localhost:49092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + networks: + - kafka-network + + ui: + image: provectuslabs/kafka-ui:v0.7.0 + restart: always + ports: + - "127.0.0.1:8085:8080" + environment: + KAFKA_CLUSTERS_0_BOOTSTRAP_SERVERS: kafka-1:9092 + KAFKA_CLUSTERS_0_KSQLDBSERVER: http://ksqldb:8088 + networks: + - kafka-network + + ksqldb: + image: confluentinc/ksqldb-server:0.29.0 + hostname: ksqldb + container_name: ksqldb + depends_on: + - kafka-1 + ports: + - "8088:8088" + environment: + KSQL_LISTENERS: "http://0.0.0.0:8088" + KSQL_CONNECT_REST_ADVERTISED_HOST_NAME: 'ksqldb-server' + KSQL_BOOTSTRAP_SERVERS: "kafka-1:9092" + KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true" + KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true" + # Configuration to embed Kafka Connect support. + KSQL_CONNECT_GROUP_ID: "ksql-connect-cluster" + KSQL_CONNECT_BOOTSTRAP_SERVERS: "kafka-1:9092" + KSQL_CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter" + KSQL_CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter" + KSQL_CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081" + KSQL_CONNECT_CONFIG_STORAGE_TOPIC: "_ksql-connect-configs" + KSQL_CONNECT_OFFSET_STORAGE_TOPIC: "_ksql-connect-offsets" + KSQL_CONNECT_STATUS_STORAGE_TOPIC: "_ksql-connect-statuses" + KSQL_CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1 + KSQL_CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1 + KSQL_CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1 + networks: + - kafka-network + +networks: + kafka-network: + driver: bridge \ No newline at end of file diff --git a/module_2/goka_golang/project/main.go b/module_2/goka_golang/project/main.go new file mode 100644 index 0000000..5789ba8 --- /dev/null +++ b/module_2/goka_golang/project/main.go @@ -0,0 +1,28 @@ +package main + +import ( + "github.com/lovoo/goka" + "project/blocker" + "project/censure" + "project/emitter" + "project/filter" + "project/user" +) + +var brokers = []string{"127.0.0.1:29092"} + +var ( + Emitter2FilterTopic goka.Stream = "emitter2filter-stream" + Filter2UserProcessorTopic goka.Stream = "filter2userprocessor-stream" + BlockerTopic goka.Stream = "blocker-stream" + CensorTopic goka.Stream = "censor-stream" +) + +func main() { + go blocker.RunBlocker(brokers, BlockerTopic) + go censure.RunCensore(brokers, CensorTopic) + go emitter.RunEmitter(brokers, Emitter2FilterTopic) + go filter.RunFilter(brokers, Emitter2FilterTopic, Filter2UserProcessorTopic) + go user.RunUserProcessor(brokers, Filter2UserProcessorTopic) + user.RunUserView(brokers) +} diff --git a/module_2/goka_golang/project/message/message.go b/module_2/goka_golang/project/message/message.go new file mode 100644 index 0000000..c1438e9 --- /dev/null +++ b/module_2/goka_golang/project/message/message.go @@ -0,0 +1,43 @@ +package message + +import ( + "encoding/json" + "fmt" + + "github.com/lovoo/goka" +) + +var ( + Group goka.Group = "user-message" +) + +// Message — содержит парамтеры пользователя +type Message struct { + From string // отправитель + To string // получатель + Content string // данные +} + +// Codec позволяет сериализовать и десериализовать Message в/из групповой таблицы. +type Codec struct{} + +// Encode переводит Message в []byte +func (uc *Codec) Encode(value any) ([]byte, error) { + if _, isMessage := value.(*Message); !isMessage { + return nil, fmt.Errorf("тип должен быть *Message, получен %T", value) + } + return json.Marshal(value) +} + +// Decode переводит Message из []byte в структуру. +func (uc *Codec) Decode(data []byte) (any, error) { + var ( + c Message + err error + ) + err = json.Unmarshal(data, &c) + if err != nil { + return nil, fmt.Errorf("ошибка десериализации: %v", err) + } + return &c, nil +} diff --git a/module_2/goka_golang/project/readme.md b/module_2/goka_golang/project/readme.md new file mode 100644 index 0000000..b2fa4b0 --- /dev/null +++ b/module_2/goka_golang/project/readme.md @@ -0,0 +1,93 @@ +## Проект Kafka Avro на Go + +Этот проект демонстрирует систему обработки потоков сообщений с функциональностью +блокировки пользователей и цензуры сообщений. + + +### Установка + +1. Установите Go с официального сайта: https://go.dev/doc/install + +2. Установите требуемые зависимости: +```bash +go mod tidy +``` + + +### Запуск +1. Запустите инфраструктуру +```bash +cd ./infra && docker compose up -d +``` + +2. Создайте топики: + +Создание топика для отправки сообщения +```bash +docker exec -it kafka-1 ../../usr/bin/kafka-topics --create --topic emitter2filter-stream --bootstrap-server localhost:9092 --partitions 3 --replication-factor 2 +``` +Создание топика отслеживающего изменения в групповой таблицы сообщений. +```bash +docker exec -it kafka-1 ../../usr/bin/kafka-topics --create --topic user-message-table --bootstrap-server localhost:9092 --partitions 3 --replication-factor 2 --config cleanup.policy=compact +``` + +Создание топика для передачи данных от фильтра в обработчик пользовательских сообщений +```bash +docker exec -it kafka-1 ../../usr/bin/kafka-topics --create --topic filter2userprocessor-stream --bootstrap-server localhost:9092 --partitions 3 --replication-factor 2 +``` +Создание топика отслеживающего изменения в групповой таблицы фильтра. +```bash +docker exec -it kafka-1 ../../usr/bin/kafka-topics --create --topic filter-table --bootstrap-server localhost:9092 --partitions 3 --replication-factor 2 --config cleanup.policy=compact +``` + + +Создание топика для передачи данных заблокированных пользователей. +```bash +docker exec -it kafka-1 ../../usr/bin/kafka-topics --create --topic blocker-stream --bootstrap-server localhost:9092 --partitions 3 --replication-factor 2 --config cleanup.policy=compact +``` + +Создание топика отслеживающего изменения в групповой таблицы заблокированных пользователей. +```bash +docker exec -it kafka-1 ../../usr/bin/kafka-topics --create --topic blocker-table --bootstrap-server localhost:9092 --partitions 3 --replication-factor 2 --config cleanup.policy=compact +``` + +Создание топика для передачи данных о зацензуринных словах. +```bash +docker exec -it kafka-1 ../../usr/bin/kafka-topics --create --topic censor-stream --bootstrap-server localhost:9092 --partitions 3 --replication-factor 2 --config cleanup.policy=compact +``` + +Создание топика отслеживающего изменения в групповой таблицы о зацензуринных словах. +```bash +docker exec -it kafka-1 ../../usr/bin/kafka-topics --create --topic censor-table --bootstrap-server localhost:9092 --partitions 3 --replication-factor 2 --config cleanup.policy=compact +``` + +3. Запустите программу: +```bash +go run main.go +``` + +Сообщения будут авторматически отправляться, последние 5 сообщений доступны по адресам: +http://localhost:9095/Alex +http://localhost:9095/Dian +http://localhost:9095/Xenia + +Ожидаемый вывод: + +![messages.png](docs%2Fmessages.png) + +4. Цензура, заменяем "sad" на "not so happy" +```bash +go run cmd/censore/main.go -word "sad" -with "not so happy" +``` + +5. Блокировка пользователей. Для пользователя Алекс заблокировать все сообщения от Dian и Xenia +```bash +go run cmd/block-user/main.go -user Alex -stream blocker-stream -name Dian +go run cmd/block-user/main.go -user Alex -stream blocker-stream -name Xenia +``` + +6. Разблокировка пользователей. Для пользователя Алекс разблокировать все сообщения от Dian и Xenia +```bash +go run cmd/block-user/main.go -user Alex -stream blocker-stream -name Dian -block=false +go run cmd/block-user/main.go -user Alex -stream blocker-stream -name Xenia -block=false +``` diff --git a/module_2/goka_golang/project/tmp.txt b/module_2/goka_golang/project/tmp.txt deleted file mode 100644 index e69de29..0000000 diff --git a/module_2/goka_golang/project/user/user.go b/module_2/goka_golang/project/user/user.go new file mode 100644 index 0000000..90be123 --- /dev/null +++ b/module_2/goka_golang/project/user/user.go @@ -0,0 +1,110 @@ +package user + +import ( + "context" + "encoding/json" + "fmt" + "log" + "net/http" + + "github.com/gorilla/mux" + "github.com/lovoo/goka" + "project/message" +) + +var ( + Group goka.Group = "user-message" +) + +type MessagesCollected struct { + Messages map[string][]string +} + +type Codec struct{} + +// Encode переводит MessagesCollected в []byte +func (up *Codec) Encode(value any) ([]byte, error) { + if _, isMessageCollected := value.(*MessagesCollected); !isMessageCollected { + return nil, fmt.Errorf("тип должен быть *MessagesCollected, получен %T", value) + } + return json.Marshal(value) +} + +// Decode переводит MessagesCollected из []byte в структуру. +func (up *Codec) Decode(data []byte) (any, error) { + var ( + p MessagesCollected + err error + ) + err = json.Unmarshal(data, &p) + if err != nil { + return nil, fmt.Errorf("ошибка десериализации: %v", err) + } + return &p, nil +} + +func process(ctx goka.Context, msg any) { + var msgRecived *message.Message + var ok bool + var msgCollected *MessagesCollected + + if msgRecived, ok = msg.(*message.Message); !ok { + return + } + + if val := ctx.Value(); val != nil { + msgCollected = val.(*MessagesCollected) + } else { + msgCollected = &MessagesCollected{Messages: make(map[string][]string)} + } + + if len(msgCollected.Messages[msgRecived.From]) < 5 { + msgCollected.Messages[msgRecived.From] = append(msgCollected.Messages[msgRecived.From], msgRecived.Content) + } else { + msgCollected.Messages[msgRecived.From] = append(msgCollected.Messages[msgRecived.From][1:5], msgRecived.Content) + } + ctx.SetValue(msgCollected) +} + +func RunUserProcessor(brokers []string, inputStream goka.Stream) { + g := goka.DefineGroup(Group, + goka.Input(inputStream, new(message.Codec), process), + goka.Persist(new(Codec)), + ) + p, err := goka.NewProcessor(brokers, g) + if err != nil { + log.Fatal(err) + } + err = p.Run(context.Background()) + if err != nil { + log.Fatal(err) + } +} + +func RunUserView(brokers []string) { + view, err := goka.NewView(brokers, + goka.GroupTable(Group), + new(Codec), + ) + if err != nil { + log.Fatal(err) + } + + root := mux.NewRouter() + root.HandleFunc("/{key}", func(w http.ResponseWriter, r *http.Request) { + value, _ := view.Get(mux.Vars(r)["key"]) + data, _ := json.Marshal(value) + w.Write(data) + }) + log.Println("View opened at http://localhost:9095/") + go func() { + err = http.ListenAndServe(":9095", root) + if err != nil { + log.Fatal(err) + } + }() + err = view.Run(context.Background()) + if err != nil { + log.Fatal(err) + } +}