diff --git a/lib/kafka/consumer.rb b/lib/kafka/consumer.rb index f85ac0a..b39789f 100644 --- a/lib/kafka/consumer.rb +++ b/lib/kafka/consumer.rb @@ -127,13 +127,13 @@ def manage_partition_consumers(handler) distributed_partitions = self.class.distribute_partitions(running_instances, partitions) my_partitions = distributed_partitions[@instance] - logger.info "Claiming #{my_partitions.length} out of #{partitions.length} partitions." + logger.info "Claiming #{my_partitions&.length} out of #{partitions&.length} partitions." # based onw hat partitions we should be consuming and the partitions # that we already are consuming, figure out what partition consumers # to stop and start - partitions_to_stop = @partition_consumers.keys - my_partitions - partitions_to_start = my_partitions - @partition_consumers.keys + partitions_to_stop = @partition_consumers.keys - (my_partitions || []) + partitions_to_start = (my_partitions || []) - @partition_consumers.keys # Stop the partition consumers we should no longer be running in parallel if partitions_to_stop.length > 0