Skip to content

Commit

Permalink
add project
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexBlackNn committed Dec 9, 2024
1 parent 375bc89 commit e289fa0
Show file tree
Hide file tree
Showing 19 changed files with 882 additions and 0 deletions.
8 changes: 8 additions & 0 deletions module_2/goka_golang/project/.idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions module_2/goka_golang/project/.idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions module_2/goka_golang/project/.idea/project.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions module_2/goka_golang/project/.idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

74 changes: 74 additions & 0 deletions module_2/goka_golang/project/blocker/blocker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package blocker

import (
"context"
"encoding/json"
"log"

"github.com/lovoo/goka"
)

var (
Group goka.Group = "blocker"
)

type BlockEvent struct {
Block bool
Name string
}

type BlockEventCodec struct{}

func (c *BlockEventCodec) Encode(value interface{}) ([]byte, error) {
return json.Marshal(value)
}

func (c *BlockEventCodec) Decode(data []byte) (interface{}, error) {
var m BlockEvent
return &m, json.Unmarshal(data, &m)
}

type BlockValue struct {
Blocked map[string]bool
}
type BlockValueCodec struct{}

func (c *BlockValueCodec) Encode(value interface{}) ([]byte, error) {
return json.Marshal(value)
}

func (c *BlockValueCodec) Decode(data []byte) (interface{}, error) {
var m BlockValue
return &m, json.Unmarshal(data, &m)
}

func block(ctx goka.Context, msg interface{}) {
var s *BlockValue
if v := ctx.Value(); v == nil {
s = &BlockValue{make(map[string]bool)}
} else {
s = v.(*BlockValue)
}
msgBlockEvent, ok := msg.(*BlockEvent)
if !ok {
return
}
s.Blocked[msgBlockEvent.Name] = msgBlockEvent.Block
ctx.SetValue(s)
}

func RunBlocker(brokers []string, inputTopic goka.Stream) {
g := goka.DefineGroup(Group,
goka.Input(inputTopic, new(BlockEventCodec), block),
goka.Persist(new(BlockValueCodec)),
)
p, err := goka.NewProcessor(brokers, g)

if err != nil {
log.Fatal(err)
}
err = p.Run(context.Background())
if err != nil {
log.Fatal(err)
}
}
36 changes: 36 additions & 0 deletions module_2/goka_golang/project/censure/censure.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package censure

import (
"context"
"log"

"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
)

var (
Group goka.Group = "censor"
)

type ValueCodec struct {
codec.String
}

func replaceWord(ctx goka.Context, msg interface{}) {
ctx.SetValue(msg.(string))
}

func RunCensore(broker []string, inputStream goka.Stream) {
g := goka.DefineGroup(Group,
goka.Input(inputStream, new(ValueCodec), replaceWord),
goka.Persist(new(ValueCodec)),
)
p, err := goka.NewProcessor(broker, g)
if err != nil {
log.Fatal(err)
}
err = p.Run(context.Background())
if err != nil {
log.Fatal(err)
}
}
33 changes: 33 additions & 0 deletions module_2/goka_golang/project/cmd/block-user/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package main

import (
"flag"
"log"

"github.com/lovoo/goka"
"project/blocker"
)

var (
user = flag.String("user", "", "user to block")
block = flag.Bool("block", true, "block user")
name = flag.String("name", "", "name of user to block")
broker = flag.String("broker", "localhost:29092", "boostrap Kafka broker")
stream = flag.String("stream", "", "stream name")
)

func main() {
flag.Parse()
if *user == "" {
log.Fatal("невозможно заблокировать пользователя ''")
}
emitter, err := goka.NewEmitter([]string{*broker}, goka.Stream(*stream), new(blocker.BlockEventCodec))
if err != nil {
log.Fatal(err)
}
defer emitter.Finish()
err = emitter.EmitSync(*user, &blocker.BlockEvent{Block: *block, Name: *name})
if err != nil {
log.Fatal(err)
}
}
32 changes: 32 additions & 0 deletions module_2/goka_golang/project/cmd/censore/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package main

import (
"flag"
"log"

"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
)

var (
word = flag.String("word", "", "word censored")
with = flag.String("with", "", "new word")
broker = flag.String("broker", "localhost:29092", "boostrap Kafka broker")
)

func main() {
flag.Parse()
if *word == "" {
log.Fatalln("cannot censor word ''")
}
emitter, err := goka.NewEmitter([]string{*broker}, "censor-stream", new(codec.String))
if err != nil {
panic(err)
}
defer emitter.Finish()

err = emitter.EmitSync(*word, *with)
if err != nil {
panic(err)
}
}
Binary file added module_2/goka_golang/project/docs/messages.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
58 changes: 58 additions & 0 deletions module_2/goka_golang/project/emitter/emitter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package emitter

import (
"log"
"math/rand"
"time"

"github.com/lovoo/goka"
"project/message"
)

const sendTimeInterval = time.Second

var (
users = []string{
"Alex",
"Dian",
"Xenia",
}

contents = []string{
"Hi how are you doing",
"Hello let's have lunch together",
"i'm sad",
"i'm happy",
}
)

func RunEmitter(brokers []string, outputTopic goka.Stream) {
var emitter, err = goka.NewEmitter(brokers, outputTopic, new(message.Codec))
if err != nil {
log.Fatal(err)
}
defer emitter.Finish()

t := time.NewTicker(sendTimeInterval)
defer t.Stop()

for range t.C {
sender := users[rand.Intn(len(users))]
receiver := users[rand.Intn(len(users))]
for receiver == sender {
sender = users[rand.Intn(len(users))]
}
content := contents[rand.Intn(len(contents))]

fakeUserMessage := &message.Message{
From: sender,
To: receiver,
Content: content,
}

err = emitter.EmitSync(receiver, fakeUserMessage)
if err != nil {
log.Fatal(err)
}
}
}
63 changes: 63 additions & 0 deletions module_2/goka_golang/project/filter/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package filter

import (
"context"
"log"
"strings"

"github.com/lovoo/goka"
"project/blocker"
"project/censure"
"project/message"
)

var (
filterGroup goka.Group = "filter"
)

func shouldDrop(ctx goka.Context, msg interface{}) bool {
v := ctx.Join(goka.GroupTable(blocker.Group))
msgSend, ok := msg.(*message.Message)
if !ok {
return false
}
return v != nil && v.(*blocker.BlockValue).Blocked[msgSend.From]
}

func censor(ctx goka.Context, m *message.Message) *message.Message {
words := strings.Split(m.Content, " ")
for i, w := range words {
if tw := ctx.Lookup(goka.GroupTable(censure.Group), w); tw != nil {
words[i] = tw.(string)
}
}
return &message.Message{
From: m.From,
To: m.To,
Content: strings.Join(words, " "),
}
}

func RunFilter(brokers []string, inputTopic goka.Stream, outputTopic goka.Stream) {
g := goka.DefineGroup(filterGroup,
goka.Input(inputTopic, new(message.Codec), func(ctx goka.Context, msg interface{}) {
if shouldDrop(ctx, msg) {
return
}
m := censor(ctx, msg.(*message.Message))
ctx.Emit(outputTopic, ctx.Key(), m)
}),
goka.Output(outputTopic, new(message.Codec)),
goka.Join(goka.GroupTable(blocker.Group), new(blocker.BlockValueCodec)),
goka.Lookup(goka.GroupTable(censure.Group), new(censure.ValueCodec)),
)

p, err := goka.NewProcessor(brokers, g)
if err != nil {
log.Fatal(err)
}
err = p.Run(context.Background())
if err != nil {
log.Fatal(err)
}
}
34 changes: 34 additions & 0 deletions module_2/goka_golang/project/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
module project

go 1.23.4

require (
github.com/gorilla/mux v1.8.1
github.com/lovoo/goka v1.1.12
)

require (
github.com/IBM/sarama v1.41.3 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/eapache/go-resiliency v1.4.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/go-stack/stack v1.8.1 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/syndtr/goleveldb v1.0.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sync v0.4.0 // indirect
)
Loading

0 comments on commit e289fa0

Please sign in to comment.