-
Notifications
You must be signed in to change notification settings - Fork 451
run producer in cunsumer #205
Comments
@SunnnyGohil for now you can only run the sync producer while consuming messages, since the control of the event loop is done inside of the classes |
thanks for reply @lcobucci when i run producer into consumer that time i got "Exception 'LogicException' with message 'Cannot run() recursively; event reactor already active" this error. if you have solution than please help me |
@SunnnyGohil the producer should be run in the synchronous mode: https://github.com/weiboad/kafka-php#synchronous-mode |
@lcobucci i aleardy run producer in sync mode but i got error |
Can you put your code here? |
require '../vendor/autoload.php';
date_default_timezone_set('PRC');
$config = \Kafka\ConsumerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('127.0.0.1:9092');
$config->setGroupId('test');
$config->setBrokerVersion('0.10.1.0');
$config->setTopics(['test']);
$config->setOffsetReset('earliest');
$config->setClientId(1);
$consumer = new \Kafka\Consumer();
$consumer->start(function($topic,$part,$message) use (&$partition){
$part=(int)$partition;
$st=@(time()+microtime());
$data=(array)json_decode($message['message']['value']);
$data['data']=(array)$data['data'];
$GLOBALS['log']=$data;
$config_producer = \Kafka\ProducerConfig::getInstance();
$config_producer->setMetadataRefreshIntervalMs(10000);
$config_producer->setMetadataBrokerList('127.0.0.1:9092');
$config_producer->setBrokerVersion('0.10.1.0');
$config_producer->setRequiredAck(1);
$config_producer->setIsAsyn(FALSE);
$config_producer->setProduceInterval(500);
$producer = new \Kafka\Producer(
function() {
return [
[
'topic' => 'make',
'partId'=>0,
'value' => json_encode($GLOBALS['log']),
'key' => 'click',
],
];
}
);
$producer->send(true);
}); this is my code |
@SunnnyGohil $producer = new \Kafka\Producer(
function() {
return [
[
'topic' => 'make',
'partId'=>0,
'value' => json_encode($GLOBALS['log']),
'key' => 'click',
],
];
}
);
$producer->send(true); |
thanks @lcobucci |
@SunnnyGohil are you using the stable release? If so, could you try with |
yes @lcobucci i am using stable release. |
i solv this by calling self API |
@SunnnyGohil I run into the same problem. Can you, please, explain what does it mean:
My error message is
From what I was able to figure out the problem is that if you configure $dataLen = Protocol::unpack(Protocol::BIT_B32, $socket->read(4)); And as @lcobucci, can you, please, advise how I can overcome this problem. Thank you in advance. |
dose partId is work? |
@SunnnyGohil can you please tell me how did you solve this issue? |
I has this problem too. And have no idea how i can solve it |
@Sevavietl yes i saw that, bad that PR did not get merged. @djklim87 I was able to use producer from consumer using low level api provided. look at https://github.com/weiboad/kafka-php/blob/master/example/protocol/Produce.php so, you need to use low level api which is used internally anyways to communicate. Let me know if you need more help. |
run producer into consumer,can any body solve this issue. |
@lijunchen22 refer to https://github.com/weiboad/kafka-php/blob/master/example/protocol/Produce.php you would need to use low level api as mentioned in above file. |
@dawood I try. but it doesn't work. Can you show me your code here? |
|
@dawood it work,thanks a million |
Hii
Can i run producer into Consumer in kafka-php?
The text was updated successfully, but these errors were encountered: