Skip to content

Commit

Permalink
Improve rebalance example by simulating
Browse files Browse the repository at this point in the history
a user transaction and random failures
to show how it must behave
  • Loading branch information
emasab committed Aug 10, 2023
1 parent 22808ab commit 8b31487
Showing 1 changed file with 186 additions and 23 deletions.
209 changes: 186 additions & 23 deletions examples/consumer_rebalance_example/consumer_rebalance_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,24 @@

// Example high-level Apache Kafka consumer demonstrating use of rebalance
// callback along with manual commit.
// It processes a batch of messages in parallel and pairs Kafka commits with
// a simulated user transaction. Random failures are present
// to show how it must behave in each case.
package main

import (
"fmt"
"math/rand"
"os"
"os/signal"
"syscall"
"time"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

var processing []chan error

func main() {
if len(os.Args) < 4 {
fmt.Fprintf(os.Stderr, "Usage: %s <bootstrap-servers> <group> <topics..>\n",
Expand All @@ -37,9 +45,10 @@ func main() {
bootstrapServers := os.Args[1]
group := os.Args[2]
topics := os.Args[3:]
exitCode := 0

sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, os.Interrupt)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": bootstrapServers,
Expand All @@ -50,25 +59,38 @@ func main() {
// for this group.
"auto.offset.reset": "earliest",
// Whether or not we commit offsets automatically.
"enable.auto.commit": false,
"enable.auto.commit": false,
"enable.partition.eof": true,
})
defer func() {
fmt.Printf("%% Closing consumer\n")
c.Close()
os.Exit(exitCode)
}()

if err != nil {
fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
os.Exit(1)
exitCode = 1
return
}

fmt.Printf("%% Created Consumer %v\n", c)

// Subscribe to topics, call the rebalanceCallback on assignment/revoke.
// The rebalanceCallback can be triggered from c.Poll() and c.Close().
err = c.SubscribeTopics(topics, rebalanceCallback)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed subscribing to topics: %s\n", err)
exitCode = 1
return
}

run := true
for run == true {
for run {
select {
case sig := <-sigchan:
fmt.Printf("%% Caught signal %v: terminating\n", sig)
commit(c)
run = false
default:
ev := c.Poll(100)
Expand All @@ -79,12 +101,8 @@ func main() {
if err = processEvent(c, ev); err != nil {
fmt.Fprintf(os.Stderr, "Failed to process event: %s\n", err)
}

}
}

fmt.Printf("%% Closing consumer\n")
c.Close()
}

// processEvent processes the message/error received from the kafka Consumer's
Expand All @@ -94,12 +112,20 @@ func processEvent(c *kafka.Consumer, ev kafka.Event) error {

case *kafka.Message:
fmt.Printf("%% Message on %s:\n%s\n", e.TopicPartition, string(e.Value))
processMessage(e)

// Handle manual commit since enable.auto.commit is unset.
if err := maybeCommit(c, e.TopicPartition); err != nil {
return err
}

case kafka.PartitionEOF:
fmt.Printf("%% EOF for topic %s partition %d, committing\n", *e.Topic,
e.Partition)
if err := commit(c); err != nil {
return err
}

case kafka.Error:
// Errors should generally be considered informational, the client
// will try to automatically recover.
Expand All @@ -113,6 +139,42 @@ func processEvent(c *kafka.Consumer, ev kafka.Event) error {
return nil
}

// processMessage hands the message over to parallelProcessMessage and
// records the resulting channel in the package-level processing slice, so
// that the outcome can be collected later.
// It always returns nil.
func processMessage(message *kafka.Message) error {
	result := parallelProcessMessage(message)
	processing = append(processing, result)
	return nil
}

// parallelProcessMessage starts parallel processing of a message and
// returns a channel where the corresponding error will be produced.
//
// The processing is simulated: a goroutine sleeps for 100ms and then sends
// the result of randomFailure (nil on success) before closing the channel.
// The message itself is not inspected by the simulated work.
func parallelProcessMessage(message *kafka.Message) chan error {
	// One slot of buffer lets the worker deliver its result and exit even
	// if nobody ever receives from the channel, so the goroutine cannot
	// be left blocked forever on the send.
	channel := make(chan error, 1)
	go func() {
		// Only the sender closes the channel, after its single result.
		defer close(channel)
		time.Sleep(100 * time.Millisecond)
		channel <- randomFailure()
	}()
	return channel
}

// completeProcessing waits for every channel currently held in the
// package-level processing slice to deliver its result, then empties the
// slice (keeping its capacity).
// It returns the first non-nil error received, in slice order, or nil when
// every task succeeded.
func completeProcessing() error {
	fmt.Printf("%% Complete pending tasks\n")

	var firstErr error
	for i := range processing {
		// Always receive, even after a failure was seen, so that every
		// task is awaited.
		taskErr := <-processing[i]
		if firstErr != nil {
			continue
		}
		firstErr = taskErr
	}

	// Reset the list of pending tasks.
	processing = processing[:0]
	return firstErr
}

// maybeCommit is called for each message we receive from a Kafka topic.
// This method can be used to apply some arbitrary logic/processing to the
// offsets, write the offsets into some external storage, and finally, to
Expand All @@ -126,7 +188,32 @@ func maybeCommit(c *kafka.Consumer, topicPartition kafka.TopicPartition) error {
return nil
}

commitedOffsets, err := c.Commit()
fmt.Printf("%% maybeCommit: do commit\n")
return commit(c)
}

// commit completes current parallel processing,
// commits user transaction and offsets on Kafka.
// If something fails, calls abort to abort the transaction.
func commit(c *kafka.Consumer) error {
fmt.Printf("%% Committing transaction\n")

var err error
err = completeProcessing()
if err != nil {
fmt.Fprintf(os.Stderr, "Processing tasks failed, aborting\n")
abort(c)
return err
}

err = userTransactionCommit()
if err != nil {
fmt.Fprintf(os.Stderr, "User commit failed, aborting\n")
abort(c)
return err
}

committedOffsets, err := c.Commit()

// ErrNoOffset occurs when there are no stored offsets to commit. This
// can happen if we haven't stored anything since the last commit.
Expand All @@ -135,10 +222,89 @@ func maybeCommit(c *kafka.Consumer, topicPartition kafka.TopicPartition) error {
// handling is illustrative of how to handle it in cases we call Commit()
// in another way, for example, every N seconds.
if err != nil && err.(kafka.Error).Code() != kafka.ErrNoOffset {
fmt.Fprintf(os.Stderr, "Fatal: Committed offsets to user transaction but not to Kafka: %s\n", err.Error())
return err
}

fmt.Printf("%% Commited offsets to Kafka: %v\n", commitedOffsets)
fmt.Printf("%% Committed offsets to Kafka: %v\n", committedOffsets)
return nil
}

// userTransactionCommit models committing a user transaction, which can be
// a DBMS transaction or something else.
// It logs the attempt and returns whatever randomFailure produces: nil on
// success or a simulated error.
func userTransactionCommit() error {
	fmt.Printf("%% Committing user transaction\n")
	outcome := randomFailure()
	return outcome
}

// userTransactionAbort models aborting a user transaction, which can be
// a DBMS transaction or something else.
// It logs the attempt and returns whatever randomFailure produces: nil on
// success or a simulated error.
func userTransactionAbort() error {
	fmt.Printf("%% Aborting user transaction\n")
	outcome := randomFailure()
	return outcome
}

// randomFailure simulates an unreliable operation: it returns a
// "random failure" error with probability 0.05 and nil with
// probability 0.95.
func randomFailure() error {
	const failureProbability = 0.05
	if roll := rand.Float64(); roll >= failureProbability {
		return nil
	}
	return fmt.Errorf("random failure")
}

// abort completes current parallel processing, aborts the user transaction
// and rewinds the consumer to the committed offsets.
//
// It does not return until both userTransactionAbort and
// rewindConsumerPosition have succeeded in the same attempt: if either
// step cannot be completed, the process probably cannot communicate with
// one of the two components, so it's not worth trying anything else.
func abort(c *kafka.Consumer) {
	fmt.Printf("%% Aborting transaction\n")

	// Wait for the pending tasks, but deliberately ignore their outcome:
	// the transaction is being aborted anyway.
	_ = completeProcessing()

	// Pause between retries, whichever step failed, so that a persistent
	// failure does not turn into a busy loop.
	const retryPause = 3 * time.Second

	for {
		if err := userTransactionAbort(); err != nil {
			fmt.Fprintf(os.Stderr, "userTransactionAbort failed, retry: %s\n", err.Error())
		} else if err := rewindConsumerPosition(c); err != nil {
			fmt.Fprintf(os.Stderr, "rewindConsumerPosition failed, retry: %s\n", err.Error())
		} else {
			return
		}
		time.Sleep(retryPause)
	}
}

// rewindConsumerPosition rewinds the consumer's position to the previously
// committed offset of every partition in its current assignment.
//
// Partitions whose committed offset is negative (presumably meaning that
// nothing has been committed yet) are rewound to kafka.OffsetBeginning.
// The first error returned by Assignment, Committed or Seek is returned
// as is; nil means every partition was rewound.
func rewindConsumerPosition(c *kafka.Consumer) error {
	fmt.Printf("%% Rewind to committed offsets\n")

	// Timeout handed to Committed, in milliseconds (30s).
	const committedTimeoutMs = 30 * 1000

	assigned, err := c.Assignment()
	if err != nil {
		return err
	}

	offsets, err := c.Committed(assigned, committedTimeoutMs)
	if err != nil {
		return err
	}

	for i := range offsets {
		// Work on a copy so the slice returned by Committed is untouched.
		target := offsets[i]
		if target.Offset < 0 {
			target.Offset = kafka.OffsetBeginning
			target.LeaderEpoch = nil
		}
		if seekErr := c.Seek(target, 1); seekErr != nil {
			return seekErr
		}
	}
	return nil
}

Expand Down Expand Up @@ -175,6 +341,11 @@ func rebalanceCallback(c *kafka.Consumer, event kafka.Event) error {
fmt.Printf("%% %s rebalance: %d partition(s) revoked: %v\n",
c.GetRebalanceProtocol(), len(ev.Partitions), ev.Partitions)

if c.IsClosed() {
// This is last revoke before closing, cannot commit here
// must be done before closing.
return nil
}
// Usually, the rebalance callback for `RevokedPartitions` is called
// just before the partitions are revoked. We can be certain that a
// partition being revoked is not yet owned by any other consumer.
Expand All @@ -183,28 +354,20 @@ func rebalanceCallback(c *kafka.Consumer, event kafka.Event) error {
// However, there can be cases where the assignment is lost
// involuntarily. In this case, the partition might already be owned
// by another consumer, and operations including committing
// offsets may not work.
// offsets may not work. We abort user transaction too in this case.
if c.AssignmentLost() {
// Our consumer has been kicked out of the group and the
// entire assignment is thus lost.
fmt.Fprintln(os.Stderr, "Assignment lost involuntarily, commit may fail")
}

// Since enable.auto.commit is unset, we need to commit offsets manually
// before the partition is revoked.
commitedOffsets, err := c.Commit()

if err != nil && err.(kafka.Error).Code() != kafka.ErrNoOffset {
fmt.Fprintf(os.Stderr, "Failed to commit offsets: %s\n", err)
return err
abort(c)
} else {
return commit(c)
}
fmt.Printf("%% Commited offsets to Kafka: %v\n", commitedOffsets)

// Similar to Assign, client automatically calls Unassign() unless the
// callback has already called that method. Here, we don't call it.

default:
fmt.Fprintf(os.Stderr, "Unxpected event type: %v\n", event)
fmt.Fprintf(os.Stderr, "Unexpected event type: %v\n", event)
}

return nil
Expand Down

0 comments on commit 8b31487

Please sign in to comment.