diff --git a/examples/consumer_rebalance_example/consumer_rebalance_example.go b/examples/consumer_rebalance_example/consumer_rebalance_example.go index 94aa5a9dd..2cc164bf3 100644 --- a/examples/consumer_rebalance_example/consumer_rebalance_example.go +++ b/examples/consumer_rebalance_example/consumer_rebalance_example.go @@ -16,16 +16,24 @@ // Example high-level Apache Kafka consumer demonstrating use of rebalance // callback along with manual commit. +// It processes a batch of messages in parallel and pairs Kafka commits with +// a simulated user transaction. Random failures are present +// to show how it must behave in each case. package main import ( "fmt" + "math/rand" "os" "os/signal" + "syscall" + "time" "github.com/confluentinc/confluent-kafka-go/v2/kafka" ) +var processing []chan error + func main() { if len(os.Args) < 4 { fmt.Fprintf(os.Stderr, "Usage: %s \n", @@ -37,9 +45,10 @@ func main() { bootstrapServers := os.Args[1] group := os.Args[2] topics := os.Args[3:] + exitCode := 0 sigchan := make(chan os.Signal, 1) - signal.Notify(sigchan, os.Interrupt) + signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) c, err := kafka.NewConsumer(&kafka.ConfigMap{ "bootstrap.servers": bootstrapServers, @@ -50,12 +59,19 @@ func main() { // for this group. "auto.offset.reset": "earliest", // Whether or not we commit offsets automatically. - "enable.auto.commit": false, + "enable.auto.commit": false, + "enable.partition.eof": true, }) + defer func() { + fmt.Printf("%% Closing consumer\n") + c.Close() + os.Exit(exitCode) + }() if err != nil { fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err) - os.Exit(1) + exitCode = 1 + return } fmt.Printf("%% Created Consumer %v\n", c) @@ -63,12 +79,18 @@ func main() { // Subscribe to topics, call the rebalanceCallback on assignment/revoke. // The rebalanceCallback can be triggered from c.Poll() and c.Close(). err = c.SubscribeTopics(topics, rebalanceCallback) + if err != nil { + fmt.Fprintf(os.Stderr, "Failed subscribing to topics: %s\n", err) + exitCode = 1 + return + } run := true - for run == true { + for run { select { case sig := <-sigchan: fmt.Printf("%% Caught signal %v: terminating\n", sig) + commit(c) run = false default: ev := c.Poll(100) @@ -79,12 +101,8 @@ func main() { if err = processEvent(c, ev); err != nil { fmt.Fprintf(os.Stderr, "Failed to process event: %s\n", err) } - } } - - fmt.Printf("%% Closing consumer\n") - c.Close() } // processEvent processes the message/error received from the kafka Consumer's @@ -94,12 +112,20 @@ func processEvent(c *kafka.Consumer, ev kafka.Event) error { case *kafka.Message: fmt.Printf("%% Message on %s:\n%s\n", e.TopicPartition, string(e.Value)) + processMessage(e) // Handle manual commit since enable.auto.commit is unset. if err := maybeCommit(c, e.TopicPartition); err != nil { return err } + case kafka.PartitionEOF: + fmt.Printf("%% EOF for topic %s partition %d, committing\n", *e.Topic, + e.Partition) + if err := commit(c); err != nil { + return err + } + case kafka.Error: // Errors should generally be considered informational, the client // will try to automatically recover. @@ -113,6 +139,42 @@ func processEvent(c *kafka.Consumer, ev kafka.Event) error { return nil } +// processMessage starts parallel processing of a message and +// appends the returned channel to the processing slice. +func processMessage(message *kafka.Message) error { + processing = append(processing, parallelProcessMessage(message)) + return nil +} + +// parallelProcessMessage starts parallel processing of a message and +// returns a channel were the corresponding error will be produced. +func parallelProcessMessage(message *kafka.Message) chan error { + channel := make(chan error) + go func() { + time.Sleep(100 * time.Millisecond) + channel <- randomFailure() + close(channel) + }() + return channel +} + +// completeProcessing awaits all processing tasks and +// returns an error if at least one of them failed. +func completeProcessing() error { + fmt.Printf("%% Complete pending tasks\n") + var err error = nil + // Awaits a result from all the processing tasks + for _, channel := range processing { + currentErr := <-channel + if err == nil { + err = currentErr + } + } + // Clear the processing slice + processing = processing[:0] + return err +} + // maybeCommit is called for each message we receive from a Kafka topic. // This method can be used to apply some arbitary logic/processing to the // offsets, write the offsets into some external storage, and finally, to @@ -126,7 +188,32 @@ func maybeCommit(c *kafka.Consumer, topicPartition kafka.TopicPartition) error { return nil } - commitedOffsets, err := c.Commit() + fmt.Printf("%% maybeCommit: do commit\n") + return commit(c) +} + +// commit completes current parallel processing, +// commits user transaction and offsets on Kafka. +// If something fails, calls abort to abort the transaction. +func commit(c *kafka.Consumer) error { + fmt.Printf("%% Committing transaction\n") + + var err error + err = completeProcessing() + if err != nil { + fmt.Fprintf(os.Stderr, "Processing tasks failed, aborting\n") + abort(c) + return err + } + + err = userTransactionCommit() + if err != nil { + fmt.Fprintf(os.Stderr, "User commit failed, aborting\n") + abort(c) + return err + } + + committedOffsets, err := c.Commit() // ErrNoOffset occurs when there are no stored offsets to commit. This // can happen if we haven't stored anything since the last commit. @@ -135,10 +222,89 @@ func maybeCommit(c *kafka.Consumer, topicPartition kafka.TopicPartition) error { // handling is illustrative of how to handle it in cases we call Commit() // in another way, for example, every N seconds. if err != nil && err.(kafka.Error).Code() != kafka.ErrNoOffset { + fmt.Fprintf(os.Stderr, "Fatal: Committed offsets to user transaction but not to Kafka: %s\n", err.Error()) return err } - fmt.Printf("%% Commited offsets to Kafka: %v\n", commitedOffsets) + fmt.Printf("%% Committed offsets to Kafka: %v\n", committedOffsets) + return nil +} + +// userTransactionCommit models committing a user transaction, that can be a +// DBMS transaction of something else. +// Returns a random failure +func userTransactionCommit() error { + fmt.Printf("%% Committing user transaction\n") + return randomFailure() +} + +// userTransactionAbort models aborting a user transaction, that can be a +// DBMS transaction of something else. +// Returns a random failure +func userTransactionAbort() error { + fmt.Printf("%% Aborting user transaction\n") + return randomFailure() +} + +// Returns an error with probability 0.05 or no error with probability 0.95 +func randomFailure() error { + if rand.Float64() < 0.05 { + return fmt.Errorf("random failure") + } + return nil +} + +// abort completes current parallel processing, +// aborts user transaction and rewinds consumer to committed offsets. +func abort(c *kafka.Consumer) { + fmt.Printf("%% Aborting transaction\n") + completeProcessing() + // Ignore error, transaction is aborting anyway + + var err error + // Continue retrying, if it cannot abort a transaction or seek assigned + // partitions, probably it cannot communicate with one of the two components, + // so it's not worth trying anything else. + for { + err = userTransactionAbort() + if err != nil { + fmt.Fprintf(os.Stderr, "userTransactionAbort failed, retry: %s\n", err.Error()) + continue + } + err = rewindConsumerPosition(c) + if err == nil { + return + } + fmt.Fprintf(os.Stderr, "rewindConsumerPosition failed, retry: %s\n", err.Error()) + // Pause between retries + time.Sleep(3 * time.Second) + } +} + +// rewindConsumerPosition Rewinds consumer's position to the +// previous committed offset +func rewindConsumerPosition(c *kafka.Consumer) error { + fmt.Printf("%% Rewind to committed offsets\n") + assignment, err := c.Assignment() + if err != nil { + return err + } + + committed, err := c.Committed(assignment, 30*1000 /* 30s */) + if err != nil { + return err + } + + for _, tp := range committed { + if tp.Offset < 0 { + tp.Offset = kafka.OffsetBeginning + tp.LeaderEpoch = nil + } + err := c.Seek(tp, 1) + if err != nil { + return err + } + } return nil } @@ -175,6 +341,11 @@ func rebalanceCallback(c *kafka.Consumer, event kafka.Event) error { fmt.Printf("%% %s rebalance: %d partition(s) revoked: %v\n", c.GetRebalanceProtocol(), len(ev.Partitions), ev.Partitions) + if c.IsClosed() { + // This is last revoke before closing, cannot commit here + // must be done before closing. + return nil + } // Usually, the rebalance callback for `RevokedPartitions` is called // just before the partitions are revoked. We can be certain that a // partition being revoked is not yet owned by any other consumer. @@ -183,28 +354,20 @@ func rebalanceCallback(c *kafka.Consumer, event kafka.Event) error { // However, there can be cases where the assignment is lost // involuntarily. In this case, the partition might already be owned // by another consumer, and operations including committing - // offsets may not work. + // offsets may not work. We abort user transaction too in this case. if c.AssignmentLost() { // Our consumer has been kicked out of the group and the // entire assignment is thus lost. - fmt.Fprintln(os.Stderr, "Assignment lost involuntarily, commit may fail") - } - - // Since enable.auto.commit is unset, we need to commit offsets manually - // before the partition is revoked. - commitedOffsets, err := c.Commit() - - if err != nil && err.(kafka.Error).Code() != kafka.ErrNoOffset { - fmt.Fprintf(os.Stderr, "Failed to commit offsets: %s\n", err) - return err + abort(c) + } else { + return commit(c) } - fmt.Printf("%% Commited offsets to Kafka: %v\n", commitedOffsets) // Similar to Assign, client automatically calls Unassign() unless the // callback has already called that method. Here, we don't call it. default: - fmt.Fprintf(os.Stderr, "Unxpected event type: %v\n", event) + fmt.Fprintf(os.Stderr, "Unexpected event type: %v\n", event) } return nil