Skip to content

Commit

Permalink
feat(admin): create topics
Browse files Browse the repository at this point in the history
  • Loading branch information
JoranVanBelle committed Jan 7, 2025
1 parent 17451f5 commit 932a777
Show file tree
Hide file tree
Showing 10 changed files with 465 additions and 42 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ To be able to run tests locally, `$KAFKA_TEST_BROKER` environment variable is ex

`$KAFKA_TEST_BROKER` should contain an IP address of an accessible Kafka broker that will be used to run integration tests against.

With [Docker Compose](./docker-compose.yml) this variable is used to configure Kafka broker to listen on this address:
With [Docker Compose](./docker-compose.yml) this variable is used to configure a Kafka broker with a UI on localhost:8080 to listen on this address:

```
$ docker-compose up
Expand Down
137 changes: 117 additions & 20 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,24 +1,121 @@
version: "3.8"

version: '3.7'
services:
zookeeper:
image: confluentinc/cp-zookeeper
hostname: zookeeper
ports:
- 2182:2181
environment:
SERVICE_NAME: zookeeper
ZOOKEEPER_CLIENT_PORT: 2181

kafka:
image: confluentinc/cp-kafka:latest
hostname: localhost
# Redpanda cluster
redpanda-1:
image: docker.redpanda.com/redpandadata/redpanda:v23.1.1
container_name: redpanda-1
command:
- redpanda
- start
- --smp
- '1'
- --reserve-memory
- 0M
- --overprovisioned
- --node-id
- '1'
- --kafka-addr
- PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
- --advertise-kafka-addr
- PLAINTEXT://redpanda-1:29092,OUTSIDE://localhost:9092
- --pandaproxy-addr
- PLAINTEXT://0.0.0.0:28082,OUTSIDE://0.0.0.0:8082
- --advertise-pandaproxy-addr
- PLAINTEXT://redpanda-1:28082,OUTSIDE://localhost:8082
- --rpc-addr
- 0.0.0.0:33145
- --advertise-rpc-addr
- redpanda-1:33145
ports:
# - 8081:8081
- 8082:8082
- 9092:9092
links:
- zookeeper:zookeeper
- 9644:9644
- 28082:28082
- 29092:29092

# Want a two node Redpanda cluster? Uncomment this block :)
# redpanda-2:
# image: docker.redpanda.com/redpandadata/redpanda:v23.1.1
# container_name: redpanda-2
# command:
# - redpanda
# - start
# - --smp
# - '1'
# - --reserve-memory
# - 0M
# - --overprovisioned
# - --node-id
# - '2'
# - --seeds
# - redpanda-1:33145
# - --kafka-addr
# - PLAINTEXT://0.0.0.0:29093,OUTSIDE://0.0.0.0:9093
# - --advertise-kafka-addr
# - PLAINTEXT://redpanda-2:29093,OUTSIDE://localhost:9093
# - --pandaproxy-addr
# - PLAINTEXT://0.0.0.0:28083,OUTSIDE://0.0.0.0:8083
# - --advertise-pandaproxy-addr
# - PLAINTEXT://redpanda-2:28083,OUTSIDE://localhost:8083
# - --rpc-addr
# - 0.0.0.0:33146
# - --advertise-rpc-addr
# - redpanda-2:33146
# ports:
# - 8083:8083
# - 9093:9093

# redpanda-3:
# image: docker.redpanda.com/redpandadata/redpanda:v23.1.1
# container_name: redpanda-3
# command:
# - redpanda
# - start
# - --smp
# - '2'
# - --reserve-memory
# - 0M
# - --overprovisioned
# - --node-id
# - '3'
# - --seeds
# - redpanda-1:33145
# - --kafka-addr
# - PLAINTEXT://0.0.0.0:29094,OUTSIDE://0.0.0.0:9094
# - --advertise-kafka-addr
# - PLAINTEXT://redpanda-3:29094,OUTSIDE://localhost:9094
# - --pandaproxy-addr
# - PLAINTEXT://0.0.0.0:28084,OUTSIDE://0.0.0.0:8084
# - --advertise-pandaproxy-addr
# - PLAINTEXT://redpanda-3:28084,OUTSIDE://localhost:8084
# - --rpc-addr
# - 0.0.0.0:33147
# - --advertise-rpc-addr
# - redpanda-3:33147
# ports:
# - 8084:8084
# - 9094:9094

redpanda-console:
image: docker.redpanda.com/redpandadata/console:v2.2.2
container_name: redpanda-console
entrypoint: /bin/sh
command: -c "echo \"$$CONSOLE_CONFIG_FILE\" > /tmp/config.yml; /app/console"
environment:
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://$KAFKA_TEST_BROKER:9092"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CREATE_TOPICS:
CONFIG_FILEPATH: /tmp/config.yml
CONSOLE_CONFIG_FILE: |
kafka:
brokers: ["redpanda-1:29092"]
schemaRegistry:
enabled: false
redpanda:
adminApi:
enabled: true
urls: ["http://redpanda-1:9644"]
connect:
enabled: false
ports:
- 8080:8080
depends_on:
- redpanda-1
5 changes: 4 additions & 1 deletion hw-kafka-client.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ library
build-tool-depends: c2hs:c2hs
if impl(ghc <8.0)
build-depends: semigroups
exposed-modules: Kafka.Consumer
exposed-modules: Kafka.Admin
Kafka.Admin.AdminProperties
Kafka.Admin.Types
Kafka.Consumer
Kafka.Consumer.ConsumerProperties
Kafka.Consumer.Subscription
Kafka.Consumer.Types
Expand Down
83 changes: 83 additions & 0 deletions src/Kafka/Admin.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
module Kafka.Admin(
module X
, newKAdmin
, createTopic
, closeKAdmin
) where

import Control.Monad
import Control.Monad.Trans.Class
import Control.Monad.Trans.Maybe
import Control.Monad.IO.Class
import Data.Text
import Data.Maybe
import Data.Bifunctor
import Data.List.NonEmpty
import qualified Data.List.NonEmpty as NEL
import qualified Data.Text as T
import qualified Data.Set as S

import Kafka.Internal.RdKafka
import Kafka.Internal.Setup

import Kafka.Types as X
import Kafka.Admin.AdminProperties as X
import Kafka.Admin.Types as X

newKAdmin ::( MonadIO m )
=> AdminProperties
-> m (Either KafkaError KAdmin)
newKAdmin properties = liftIO $ do
kafkaConfig@(KafkaConf kafkaConf' _ _) <- kafkaConf ( KafkaProps $ adminProps properties)
maybeKafka <- newRdKafkaT RdKafkaConsumer kafkaConf'
case maybeKafka of
Left err -> pure $ Left $ KafkaError err
Right kafka -> pure $ Right $ KAdmin (Kafka kafka) kafkaConfig

closeKAdmin :: KAdmin
-> IO ()
closeKAdmin ka = void $ rdKafkaConsumerClose (getRdKafka ka)
--- CREATE TOPIC ---
createTopic :: KAdmin
-> NewTopic
-> IO (Either KafkaError TopicName)
createTopic kAdmin topic = liftIO $ do
let kafkaPtr = getRdKafka kAdmin
queue <- newRdKafkaQueue kafkaPtr
opts <- newRdKAdminOptions kafkaPtr RdKafkaAdminOpAny

topicRes <- withNewTopic topic $ \topic' -> rdKafkaCreateTopic kafkaPtr topic' opts queue
case topicRes of
Left err -> do
pure $ Left (NEL.head err)
Right _ -> do
pure $ Right $ topicName topic

withNewTopic :: NewTopic
-> (RdKafkaNewTopicTPtr -> IO a)
-> IO (Either (NonEmpty KafkaError) a)
withNewTopic t transform = do
mkNewTopicRes <- mkNewTopic t topicPtr
case mkNewTopicRes of
Left err -> do
return $ Left err
Right topic -> do
res <- transform topic
return $ Right res

topicPtr :: NewTopic -> IO (Either KafkaError RdKafkaNewTopicTPtr)
topicPtr topic = do
ptrRes <- newRdKafkaNewTopic (unpack $ unTopicName $ topicName topic) (unPartitionCount $ topicPartitionCount topic) (unReplicationFactor $ topicReplicationFactor topic)
case ptrRes of
Left str -> pure $ Left (KafkaError $ T.pack str)
Right ptr -> pure $ Right ptr

mkNewTopic :: NewTopic
-> (NewTopic -> IO (Either KafkaError a))
-> IO (Either (NonEmpty KafkaError) a)
mkNewTopic topic create = do
res <- create topic
case res of
Left err -> pure $ Left (Data.List.NonEmpty.singleton err)
Right resource -> pure $ Right resource

43 changes: 43 additions & 0 deletions src/Kafka/Admin/AdminProperties.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
{-# LANGUAGE OverloadedStrings #-}

module Kafka.Admin.AdminProperties where

import Data.Map
import qualified Data.Map as M
import Data.Text

import Kafka.Types

newtype AdminProperties = AdminProperties {
adminProps :: Map Text Text
}

instance Semigroup AdminProperties where
( AdminProperties props1 ) <> ( AdminProperties props2 ) =
AdminProperties ( props2 `union` props1 )
{-# INLINE (<>) #-}

instance Monoid AdminProperties where
mempty = AdminProperties {
adminProps = M.empty
}
{-# INLINE mempty #-}
mappend = (<>)
{-# INLINE mappend #-}

brokers :: [BrokerAddress] -> AdminProperties
brokers b =
let b' = intercalate "," ((\( BrokerAddress i ) -> i ) <$> b )
in extraProps $ fromList [("bootstrap.servers", b')]

clientId :: ClientId -> AdminProperties
clientId (ClientId cid) =
extraProps $ M.fromList [("client.id", cid)]

timeOut :: Timeout -> AdminProperties
timeOut (Timeout to) =
let to' = ( pack $ show to )
in extraProps $ fromList [("request.timeout.ms", to')]

extraProps :: Map Text Text -> AdminProperties
extraProps m = mempty { adminProps = m }
34 changes: 34 additions & 0 deletions src/Kafka/Admin/Types.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
module Kafka.Admin.Types (
KAdmin(..)
, PartitionCount (..)
, ReplicationFactor (..)
, NewTopic (..)
) where

import Data.Map

import Kafka.Types
import Kafka.Internal.Setup

data KAdmin = KAdmin {
kaKafkaPtr :: !Kafka
, kaKafkaConf :: !KafkaConf
}

instance HasKafka KAdmin where
getKafka = kaKafkaPtr
{-# INLINE getKafka #-}

instance HasKafkaConf KAdmin where
getKafkaConf = kaKafkaConf
{-# INLINE getKafkaConf #-}

newtype PartitionCount = PartitionCount { unPartitionCount :: Int } deriving (Show, Eq)
newtype ReplicationFactor = ReplicationFactor { unReplicationFactor :: Int } deriving (Show, Eq)

data NewTopic = NewTopic {
topicName :: TopicName
, topicPartitionCount :: PartitionCount
, topicReplicationFactor :: ReplicationFactor
, topicConfig :: Map String String
} deriving (Show)
Loading

0 comments on commit 932a777

Please sign in to comment.