feat: add kafkaAdmin #202

Open · wants to merge 2 commits into main
2 changes: 1 addition & 1 deletion README.md
@@ -222,7 +222,7 @@ To be able to run tests locally, `$KAFKA_TEST_BROKER` environment variable is ex

`$KAFKA_TEST_BROKER` should contain an IP address of an accessible Kafka broker that will be used to run integration tests against.

With [Docker Compose](./docker-compose.yml) this variable is used to configure Kafka broker to listen on this address:
With [Docker Compose](./docker-compose.yml) this variable is used to configure a Kafka broker to listen on this address; the accompanying Redpanda Console UI is available on localhost:8080:

```
$ docker-compose up
```
137 changes: 117 additions & 20 deletions docker-compose.yml
@@ -1,24 +1,121 @@
version: "3.8"

version: '3.7'
services:
zookeeper:
image: confluentinc/cp-zookeeper
hostname: zookeeper
ports:
- 2182:2181
environment:
SERVICE_NAME: zookeeper
ZOOKEEPER_CLIENT_PORT: 2181

kafka:
image: confluentinc/cp-kafka:latest
hostname: localhost
# Redpanda cluster
redpanda-1:
image: docker.redpanda.com/redpandadata/redpanda:v23.1.1
container_name: redpanda-1
command:
- redpanda
- start
- --smp
- '1'
- --reserve-memory
- 0M
- --overprovisioned
- --node-id
- '1'
- --kafka-addr
- PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
- --advertise-kafka-addr
- PLAINTEXT://redpanda-1:29092,OUTSIDE://localhost:9092
- --pandaproxy-addr
- PLAINTEXT://0.0.0.0:28082,OUTSIDE://0.0.0.0:8082
- --advertise-pandaproxy-addr
- PLAINTEXT://redpanda-1:28082,OUTSIDE://localhost:8082
- --rpc-addr
- 0.0.0.0:33145
- --advertise-rpc-addr
- redpanda-1:33145
ports:
# - 8081:8081
- 8082:8082
- 9092:9092
links:
- zookeeper:zookeeper
- 9644:9644
- 28082:28082
- 29092:29092

# Want a two node Redpanda cluster? Uncomment this block :)
# redpanda-2:
# image: docker.redpanda.com/redpandadata/redpanda:v23.1.1
# container_name: redpanda-2
# command:
# - redpanda
# - start
# - --smp
# - '1'
# - --reserve-memory
# - 0M
# - --overprovisioned
# - --node-id
# - '2'
# - --seeds
# - redpanda-1:33145
# - --kafka-addr
# - PLAINTEXT://0.0.0.0:29093,OUTSIDE://0.0.0.0:9093
# - --advertise-kafka-addr
# - PLAINTEXT://redpanda-2:29093,OUTSIDE://localhost:9093
# - --pandaproxy-addr
# - PLAINTEXT://0.0.0.0:28083,OUTSIDE://0.0.0.0:8083
# - --advertise-pandaproxy-addr
# - PLAINTEXT://redpanda-2:28083,OUTSIDE://localhost:8083
# - --rpc-addr
# - 0.0.0.0:33146
# - --advertise-rpc-addr
# - redpanda-2:33146
# ports:
# - 8083:8083
# - 9093:9093

# redpanda-3:
# image: docker.redpanda.com/redpandadata/redpanda:v23.1.1
# container_name: redpanda-3
# command:
# - redpanda
# - start
# - --smp
# - '2'
# - --reserve-memory
# - 0M
# - --overprovisioned
# - --node-id
# - '3'
# - --seeds
# - redpanda-1:33145
# - --kafka-addr
# - PLAINTEXT://0.0.0.0:29094,OUTSIDE://0.0.0.0:9094
# - --advertise-kafka-addr
# - PLAINTEXT://redpanda-3:29094,OUTSIDE://localhost:9094
# - --pandaproxy-addr
# - PLAINTEXT://0.0.0.0:28084,OUTSIDE://0.0.0.0:8084
# - --advertise-pandaproxy-addr
# - PLAINTEXT://redpanda-3:28084,OUTSIDE://localhost:8084
# - --rpc-addr
# - 0.0.0.0:33147
# - --advertise-rpc-addr
# - redpanda-3:33147
# ports:
# - 8084:8084
# - 9094:9094

redpanda-console:
image: docker.redpanda.com/redpandadata/console:v2.2.2
container_name: redpanda-console
entrypoint: /bin/sh
command: -c "echo \"$$CONSOLE_CONFIG_FILE\" > /tmp/config.yml; /app/console"
environment:
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://$KAFKA_TEST_BROKER:9092"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CREATE_TOPICS:
CONFIG_FILEPATH: /tmp/config.yml
CONSOLE_CONFIG_FILE: |
kafka:
brokers: ["redpanda-1:29092"]
schemaRegistry:
enabled: false
redpanda:
adminApi:
enabled: true
urls: ["http://redpanda-1:9644"]
connect:
enabled: false
ports:
- 8080:8080
depends_on:
- redpanda-1
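For local runs, an integration test would typically resolve the broker from `$KAFKA_TEST_BROKER`, falling back to the `localhost:9092` address that `redpanda-1` advertises above. The helper below is only a sketch under that assumption (the `brokerFromEnv` name and the port-appending behaviour are not code from this repository):

```haskell
import qualified Data.Text          as T
import           Kafka.Types        (BrokerAddress (..))
import           System.Environment (lookupEnv)

-- Hypothetical helper: resolve the broker used by local integration tests.
-- $KAFKA_TEST_BROKER is documented to hold only an IP address, so the Kafka
-- port 9092 (the one advertised by redpanda-1 above) is appended here.
brokerFromEnv :: IO BrokerAddress
brokerFromEnv = do
  mbBroker <- lookupEnv "KAFKA_TEST_BROKER"
  pure $ case mbBroker of
    Just ip -> BrokerAddress (T.pack (ip ++ ":9092"))
    Nothing -> BrokerAddress (T.pack "localhost:9092")
```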
5 changes: 4 additions & 1 deletion hw-kafka-client.cabal
@@ -55,7 +55,10 @@ library
  build-tool-depends: c2hs:c2hs
  if impl(ghc <8.0)
    build-depends: semigroups
  exposed-modules: Kafka.Consumer
  exposed-modules: Kafka.Admin
                   Kafka.Admin.AdminProperties
                   Kafka.Admin.Types
                   Kafka.Consumer
                   Kafka.Consumer.ConsumerProperties
                   Kafka.Consumer.Subscription
                   Kafka.Consumer.Types
122 changes: 122 additions & 0 deletions src/Kafka/Admin.hs
@@ -0,0 +1,122 @@
module Kafka.Admin
  ( module X
  , newKAdmin
  , createTopic
  , deleteTopic
  , closeKAdmin
  ) where

import Control.Monad
import Control.Monad.IO.Class
import Data.Text
import Data.List.NonEmpty
import qualified Data.List.NonEmpty as NEL
import qualified Data.Text as T

import Kafka.Internal.RdKafka
import Kafka.Internal.Setup

import Kafka.Types as X
import Kafka.Admin.AdminProperties as X
import Kafka.Admin.Types as X

newKAdmin :: MonadIO m
          => AdminProperties
          -> m (Either KafkaError KAdmin)
newKAdmin properties = liftIO $ do
  kafkaConfig@(KafkaConf kafkaConf' _ _) <- kafkaConf (KafkaProps $ adminProps properties)
  maybeKafka <- newRdKafkaT RdKafkaConsumer kafkaConf'
  case maybeKafka of
    Left err    -> pure $ Left $ KafkaError err
    Right kafka -> pure $ Right $ KAdmin (Kafka kafka) kafkaConfig

closeKAdmin :: KAdmin -> IO ()
closeKAdmin ka = void $ rdKafkaConsumerClose (getRdKafka ka)

--- CREATE TOPIC ---
createTopic :: KAdmin
            -> NewTopic
            -> IO (Either KafkaError TopicName)
createTopic kAdmin topic = liftIO $ do
  let kafkaPtr = getRdKafka kAdmin
  queue <- newRdKafkaQueue kafkaPtr
  opts  <- newRdKAdminOptions kafkaPtr RdKafkaAdminOpAny

  topicRes <- withNewTopic topic $ \topic' -> rdKafkaCreateTopic kafkaPtr topic' opts queue
  case topicRes of
    Left err -> pure $ Left (NEL.head err)
    Right _  -> pure $ Right $ topicName topic

--- DELETE TOPIC ---
deleteTopic :: KAdmin
            -> TopicName
            -> IO (Either KafkaError TopicName)
deleteTopic kAdmin topic = liftIO $ do
  let kafkaPtr = getRdKafka kAdmin
  queue <- newRdKafkaQueue kafkaPtr
  opts  <- newRdKAdminOptions kafkaPtr RdKafkaAdminOpAny

  topicRes <- withOldTopic topic $ \topic' -> rdKafkaDeleteTopics kafkaPtr [topic'] opts queue
  case topicRes of
    Left err -> pure $ Left (NEL.head err)
    Right _  -> pure $ Right topic

withNewTopic :: NewTopic
             -> (RdKafkaNewTopicTPtr -> IO a)
             -> IO (Either (NonEmpty KafkaError) a)
withNewTopic t transform = do
  mkNewTopicRes <- mkNewTopic t newTopicPtr
  case mkNewTopicRes of
    Left err    -> return $ Left err
    Right topic -> do
      res <- transform topic
      return $ Right res

withOldTopic :: TopicName
             -> (RdKafkaDeleteTopicTPtr -> IO a)
             -> IO (Either (NonEmpty KafkaError) a)
withOldTopic tName transform = do
  rmOldTopicRes <- rmOldTopic tName oldTopicPtr
  case rmOldTopicRes of
    Left err    -> return $ Left err
    Right topic -> do
      res <- transform topic
      return $ Right res

newTopicPtr :: NewTopic -> IO (Either KafkaError RdKafkaNewTopicTPtr)
newTopicPtr topic = do
  ptrRes <- newRdKafkaNewTopic (unpack $ unTopicName $ topicName topic) (unPartitionCount $ topicPartitionCount topic) (unReplicationFactor $ topicReplicationFactor topic)
  case ptrRes of
    Left str  -> pure $ Left (KafkaError $ T.pack str)
    Right ptr -> pure $ Right ptr

oldTopicPtr :: TopicName -> IO (Either KafkaError RdKafkaDeleteTopicTPtr)
oldTopicPtr tName = do
  res <- newRdKafkaDeleteTopic $ unpack . unTopicName $ tName
  case res of
    Left str  -> pure $ Left (KafkaError $ T.pack str)
    Right ptr -> pure $ Right ptr

mkNewTopic :: NewTopic
           -> (NewTopic -> IO (Either KafkaError a))
           -> IO (Either (NonEmpty KafkaError) a)
mkNewTopic topic create = do
  res <- create topic
  case res of
    Left err       -> pure $ Left (NEL.singleton err)
    Right resource -> pure $ Right resource

rmOldTopic :: TopicName
           -> (TopicName -> IO (Either KafkaError a))
           -> IO (Either (NonEmpty KafkaError) a)
rmOldTopic tName remove = do
  res <- remove tName
  case res of
    Left err       -> pure $ Left (NEL.singleton err)
    Right resource -> pure $ Right resource
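Taken together, the new API composes like this; a minimal end-to-end sketch, not part of the PR, with the broker address matching the docker-compose setup above and the topic name and sizing as placeholders:

```haskell
{-# LANGUAGE OverloadedStrings #-}

import qualified Data.Map    as M
import           Kafka.Admin

-- Create a topic and delete it again against the locally advertised broker.
adminExample :: IO ()
adminExample = do
  kAdminRes <- newKAdmin (brokers [BrokerAddress "localhost:9092"])
  case kAdminRes of
    Left err     -> print err
    Right kAdmin -> do
      let topic = NewTopic
            { topicName              = TopicName "admin-example"
            , topicPartitionCount    = PartitionCount 1
            , topicReplicationFactor = ReplicationFactor 1
            , topicConfig            = M.empty
            }
      createTopic kAdmin topic >>= print
      deleteTopic kAdmin (TopicName "admin-example") >>= print
      closeKAdmin kAdmin
```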
43 changes: 43 additions & 0 deletions src/Kafka/Admin/AdminProperties.hs
@@ -0,0 +1,43 @@
{-# LANGUAGE OverloadedStrings #-}

module Kafka.Admin.AdminProperties where

import Data.Map
import qualified Data.Map as M
import Data.Text

import Kafka.Types

newtype AdminProperties = AdminProperties
  { adminProps :: Map Text Text
  }

instance Semigroup AdminProperties where
  (AdminProperties props1) <> (AdminProperties props2) =
    AdminProperties (props2 `union` props1)
  {-# INLINE (<>) #-}

instance Monoid AdminProperties where
  mempty = AdminProperties { adminProps = M.empty }
  {-# INLINE mempty #-}
  mappend = (<>)
  {-# INLINE mappend #-}

brokers :: [BrokerAddress] -> AdminProperties
brokers b =
  let b' = intercalate "," ((\(BrokerAddress i) -> i) <$> b)
  in extraProps $ fromList [("bootstrap.servers", b')]

clientId :: ClientId -> AdminProperties
clientId (ClientId cid) =
  extraProps $ M.fromList [("client.id", cid)]

timeOut :: Timeout -> AdminProperties
timeOut (Timeout to) =
  let to' = pack $ show to
  in extraProps $ fromList [("request.timeout.ms", to')]

extraProps :: Map Text Text -> AdminProperties
extraProps m = mempty { adminProps = m }
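Because `AdminProperties` is a `Monoid` whose `<>` takes the right-biased `union` of the underlying maps, settings can be combined freely and later entries win on key collisions. A small sketch, not part of the PR; the client id and timeout values are arbitrary:

```haskell
{-# LANGUAGE OverloadedStrings #-}

import qualified Data.Map as M
import           Kafka.Admin.AdminProperties
import           Kafka.Types                 (BrokerAddress (..), ClientId (..), Timeout (..))

-- Admin configuration for a local broker. Because (<>) unions the maps
-- right-biased, the explicit "request.timeout.ms" entry at the end wins
-- over the value set earlier by timeOut.
localAdminProps :: AdminProperties
localAdminProps =
     brokers [BrokerAddress "localhost:9092"]
  <> clientId (ClientId "hw-kafka-admin-example")
  <> timeOut (Timeout 5000)
  <> extraProps (M.fromList [("request.timeout.ms", "10000")])
```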
34 changes: 34 additions & 0 deletions src/Kafka/Admin/Types.hs
@@ -0,0 +1,34 @@
module Kafka.Admin.Types
  ( KAdmin(..)
  , PartitionCount(..)
  , ReplicationFactor(..)
  , NewTopic(..)
  ) where

import Data.Map

import Kafka.Types
import Kafka.Internal.Setup

data KAdmin = KAdmin
  { kaKafkaPtr  :: !Kafka
  , kaKafkaConf :: !KafkaConf
  }

instance HasKafka KAdmin where
  getKafka = kaKafkaPtr
  {-# INLINE getKafka #-}

instance HasKafkaConf KAdmin where
  getKafkaConf = kaKafkaConf
  {-# INLINE getKafkaConf #-}

newtype PartitionCount = PartitionCount { unPartitionCount :: Int } deriving (Show, Eq)
newtype ReplicationFactor = ReplicationFactor { unReplicationFactor :: Int } deriving (Show, Eq)

data NewTopic = NewTopic
  { topicName              :: TopicName
  , topicPartitionCount    :: PartitionCount
  , topicReplicationFactor :: ReplicationFactor
  , topicConfig            :: Map String String
  } deriving (Show)
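For illustration, a `NewTopic` with per-topic overrides might look like the sketch below (not part of the PR; the topic name and config values are invented). Note that in this revision `newTopicPtr` only forwards the name, partition count, and replication factor to librdkafka, so `topicConfig` is not applied yet.

```haskell
{-# LANGUAGE OverloadedStrings #-}

import qualified Data.Map          as M
import           Kafka.Admin.Types
import           Kafka.Types       (TopicName (..))

-- A single-partition, compacted topic. The config keys are illustrative;
-- topicConfig is not yet forwarded to librdkafka by this PR's newTopicPtr.
compactedTopic :: NewTopic
compactedTopic = NewTopic
  { topicName              = TopicName "example-compacted"
  , topicPartitionCount    = PartitionCount 1
  , topicReplicationFactor = ReplicationFactor 1
  , topicConfig            = M.fromList [("cleanup.policy", "compact")]
  }
```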