Skip to content

Commit

Permalink
squash commit
Browse files Browse the repository at this point in the history
  • Loading branch information
AdeleDev committed Nov 21, 2024
1 parent 26576af commit f5c2c1a
Show file tree
Hide file tree
Showing 43 changed files with 2,630 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# macOS
.DS_Store

26 changes: 26 additions & 0 deletions module_1/go/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# If you prefer the allow list template instead of the deny list, see community template:
# https://github.com/github/gitignore/blob/main/community/Golang/Go.AllowList.gitignore
#
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib

# Test binary, built with `go test -c`
*.test

# Output of the go coverage tool, specifically when used with LiteIDE
*.out

# Dependency directories (remove the comment below to include it)
# vendor/

# Go workspace file
go.work
go.work.sum

# env file
.env
.idea
77 changes: 77 additions & 0 deletions module_1/go/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
## Проект Kafka Avro на Go

Этот проект демонстрирует работу с Kafka и Avro на языке Go.

### Описание

Код проекта можно скомпилировать в два независимо запускаемых приложений:

* Продьюсер (Producer): Записывает данные в формате Avro в топик Kafka.
* Консьюмер (Consumer): Считывает данные из топика Kafka и выводит их в консоль.

### Установка

1. Установите Go с официального сайта: https://go.dev/doc/install

2. Установите требуемые зависимости:
```bash
go mod tidy
```

3. Настройте конфигурационные файлы (можно оставить без изменений):
* `config/имя.yaml`: файл конфигурации для подключения к Kafka и установки параметров.

### Запуск
1. Запустите инфраструктуру
```bash
cd ./infra && docker compose up -d
```

2. Создайте топик и проверьте его конфигурацию:

```bash
docker exec -it kafka-1 ../../usr/bin/kafka-topics --create --topic users --bootstrap-server kafka-1:9092 --partitions 3 --replication-factor 2
```
Параметры `localhost:9092` - адрес Kafka-брокера, `users` - имя топика, `3` - количество партиций, `2` - количество реплик.

```bash
docker exec -it kafka-1 ../../usr/bin/kafka-topics --describe --topic users --bootstrap-server localhost:9092
```
Ожидаемый результат:
```
Topic: users TopicId: YEE8ywCDR2mWWht0TNlZsw PartitionCount: 3 ReplicationFactor: 2 Configs:
Topic: users Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: users Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: users Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1
```

3. Запустите Producer:
```bash
go run ./avro-example/cmd/main.go -c ./avro-example/config/producer.yaml
```
а. Программа у вас запросит требуемое действие (Command):
введите `send`, чтобы отправить сообщение в брокер
введите `exit`, чтобы выйти

б. Программа у вас запросит требуемые данные для отправки - имя (Enter name:),
любимое число (Enter favorite number:), любимый цвет (Enter favorite color:). Пожалуйста,
заполняйте правильно вводимые значения, так как основная задача показать возможность передачи
данных в формате Avro, используя Kafka, валидация данных не сделана.

Пример:
```
Command:send
Enter name: alex
Enter favorite number: 55
Enter favorite color:black
```


4. Запустите consumer-push:
```bash
go run ./avro-example/cmd/main.go -c ./avro-example/config/consumer1.yaml
```
5. Запустите consumer-pull:
```bash
go run ./avro-example/cmd/main.go -c ./avro-example/config/consumer2.yaml
```
168 changes: 168 additions & 0 deletions module_1/go/avro-example/.golangci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
run:
tests: true

issues:
max-same-issues: 0

linters-settings:
exhaustive:
default-signifies-exhaustive: true

depguard:
rules:
main:
deny:
- pkg: "github.com/pkg/errors"
desc: Should be replaced by standard lib errors package

godox:
keywords:
- FIXME
- TODO

govet:
enable-all: true
disable:
- fieldalignment
- shadow

lll:
line-length: 120

nolintlint:
allow-no-explanation: [ "lll" ]
require-explanation: true

revive:
severity: error
rules:
- name: argument-limit
- name: atomic
- name: bare-return
- name: blank-imports
- name: bool-literal-in-expr
- name: comment-spacings
- name: confusing-results
- name: context-as-argument
arguments:
- allowTypesBefore: "*testing.T" # https://go-review.googlesource.com/c/lint/+/145237
- name: context-keys-type
- name: datarace
- name: deep-exit
- name: defer
- name: dot-imports
- name: duplicated-imports
- name: early-return
- name: empty-lines
- name: empty-block
- name: error-naming
- name: error-return
- name: error-strings
- name: errorf
- name: exported
- name: identical-branches
- name: if-return
- name: increment-decrement
- name: indent-error-flow
- name: package-comments
- name: range
- name: range-val-address
- name: range-val-in-closure
- name: receiver-naming
- name: redefines-builtin-id
- name: string-of-int
- name: superfluous-else
- name: time-equal
- name: time-naming
- name: unexported-return
- name: unhandled-error
arguments: [ "fmt.Fprint", "fmt.Printf", "fmt.Println" ]
- name: unreachable-code
- name: use-any
- name: unused-parameter
- name: var-declaration
- name: var-naming
- name: waitgroup-by-value

tagliatelle:
# Check the struck tag name case.
case:
# Use the struct field name to check the name of the struct tag.
# Default: false
use-field-name: false
rules:
# Any struct tag type can be used.
# Support string case: `camel`, `pascal`, `kebab`, `snake`, `upperSnake`, `goCamel`, `goPascal`, `goKebab`, `goSnake`, `upper`, `lower`, `header`.
json: camel
toml: snake

linters:
disable-all: true
enable:
- asasalint
- asciicheck
- bidichk
- bodyclose
- contextcheck
- depguard
- durationcheck
- errcheck
- errchkjson
- errname
- exhaustive
- exportloopref
- ginkgolinter
- goconst
- gocritic
- gocyclo
- godot
- godox
- gofmt
- gofumpt
- goheader
- goimports
- gomoddirectives
- gomodguard
- goprintffuncname
- gosec
- gosimple
- govet
- importas
- ineffassign
- inamedparam
- lll
- makezero
- misspell
- musttag
- nakedret
- nestif
- nilerr
- nilnil
- noctx
- nolintlint
- nosprintfhostport
- perfsprint
- prealloc
- predeclared
- promlinter
- reassign
- revive
- rowserrcheck
- sloglint
- staticcheck
- stylecheck
- sqlclosecheck
- tagliatelle
- tenv
- testableexamples
- testifylint
- testpackage
- thelper
- tparallel
- typecheck
- unconvert
- unparam
- unused
- usestdlibvars
- wastedassign
- whitespace
37 changes: 37 additions & 0 deletions module_1/go/avro-example/app/app.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package app

import (
"context"
"errors"

"github.com/apache_kafka_course/module1/go/avro-example/app/consumer"
"github.com/apache_kafka_course/module1/go/avro-example/app/producer"
"github.com/apache_kafka_course/module1/go/avro-example/internal/config"
"github.com/apache_kafka_course/module1/go/avro-example/internal/logger"
)

var ErrWrongType = errors.New("wrong type")

type StartGetConfigStopper interface {
Start(ctx context.Context)
GetConfig() string
Stop()
}

func Fabric() (StartGetConfigStopper, error) {
cfg, err := config.New()
if err != nil {
return nil, err
}
log := logger.New(cfg.Env)
switch cfg.Kafka.Type {
case "producer":
return producer.New(cfg, log)
case "consumer-push":
return consumer.New(cfg, log)
case "consumer-pull":
return consumer.New(cfg, log)
default:
return nil, ErrWrongType
}
}
73 changes: 73 additions & 0 deletions module_1/go/avro-example/app/consumer/app.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package consumer

import (
"context"
"log/slog"
"time"

consumerpull "github.com/apache_kafka_course/module1/go/avro-example/internal/broker/consumer/pull"
consumerpush "github.com/apache_kafka_course/module1/go/avro-example/internal/broker/consumer/push"
"github.com/apache_kafka_course/module1/go/avro-example/internal/config"
)

type consumeCloser interface {
Consume() error
Close() error
}

type App struct {
ServerConsumer consumeCloser
log *slog.Logger
Cfg *config.Config
}

func New(cfg *config.Config, log *slog.Logger) (*App, error) {
if cfg.Kafka.Type == "consumer-push" {
cons, err := consumerpush.New(cfg, log)
if err != nil {
return nil, err
}
return &App{
ServerConsumer: cons,
log: log,
Cfg: cfg,
}, nil
}
cons, err := consumerpull.New(cfg, log)
if err != nil {
return nil, err
}
return &App{
ServerConsumer: cons,
log: log,
Cfg: cfg,
}, nil
}

func (a *App) Start(ctx context.Context) {
a.log.Info("producer starts")
for {
select {
case <-ctx.Done():
return
default:
err := a.ServerConsumer.Consume()
if err != nil {
a.log.Error(err.Error())
}
time.Sleep(time.Second)
}
}
}

func (a *App) Stop() {
a.log.Info("close kafka client")
err := a.ServerConsumer.Close()
if err != nil {
a.log.Error(err.Error())
}
}

func (a *App) GetConfig() string {
return a.Cfg.String()
}
Loading

0 comments on commit f5c2c1a

Please sign in to comment.