diff --git a/README.md b/README.md index e4841fc3..e76dda57 100644 --- a/README.md +++ b/README.md @@ -379,13 +379,27 @@ omctl push --directory ./demo_templates --url http://localhost:9998 kind: Behavior expect: kafka: - topic: hello_kafka_in + topic: hello_kafka_in_2 actions: - publish_kafka: topic: hello_kafka_out payload_from_file: './files/colors.json' # the path is relative to OPENMOCK_TEMPLATES_DIR ``` +If you started the example from docker-compose, you can test the above kafka mocks by using a kt docker container. + +```bash +# Exec into the container +docker-compose exec kt bash + +# Run some kt commands inside the container +# Notice that the container is within the docker-compose network, and it connects to "kafka:9092" + +$ kt topic +$ echo '{"123":"hi"}' | kt produce -topic hello_kafka_in -literal +$ kt consume -topic hello_kafka_out -offsets all=newest:newest +``` + ### Example: Mock AMQP (e.g. RabbitMQ) ```yaml # demo_templates/amqp.yaml diff --git a/docker-compose.yml b/docker-compose.yml index 9083e484..b2bd7cfc 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,30 +1,35 @@ version: '3.4' services: zookeeper: - image: confluentinc/cp-zookeeper:3.3.2 - container_name: openmock-zookeeper + image: confluentinc/cp-zookeeper:4.1.1 # kafka 1.1.1 environment: - ZOOKEEPER_CLIENT_PORT: 2181 - ZOOKEEPER_TICK_TIME: 2000 - ports: - - "2181:2181" - extra_hosts: - - "moby:127.0.0.1" + - ZOOKEEPER_CLIENT_PORT=2181 + - ZOOKEEPER_TICK_TIME=2000 + - KAFKA_HEAP_OPTS=-Xmx256m -Xms256m + healthcheck: + test: ["CMD-SHELL", "echo ruok | nc -w 2 localhost 2181"] + interval: 10s + timeout: 10s + retries: 5 kafka: - image: confluentinc/cp-kafka:3.3.2 - container_name: openmock-kafka - links: - - zookeeper + image: confluentinc/cp-kafka:4.1.1 # kafka 1.1.1 environment: - KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" - KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://localhost:9092" - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 - KAFKA_BROKER_ID: 1 + - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 + - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 + - KAFKA_BROKER_ID=1 + - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 + - KAFKA_HEAP_OPTS=-Xmx512m -Xms512m + depends_on: + - zookeeper ports: - "9092:9092" - extra_hosts: - - "moby:127.0.0.1" + healthcheck: + test: ["CMD-SHELL", "kafka-topics --zookeeper zookeeper:2181 --list"] + interval: 30s + timeout: 10s + retries: 5 + start_period: 10s rabbitmq: image: rabbitmq:3.6.6-management @@ -58,8 +63,18 @@ services: restart: "on-failure" environment: OPENMOCK_REDIS_TYPE: "redis" + OPENMOCK_KAFKA_ENABLED: "true" + OPENMOCK_KAFKA_SEED_BROKERS: "kafka:9092" + depends_on: + - kafka volumes: - ./demo_templates:/data/templates ports: - "9999:9999" - "9998:9998" + + kt: + image: evpavel/kt + command: "sleep 1d" + environment: + KT_BROKERS: "kafka:9092" diff --git a/kafka.go b/kafka.go index 0167679f..dbf5a767 100644 --- a/kafka.go +++ b/kafka.go @@ -94,7 +94,7 @@ func (om *OpenMock) startKafka() { if err := om.configKafka(); err != nil { logrus.WithFields(logrus.Fields{ "err": err, - }).Errorf("failed to config kafka") + }).Fatal("failed to config kafka") return } for kafka, ms := range om.repo.KafkaMocks { @@ -109,7 +109,7 @@ func (om *OpenMock) startKafka() { logrus.WithFields(logrus.Fields{ "err": err, "topic": kafka.Topic, - }).Errorf("failed to create a consumer") + }).Fatal("failed to create a consumer") return } logrus.Infof("consumer started for topic:%s", kafka.Topic) diff --git a/openmock.go b/openmock.go index f145a048..df9784db 100644 --- a/openmock.go +++ b/openmock.go @@ -23,7 +23,7 @@ type OpenMock struct { AdminHTTPHost string `env:"OPENMOCK_ADMIN_HTTP_HOST" envDefault:"0.0.0.0"` KafkaEnabled bool `env:"OPENMOCK_KAFKA_ENABLED" envDefault:"false"` KafkaClientID string `env:"OPENMOCK_KAFKA_CLIENT_ID" envDefault:"openmock"` - KafkaSeedBrokers []string `env:"OPENMOCK_KAFKA_SEED_BROKERS" envDefault:"kafka:9092,localhost:9092" envSeparator:","` + KafkaSeedBrokers []string `env:"OPENMOCK_KAFKA_SEED_BROKERS" envDefault:"kafka:9092" envSeparator:","` AMQPEnabled bool `env:"OPENMOCK_AMQP_ENABLED" envDefault:"false"` AMQPURL string `env:"OPENMOCK_AMQP_URL" envDefault:"amqp://guest:guest@rabbitmq:5672"` RedisType string `env:"OPENMOCK_REDIS_TYPE" envDefault:"memory"`