Skip to content
This repository has been archived by the owner on Oct 6, 2023. It is now read-only.

Commit

Permalink
add wait step for kafka and zookeeper
Browse files Browse the repository at this point in the history
  • Loading branch information
rogpeppe committed Feb 12, 2020
1 parent 2e439d7 commit e27d886
Show file tree
Hide file tree
Showing 8 changed files with 179 additions and 131 deletions.
26 changes: 25 additions & 1 deletion .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,39 @@ jobs:
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
image: confluentinc/cp-kafka:latest
ports:
- 9092:9092
zookeeper:
env:
ZOOKEEPER_CLIENT_PORT: "2181"
ZOOKEEPER_TICK_TIME: "2000"
image: confluentinc/cp-zookeeper:latest
ports:
- 2181:2181
steps:
- name: Install Go
uses: actions/setup-go@v1
with:
go-version: ${{ matrix.go-version }}
- name: Checkout code
uses: actions/checkout@v1
- name: Wait for Kafka
run: "netstat -a || true\nwaitfor() {\n\twhile ! nc -v -z $1 $2\n\tdo sleep
1\n\tdone\n}\nwaitfor localhost 9092\nwaitfor localhost 2181\n"
shell: bash
timeout-minutes: 1
- name: Test
run: go test ./...
run: "nc -v -z localhost 9092\nnetstat -a || true\necho 'package main\nimport
(\n\t\"fmt\"\n\t\"net\"\n\t\"os\"\n)\n\nfunc main() {\n\t_, err := net.Dial(\"tcp\",
os.Args[1])\n\tfmt.Printf(\"dial %s: %v\\n\", os.Args[1], err)\n\tif err !=
nil {\n\t\tos.Exit(1)\n\t}\n}\n' > /tmp/dial.go\ngo build -o /tmp/dial /tmp/dial.go\n/tmp/dial
localhost:9092 || true\n\nmkdir /tmp/m\ncp go.mod client.go /tmp/m\necho '\npackage
kafkatest_test\n\nimport (\n\t_ \"log\"\n\t\"net\"\n\t\"os\"\n\t\"testing\"\n\t_
\"time\"\n\n\t_ \"github.com/Shopify/sarama\"\n\t_ \"github.com/frankban/quicktest\"\n\n\t_
\"github.com/heetch/kafkatest\"\n)\n\nfunc TestFoo(t *testing.T) {\n\t_, err
:= net.Dial(\"tcp\", os.Getenv(\"KAFKA_ADDRS\"))\n\tif err != nil {\n\t\tt.Errorf(\"cannot
dial: %v\", err)\n\t}\n}\n' > /tmp/m/client_test.go\n(\n\tcd /tmp/m\n\tgo
test\n)\nKAFKA_ADDRS=localhost:9092 go test ./...\n"
strategy:
matrix:
go-version:
Expand Down
6 changes: 5 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package kafkatest
import (
"crypto/rand"
"fmt"
"log"
"net"
"os"
"strconv"
"strings"
Expand Down Expand Up @@ -54,6 +56,8 @@ func New() (*Kafka, error) {
if addrsStr == "" {
addrsStr = "localhost:9092"
}
_, err = net.Dial("tcp", addrsStr)
log.Printf("net.Dial %q: %v", addrsStr, err)
addrs := strings.Split(addrsStr, ",")
useTLS, err := boolVar("KAFKA_USE_TLS")
if err != nil {
Expand Down Expand Up @@ -84,7 +88,7 @@ func New() (*Kafka, error) {
client.admin = admin
break
}
if !a.Next() {
if !a.More() {
return nil, fmt.Errorf("cannot connect to Kafka cluster at %q after %v: %v", addrs, retryLimit, err)
}
}
Expand Down
7 changes: 6 additions & 1 deletion client_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kafkatest_test

import (
"log"
"testing"
"time"

Expand All @@ -11,17 +12,21 @@ import (
)

func TestNew(t *testing.T) {
//sarama.Logger = log.New(os.Stderr, "[Sarama] ", log.LstdFlags)

c := qt.New(t)
k, err := kafkatest.New()
c.Assert(err, qt.Equals, nil)

log.Printf("************** succeeded in making initial connection")

// Produce a message to a new topic.
cfg := k.Config()
cfg.Producer.Return.Successes = true
cfg.Producer.Return.Errors = true

producer, err := sarama.NewSyncProducer(k.Addrs(), cfg)
c.Assert(err, qt.Equals, nil)
c.Assert(err, qt.Equals, nil, qt.Commentf("addrs: %q", k.Addrs()))
defer producer.Close()
topic := k.NewTopic()

Expand Down

This file was deleted.

This file was deleted.

122 changes: 84 additions & 38 deletions cue.mod/pkg/github.com/heetch/cue-schema/github/workflow/go/go.cue
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
// for easy modification of some parameters.
package workflow

import "list"

on: _ | *["push", "pull_request"]
name: _ | *"Test"
jobs: test: {
Expand All @@ -14,34 +16,35 @@ jobs: test: {
platform: _ | *[ "\(p)-latest" for p in Platforms ]
}
"runs-on": "${{ matrix.platform }}"
steps: [{
steps: list.FlattenN([{
name: "Install Go"
uses: "actions/setup-go@v1"
with: "go-version": "${{ matrix.go-version }}"
}, {
},
// {
// name: "Module cache"
// uses: "actions/cache@v1"
// with: {
// path: "~/go/pkg/mod"
// key: "${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}"
// "restore-keys": "${{ runner.os }}-go-"
// }
// },
{
name: "Checkout code"
uses: "actions/checkout@v1"
}, _ | *{
},
// Include setup steps for any services that require them,
[ServiceConfig[name].SetupStep for name, enabled in Services if enabled && ServiceConfig[name].SetupStep != null],
_ | *{
name: "Test"
run: RunTest
}]
}], 1)
}

// Include all named services.
for name in Services {
jobs: test: services: "\(name)": ServiceConfig[name]
}

jobs: test: services: kafka?: _ | *{
image: "confluentinc/cp-kafka:latest"
env: {
KAFKA_BROKER_ID: "1"
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"
KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
}
for name, enabled in Services if enabled {
jobs: test: services: "\(name)": ServiceConfig[name].Service
}

// Platforms configures what platforms to run the tests on.
Expand All @@ -57,33 +60,76 @@ RunTest :: *"go test ./..." | string
// Service configures which services to make available.
// The configuration the service with name N is taken from
// ServiceConfig[N]
Services :: [... string]
Services :: [_]: bool

// ServiceConfig holds the default configuration for services that
// can be started by naming them in Services.
ServiceConfig :: [_]: _
ServiceConfig :: [_]: {
// Service holds the contents of `jobs: test: services: "\(serviceName)"`
"Service": Service

// SetupStep optionally holds a step to run to set up the service
// before the main workflow action is run (for example to wait
// for the service to become ready).
SetupStep: JobStep | *null
}

// Kafka requires zookeeper too.
if Services["kafka"] != _|_ {
Services :: zookeeper: true
}

ServiceConfig :: kafka: {
image: "confluentinc/cp-kafka:latest"
env: {
KAFKA_BROKER_ID: "1"
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"
KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
Service: {
image: "confluentinc/cp-kafka:latest"
ports: ["9092:9092"]
env: {
KAFKA_BROKER_ID: "1"
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"
KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
}
}
SetupStep: {
name: "Wait for Kafka"
"timeout-minutes": 1
shell: "bash"
run: #"""
netstat -a || true
waitfor() {
while ! nc -v -z $1 $2
do sleep 1
done
}
waitfor localhost 9092
waitfor localhost 2181
"""#
}
}

ServiceConfig :: zookeeper: {
Service: {
image: "confluentinc/cp-zookeeper:latest"
ports: ["2181:2181"]
env: {
ZOOKEEPER_CLIENT_PORT: "2181"
ZOOKEEPER_TICK_TIME: "2000"
}
}
}

ServiceConfig :: postgres: {
env: {
POSTGRES_DB: "postgres"
POSTGRES_PASSWORD: "postgres"
POSTGRES_USER: "postgres"
ServiceConfig :: postgres: _ |*{
Service: {
image: "postgres:10.8"
ports: ["5432:5432"]
env: {
POSTGRES_DB: "postgres"
POSTGRES_PASSWORD: "postgres"
POSTGRES_USER: "postgres"
}
options: "--health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5"
}
image: "postgres:10.8"
options: "--health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5"
ports: [
"5432:5432",
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ package workflow

// TODO cross-verify against https://github.com/SchemaStore/schemastore/blob/master/src/schemas/json/github-workflow.json

import "regexp"

// name holds the name of your workflow. GitHub displays the
// names of your workflows on your repository's actions page. If
// you omit this field, GitHub sets the name to the workflow's
Expand Down Expand Up @@ -74,7 +72,7 @@ JobStep :: {
"sh" |
"cmd" |
"powershell" |
regexp.Match(#"\{0\}"#)
=~#"\{0\}"#

// with holds a map of the input parameters defined by the action.
// Each input parameter is a key/value pair. Input parameters are
Expand Down
Loading

0 comments on commit e27d886

Please sign in to comment.