Skip to content
This repository has been archived by the owner on Jun 10, 2022. It is now read-only.

Commit

Permalink
backport #188 to v0.2
Browse files Browse the repository at this point in the history
  • Loading branch information
nmred committed Jan 31, 2018
1 parent e14311b commit bbe9329
Showing 1 changed file with 7 additions and 5 deletions.
12 changes: 7 additions & 5 deletions src/Consumer/Process.php
Original file line number Diff line number Diff line change
Expand Up @@ -557,21 +557,23 @@ public function succFetch($result, $fd)
continue;
}

$offset = $assign->getConsumerOffset($topic['topicName'], $part['partition']);
if ($offset === false) {
$consumerOffset = $assign->getConsumerOffset($topic['topicName'], $part['partition']);
if ($consumerOffset === false) {
return; // current is rejoin....
}
foreach ($part['messages'] as $message) {
$this->messages[$topic['topicName']][$part['partition']][] = $message;
//if ($this->consumer != null) {
// call_user_func($this->consumer, $topic['topicName'], $part['partition'], $message);
//}
$offset = $message['offset'];
$commitOffset = $message['offset'];
}

$consumerOffset = ($part['highwaterMarkOffset'] > $offset) ? ($offset + 1) : $offset;
$commitOffset = isset($commitOffset) ? $commitOffset : $consumerOffset - 1;
$consumerOffset = $commitOffset + 1;

$assign->setConsumerOffset($topic['topicName'], $part['partition'], $consumerOffset);
$assign->setCommitOffset($topic['topicName'], $part['partition'], $offset);
$assign->setCommitOffset($topic['topicName'], $part['partition'], $commitOffset);
}
}
$this->state->succRun(\Kafka\Consumer\State::REQUEST_FETCH, $fd);
Expand Down

0 comments on commit bbe9329

Please sign in to comment.