Skip to content

Commit

Permalink
Add Apache Kafka consumer/producer (#14)
Browse files Browse the repository at this point in the history
* Add consumer/producer/airports workload

* fix(circleci): bump executor image

* fix(Makefile): go get -> go install

* Version 3 implementation

* Remove unused Kafka workload

Co-authored-by: Peter Szabo <[email protected]>
  • Loading branch information
hi-im-aren and asdwsda authored Jul 7, 2022
1 parent 8d9b133 commit ba19a6d
Show file tree
Hide file tree
Showing 16 changed files with 512 additions and 114 deletions.
4 changes: 2 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ orbs:
jobs:
build:
docker:
- image: circleci/golang:1.16
- image: cimg/go:1.18
environment:
GOFLAGS: -mod=readonly
steps:
Expand All @@ -33,7 +33,7 @@ jobs:
name: Save Go module cache
key: gomod-v1-{{ .Branch }}-{{ checksum "go.sum" }}
paths:
- /go/pkg/mod
- /home/circleci/go/pkg/mod

- restore_cache:
name: Restore license cache
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ check: test-all lint ## Run tests and linters

bin/go-junit-report:
@mkdir -p bin
GOBIN=${PWD}/bin/ go get -u github.com/jstemmer/go-junit-report
GOBIN=${PWD}/bin/ go install github.com/jstemmer/go-junit-report

TEST_PKGS ?= ./...
TEST_REPORT_NAME ?= results.xml
Expand Down
52 changes: 51 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,61 @@ Simply echos back a set string

Available options:

- EHCO_STR - string to echo back
- ECHO_STR - string to echo back


### Subsequent requests

Subsequent request URLs can be set using the `REQUESTS` environment variable. Multiple URLs can be set and must be separated by `space`. A `count` must also be set for each URL using the following syntax: `URL#count`.

### Apache Kafka

Allspark can be used as an Apache Kafka consumer or producer.
A single instance can work as both at the same time.

#### KafkaServer
The Kafka server is a consumer that triggers `REQUESTS` when a message is consumed from the topic specified with the below option.
Available options:
- `KAFKASERVER_BOOTSTRAP_SERVER`

The kafka bootstrap server where the cluster can be reached.
Required.

e.g.
```yaml
name: KAFKASERVER_BOOTSTRAP_SERVER
value: "kafka-all-broker.kafka.svc.cluster.local:29092"
```
- `KAFKASERVER_TOPIC`

Starts a kafka consumer for the topic passed in as value.

e.g.
```yaml
name: KAFKASERVER_CONSUMER
value: "example-topic"
```
- `KAFKASERVER_CONSUMER_GROUP`

Sets the consumer group id for the kafka consumer.

If not set it gets defaulted to `allspark-consumer-group`.

e.g.
```yaml
name: KAFKASERVER_CONSUMER_GROUP
value: "example-group"
```

#### Requests
You can use the `REQUESTS` variable to set additional consumers and producers

e.g.
```yaml
name: REQUESTS
value: kafka-consume://kafka-all-broker.kafka:29092/example-topic?consumerGroup=allspark-consumer-group kafka-produce://kafka-all-broker.kafka:29092/example-topic?message=example-message#1
```

### Example deployment

```yaml
Expand Down Expand Up @@ -127,3 +176,4 @@ spec:
selector:
app: analytics
```

10 changes: 10 additions & 0 deletions cmd/allspark/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"strings"

"emperror.dev/errors"
"github.com/banzaicloud/allspark/internal/kafka"
"github.com/spf13/pflag"
"github.com/spf13/viper"

Expand Down Expand Up @@ -57,6 +58,9 @@ type Config struct {

// TCP server configuration
TCPServer tcpserver.Config `mapstructure:"tcpServer"`

// Kafka server consumer configurations
KafkaServer kafka.Consumer `mapstructure:"kafkaServer"`
}

// Validate validates the configuration
Expand Down Expand Up @@ -91,6 +95,12 @@ func (c Config) Validate() (Config, error) {
}
c.TCPServer = tcpServerConfig

kafkaServerConfig, err := c.KafkaServer.Validate()
if err != nil {
return c, errors.WrapIf(err, "could not validate kafka server config")
}
c.KafkaServer = *kafkaServerConfig

return c, nil
}

Expand Down
26 changes: 26 additions & 0 deletions cmd/allspark/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (

"emperror.dev/emperror"
"emperror.dev/errors"
"github.com/banzaicloud/allspark/internal/kafka"
"github.com/banzaicloud/allspark/internal/kafka/server"
"github.com/spf13/pflag"
"github.com/spf13/viper"
yaml "gopkg.in/yaml.v2"
Expand Down Expand Up @@ -113,6 +115,7 @@ func main() {
}
wl = workload.NewPIWorkload(uint(count), logger)
}

var wg sync.WaitGroup

// HTTP server
Expand Down Expand Up @@ -181,5 +184,28 @@ func main() {
srv.Run()
}()

// Kafka server
wg.Add(1)
go func() {
defer wg.Done()
consumer := kafka.NewConsumer(configuration.KafkaServer.BootstrapServer, configuration.KafkaServer.Topic, configuration.KafkaServer.ConsumerGroup, logger)
srv := server.New(consumer, logger, errorHandler)
if wl != nil {
srv.SetWorkload(wl)
}

kafkaRequests, err := request.CreateRequestsFromStringSlice(viper.GetStringSlice("kafkaRequests"), logger.WithField("server", "kafka"))
if err != nil {
panic(err)
}
if len(kafkaRequests) == 0 {
kafkaRequests = requests
}

srv.SetRequests(kafkaRequests)
srv.SetSQLClient(sqlClient)
srv.Run()
}()

wg.Wait()
}
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/google/uuid v1.3.0
github.com/jackc/pgx/v4 v4.15.0
github.com/pkg/errors v0.9.1
github.com/segmentio/kafka-go v0.4.31
github.com/sirupsen/logrus v1.8.1
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.10.1
Expand All @@ -35,13 +36,16 @@ require (
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
github.com/jackc/pgtype v1.10.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/jstemmer/go-junit-report v1.0.0 // indirect
github.com/klauspost/compress v1.14.2 // indirect
github.com/leodido/go-urn v1.2.1 // indirect
github.com/magiconair/properties v1.8.6 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/mitchellh/mapstructure v1.4.3 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml v1.9.4 // indirect
github.com/pierrec/lz4/v4 v4.1.14 // indirect
github.com/pingcap/errors v0.11.5-0.20201126102027-b0a155152ca3 // indirect
github.com/satori/go.uuid v1.2.0 // indirect
github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726 // indirect
Expand Down
Loading

0 comments on commit ba19a6d

Please sign in to comment.