diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..ec73510 --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,10 @@ +version: 2 +updates: + - package-ecosystem: github-actions + directory: / + schedule: + interval: weekly + - package-ecosystem: gomod + directory: / + schedule: + interval: weekly \ No newline at end of file diff --git a/.github/workflows/codeql.yaml b/.github/workflows/codeql.yaml new file mode 100644 index 0000000..beb8101 --- /dev/null +++ b/.github/workflows/codeql.yaml @@ -0,0 +1,53 @@ +# For most projects, this workflow file will not need changing; you simply need +# to commit it to your repository. +# +# You may wish to alter this file to override the set of languages analyzed, +# or to provide custom queries or build logic. +name: "CodeQL" + +on: + push: + branches: [ main ] + pull_request: + # The branches below must be a subset of the branches above + branches: [ main ] + schedule: + - cron: '30 1 * * 0' + +jobs: + analyze: + name: Analyze + runs-on: ubuntu-latest + + permissions: + # required for all workflows + security-events: write + + # only required for workflows in private repositories + actions: read + contents: read + + strategy: + fail-fast: false + matrix: + # Override automatic language detection by changing the below list + # Supported options are ['csharp', 'cpp', 'go', 'java', 'javascript', 'python'] + # TODO: Enable for javascript later + language: [ 'go'] + + steps: + - name: Checkout repository + uses: actions/checkout@v3 + + # Initializes the CodeQL tools for scanning. + - name: Initialize CodeQL + uses: github/codeql-action/init@v2 + with: + languages: ${{ matrix.language }} + # If you wish to specify custom queries, you can do so here or in a config file. + # By default, queries listed here will override any specified in a config file. + # Prefix the list here with "+" to use these queries and those in the config file. + # queries: ./path/to/local/query, your-org/your-repo/queries@main + + - name: Perform CodeQL Analysis + uses: github/codeql-action/analyze@v2 diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml new file mode 100644 index 0000000..bb339bc --- /dev/null +++ b/.github/workflows/go.yml @@ -0,0 +1,70 @@ +name: Run Testing +on: push + +jobs: + lint: + runs-on: ubuntu-latest + steps: + - name: Setup go + uses: actions/setup-go@v3 + with: + go-version: '^1' + - name: Checkout repository + uses: actions/checkout@v3 + - name: Setup golangci-lint + uses: golangci/golangci-lint-action@v3 + with: + version: latest + args: --verbose + + # Label of the container job + test: + strategy: + matrix: + os: [ubuntu-latest] + go: [1.17, 1.18, 1.19, '1.20'] + include: + - os: ubuntu-latest + go-build: ~/.cache/go-build + name: ${{ matrix.os }} @ Go ${{ matrix.go }} + runs-on: ${{ matrix.os }} + env: + GO111MODULE: on + GOPROXY: https://proxy.golang.org + + steps: + - name: Set up Go ${{ matrix.go }} + uses: actions/setup-go@v3 + with: + go-version: ${{ matrix.go }} + + - name: Checkout Code + uses: actions/checkout@v3 + with: + ref: ${{ github.ref }} + + - uses: actions/cache@v3 + with: + path: | + ${{ matrix.go-build }} + ~/go/pkg/mod + key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }} + restore-keys: | + ${{ runner.os }}-go- + + # - uses: mer-team/rabbitmq-mng-action@v1.2 + # with: + # RABBITMQ_USER: 'guest' + # RABBITMQ_PASS: 'guest' + # RABBITMQ_PORT: 5672 + # RABBITMQ_MNG_PORT: 15672 + # RABBITMQ_TAG: '3-management-alpine' + + - name: Run Tests + run: | + go test -v -covermode=atomic -coverprofile=coverage.out + + - name: Upload coverage to Codecov + uses: codecov/codecov-action@v3 + with: + flags: ${{ matrix.os }},go-${{ matrix.go }} diff --git a/.github/workflows/goreleaser.yml b/.github/workflows/goreleaser.yml new file mode 100644 index 0000000..3af3a45 --- /dev/null +++ b/.github/workflows/goreleaser.yml @@ -0,0 +1,34 @@ +name: Goreleaser + +on: + push: + tags: + - '*' + +permissions: + contents: write + +jobs: + goreleaser: + runs-on: ubuntu-latest + steps: + - + name: Checkout + uses: actions/checkout@v3 + with: + fetch-depth: 0 + - + name: Set up Go + uses: actions/setup-go@v3 + with: + go-version: 1.17 + - + name: Run GoReleaser + uses: goreleaser/goreleaser-action@v4 + with: + # either 'goreleaser' (default) or 'goreleaser-pro' + distribution: goreleaser + version: latest + args: release --rm-dist + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/LICENSE b/LICENSE index b645a5c..716cb64 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2022 golang-queue +Copyright (c) 2022 golang-queue/kafka Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/README.md b/README.md index c38b226..a70c5a0 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,5 @@ # Kafka -[Kafka](https://kafka.apache.org/) as backend for [Queue package](https://github.com/golang-queue/queue) +[Kafka](https://kafka.apache.org/) as backend for [Queue package](https://github.com/golang-queue/queue). See the [Go kafka Client Library]( github.com/segmentio/kafka-go). + + diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..d914e90 --- /dev/null +++ b/go.mod @@ -0,0 +1,32 @@ +module github.com/golang-queue/kafka + +// go 1.22.0 + +go 1.18 + +// require ( +// github.com/golang-queue/queue v0.1.4-0.20221230133718-0314ef173f98 +// github.com/segmentio/kafka-go v0.4.47 +// github.com/stretchr/testify v1.8.4 +// go.uber.org/goleak v1.2.1 +// ) + +// 0c677f44188bc2c3e6a +require ( + github.com/golang-queue/queue v0.1.4-0.20240218073423-0c677f44188b + github.com/segmentio/kafka-go v0.4.47 + go.uber.org/goleak v1.2.1 +) + +require github.com/stretchr/testify v1.8.4 + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/jpillora/backoff v1.0.0 // indirect + github.com/klauspost/compress v1.15.9 // indirect + github.com/pierrec/lz4/v4 v4.1.15 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) + +replace github.com/golang-queue/kafka => ../../ diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..37a446f --- /dev/null +++ b/go.sum @@ -0,0 +1,80 @@ +github.com/appleboy/com v0.1.7 h1:4lYTFNoMAAXGGIC8lDxVg/NY+1aXbYqfAWN05cZhd0M= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/golang-queue/queue v0.1.4-0.20240218073423-0c677f44188b h1:EfOci2gtTtCMgxv2Coh+i0iEARmvnCrxcY0Mm08KzMw= +github.com/golang-queue/queue v0.1.4-0.20240218073423-0c677f44188b/go.mod h1:5nEkJTzw9Boc8ZCylQlrJK5f/Vd8Uo58yAssRli5ckg= +github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= +github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= +github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= +github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= +github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0= +github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/segmentio/kafka-go v0.4.47 h1:IqziR4pA3vrZq7YdRxaT3w1/5fvIH5qpCwstUanQQB0= +github.com/segmentio/kafka-go v0.4.47/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= +go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/images/flow01.d2 b/images/flow01.d2 new file mode 100644 index 0000000..47c991e --- /dev/null +++ b/images/flow01.d2 @@ -0,0 +1,5 @@ +donut: { shape: circle } +database.shape: cylinder +you: { + shape: person +} \ No newline at end of file diff --git a/images/flow01.svg b/images/flow01.svg new file mode 100644 index 0000000..0943ffb --- /dev/null +++ b/images/flow01.svg @@ -0,0 +1,97 @@ +donutdatabaseyou + + + + + diff --git a/kafka.go b/kafka.go new file mode 100644 index 0000000..cf9f8ef --- /dev/null +++ b/kafka.go @@ -0,0 +1,249 @@ +package kafka + +import ( + "context" + "fmt" + "net" + "sync" + "sync/atomic" + "time" + + "github.com/golang-queue/queue" + "github.com/golang-queue/queue/core" + "github.com/golang-queue/queue/job" + kafkaAPI "github.com/segmentio/kafka-go" +) + +var _ core.Worker = (*Worker)(nil) + +// one consumer connect to kafka broker + +type KafkaConsumer struct { + //stopFlag int32 + opts options + reader *kafkaAPI.Reader + ring *queue.Ring +} +type ConnWaitGroup struct { + DialFunc func(context.Context, string, string) (net.Conn, error) + sync.WaitGroup +} + +// start consumer, get message from kafka +func InitConsumer(opts ...Option) *KafkaConsumer { + //var err error + tKafkaConsumer := &KafkaConsumer{ + opts: newOptions(opts...), + } + + _, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + //初始化ring + tKafkaConsumer.ring = queue.NewRing() + // 创建client,创建topic,创建shutdown + // client, shutdown := newLocalClientAndTopic(kafkaConsumer.opts.addr, kafkaConsumer.opts.topic, + // kafkaConsumer.opts.partition) + reader := kafkaAPI.NewReader(kafkaAPI.ReaderConfig{ + Brokers: []string{fmt.Sprintf("%s:9092", tKafkaConsumer.opts.addr)}, + Topic: tKafkaConsumer.opts.topic, + MinBytes: 1, + MaxBytes: 10e6, + MaxWait: 100 * time.Millisecond, + //Logger: newTestKafkaLogger(t, ""), + }) + tKafkaConsumer.reader = reader + //kafkaConsumer.shutdown = shutdown + fmt.Printf("get data.\n") + //GetData() + fmt.Printf("shutdown now!!!\n") + // defer reader.Close() + return tKafkaConsumer +} + +// func newLocalClient(address string) (*kafkaAPI.Client, func()) { +// return newClient(kafkaAPI.TCP(address)) +// } + +// func newClient(addr net.Addr) (*kafkaAPI.Client, func()) { +// conns := &ktesting.ConnWaitGroup{ +// DialFunc: (&net.Dialer{}).DialContext, +// } + +// transport := &kafkaAPI.Transport{ +// Dial: conns.Dial, +// Resolver: kafkaAPI.NewBrokerResolver(nil), +// } + +// client := &kafkaAPI.Client{ +// Addr: addr, +// Timeout: 5 * time.Second, +// Transport: transport, +// } + +// return client, func() { transport.CloseIdleConnections(); conns.Wait() } +// } + +// func newLocalClientAndTopic(address string, topic string, partition int) (*kafkaAPI.Client, func()) { +// //topic := makeTopic() +// client, shutdown := newLocalClientWithTopic(address, topic, partition) +// return client, shutdown +// } + +// func newLocalClientWithTopic(address string, topic string, partitions int) (*kafkaAPI.Client, func()) { +// client, shutdown := newLocalClient(address) +// if err := clientCreateTopic(client, topic, partitions); err != nil { +// shutdown() +// panic(err) +// } +// return client, func() { +// client.DeleteTopics(context.Background(), &kafkaAPI.DeleteTopicsRequest{ +// Topics: []string{topic}, +// }) +// shutdown() +// } +// } + +// func clientCreateTopic(client *kafkaAPI.Client, topic string, partitions int) error { +// _, err := client.CreateTopics(context.Background(), &kafkaAPI.CreateTopicsRequest{ +// Topics: []kafkaAPI.TopicConfig{{ +// Topic: topic, +// NumPartitions: partitions, +// ReplicationFactor: 1, +// }}, +// }) +// if err != nil { +// return err +// } + +// // Topic creation seems to be asynchronous. Metadata for the topic partition +// // layout in the cluster is available in the controller before being synced +// // with the other brokers, which causes "Error:[3] Unknown Topic Or Partition" +// // when sending requests to the partition leaders. +// // +// // This loop will wait up to 2 seconds polling the cluster until no errors +// // are returned. +// for i := 0; i < 20; i++ { +// r, err := client.Fetch(context.Background(), &kafkaAPI.FetchRequest{ +// Topic: topic, +// Partition: 0, +// Offset: 0, +// }) +// if err == nil && r.Error == nil { +// break +// } +// time.Sleep(100 * time.Millisecond) +// } + +// return nil +// } + +// 获取消息发送到队列中去 +func (kafkaConsumer *KafkaConsumer) GetData() { + for { + // select { + // case <-time.After(leftTime): + // return //context.DeadlineExceeded + // // case err := <-done: // job finish + // // return err + // // case p := <-panicChan: + // // panic(p) + // default: + // 接收消息 + fmt.Printf("start fetch data") + res, err := kafkaConsumer.reader.ReadMessage(context.Background()) + if err != nil { + //t.Fatal(err) + fmt.Printf("%v", err) + } + // 打印出消息,后续放入队列中去 + m := &job.Message{ + Timeout: 100 * time.Millisecond, + Payload: res.Value, + } + kafkaConsumer.ring.Queue(m) + fmt.Printf("%v", m) + // } + } +} + +// 这里包含了回调函数,没有线程 +type Worker struct { + // + //shutdown func() // + // stop chan struct{} + stopFlag int32 + // stopOnce sync.Once + // startOnce sync.Once + opts options + conn *kafkaAPI.Conn + //ring *queue.Ring + kafkaConsumer *KafkaConsumer +} + +func NewWorker(opts ...Option) *Worker { + var err error + w := &Worker{ + opts: newOptions(opts...), + } + w.conn, err = + //conn, err := + (&kafkaAPI.Dialer{ + Resolver: &net.Resolver{}, + }).DialLeader(context.Background(), w.opts.network, + w.opts.addr, w.opts.topic, 0) + if err != nil { + w.opts.logger.Fatal("can't connect kafka: ", err) + } + // 启动kakfaConsumer + w.kafkaConsumer = InitConsumer(opts...) + // 开始启动协程,获取数据 + go w.kafkaConsumer.GetData() + return w +} + +// Run start the worker +func (w *Worker) Run(ctx context.Context, task core.QueuedMessage) error { + return w.opts.runFunc(ctx, task) +} + +// Shutdown worker +func (w *Worker) Shutdown() (err error) { + // 关闭fakfa的连接 + + w.kafkaConsumer.ring.Shutdown() + return err +} + +// Queue send notification to queue +func (w *Worker) Queue(job core.QueuedMessage) (err error) { + //err := nil + if atomic.LoadInt32(&w.stopFlag) == 1 { + return queue.ErrQueueShutdown + } + // send message + base := time.Now() + + msg := kafkaAPI.Message{ + Time: base.Truncate(time.Millisecond), + Value: job.Bytes(), + } + if w.opts.compression == nil { + _, err = w.conn.WriteMessages(msg) + } else { + _, err = w.conn.WriteCompressedMessages(w.opts.compression, msg) + } + // if err != nil { + // t.Fatal(err) + // } + return err +} + +// get data from ring +// 这个函数是倍queue.go中的coroutine调用的回调函数 +func (w *Worker) Request() (core.QueuedMessage, error) { + //_ = w.startConsumer() + //从ring中获取数据 + return w.kafkaConsumer.ring.Request() + //return nil, queue.ErrNoTaskInQueue +} diff --git a/kafka_test.go b/kafka_test.go new file mode 100644 index 0000000..1453207 --- /dev/null +++ b/kafka_test.go @@ -0,0 +1,55 @@ +package kafka + +import ( + "testing" + + "github.com/golang-queue/queue" + "github.com/stretchr/testify/assert" + //"github.com/stretchr/testify/assert" +) + +// func TestFetchData(t *testing.T) { +// // m := mockMessage{ +// // Message: "foo", +// // } +// // w := NewWorker() +// // q, err := queue.NewQueue( +// // queue.WithWorker(w), +// // queue.WithWorkerCount(2), +// // ) +// // assert.NoError(t, err) +// // q.Start() +// // time.Sleep(50 * time.Millisecond) +// // q.Shutdown() +// // // can't queue task after shutdown +// // err = q.Queue(m) +// // assert.Error(t, err) +// // assert.Equal(t, queue.ErrQueueShutdown, err) +// // q.Wait() +// fmt.Printf("start\n") +// InitConsumer(WithAddr("localhost"), +// WithPartition(1), +// WithTopic("hello")) +// fmt.Printf("end\n") +// } + +func TestNewWork(t *testing.T) { + w := + NewWorker( + WithAddr("localhost"), + WithNetwork("tcp"), + WithPartition(1), + WithTopic("hello"), + ) + q, err := queue.NewQueue( + queue.WithWorker(w), + queue.WithWorkerCount(1), + ) + assert.NoError(t, err) + q.Start() + // time.Sleep(100 * time.Millisecond) + // //assert.Equal(t, 1, int(q.metric.BusyWorkers())) + // time.Sleep(600 * time.Millisecond) + // q.Shutdown() + // q.Wait() +} diff --git a/options.go b/options.go new file mode 100644 index 0000000..90cc8be --- /dev/null +++ b/options.go @@ -0,0 +1,89 @@ +package kafka + +import ( + "context" + + "github.com/golang-queue/queue" + "github.com/golang-queue/queue/core" + "github.com/segmentio/kafka-go/compress" +) + +// Option for queue system +type Option func(*options) + +type options struct { + runFunc func(context.Context, core.QueuedMessage) error + logger queue.Logger + addr string + network string + queue string + topic string + partition int //kafka's partition + compression compress.Codec +} + +// WithAddr setup the URI +func WithAddr(addr string) Option { + return func(w *options) { + w.addr = addr + } +} + +func WithNetwork(network string) Option { + return func(w *options) { + w.network = network + } +} + +// WithTopic setup the Topic +func WithTopic(topic string) Option { + return func(w *options) { + w.topic = topic + } +} + +// WithPartition setup the partition +func WithPartition(partition int) Option { + return func(w *options) { + w.partition = partition + } +} + +// WithQueue setup the queue name +func WithQueue(val string) Option { + return func(w *options) { + w.queue = val + } +} + +// WithAddr setup the URI +func WithCompress(compress compress.Codec) Option { + return func(w *options) { + w.compression = compress + } +} + +// WithRunFunc setup the run func of queue +func WithRunFunc(fn func(context.Context, core.QueuedMessage) error) Option { + return func(w *options) { + w.runFunc = fn + } +} + +// WithLogger set custom logger +func WithLogger(l queue.Logger) Option { + return func(w *options) { + w.logger = l + } +} + +func newOptions(opts ...Option) options { + defaultOpts := options{} + + for _, opt := range opts { + // Call the option giving the instantiated + opt(&defaultOpts) + } + + return defaultOpts +} diff --git a/options_test.go b/options_test.go new file mode 100644 index 0000000..55cb0a3 --- /dev/null +++ b/options_test.go @@ -0,0 +1,36 @@ +package kafka + +import ( + "testing" + + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} + +// type mockMessage struct { +// Message string +// } + +// func (m mockMessage) Bytes() []byte { +// return []byte(m.Message) +// } + +// func TestShutdownWorkFlow(t *testing.T) { +// w := NewWorker( +// WithQueue("test"), +// ) +// q, err := queue.NewQueue( +// queue.WithWorker(w), +// queue.WithWorkerCount(2), +// ) +// assert.NoError(t, err) +// q.Start() +// time.Sleep(1 * time.Second) +// q.Shutdown() +// // check shutdown once +// q.Shutdown() +// q.Wait() +// }