This repository has been archived by the owner on Jun 10, 2022. It is now read-only.

Run producer in consumer #205

Open
SunnnyGohil opened this issue Apr 23, 2018 · 22 comments

Comments

@SunnnyGohil

SunnnyGohil commented Apr 23, 2018

Hi,

Can I run a producer inside a consumer in kafka-php?

@lcobucci
Contributor

@SunnnyGohil for now you can only run the sync producer while consuming messages, since the control of the event loop is done inside of the classes

@SunnnyGohil
Author

Thanks for the reply, @lcobucci.

When I run a producer inside the consumer, I get this error: "Exception 'LogicException' with message 'Cannot run() recursively; event reactor already active'".

If you have a solution, please help me.

@lcobucci
Contributor

@SunnnyGohil the producer should be run in the synchronous mode: https://github.com/weiboad/kafka-php#synchronous-mode

@SunnnyGohil
Author

@lcobucci I already run the producer in sync mode, but I still get the error.

@lcobucci
Contributor

Can you put your code here?

@SunnnyGohil
Author

SunnnyGohil commented Apr 23, 2018

require '../vendor/autoload.php';
date_default_timezone_set('PRC');

$config = \Kafka\ConsumerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('127.0.0.1:9092');
$config->setGroupId('test');
$config->setBrokerVersion('0.10.1.0');
$config->setTopics(['test']);
$config->setOffsetReset('earliest');
$config->setClientId(1);
$consumer = new \Kafka\Consumer();
$consumer->start(function ($topic, $part, $message) use (&$partition) {
    $part = (int) $partition;
    $st = @(time() + microtime());
    $data = (array) json_decode($message['message']['value']);
    $data['data'] = (array) $data['data'];
    $GLOBALS['log'] = $data;

    $config_producer = \Kafka\ProducerConfig::getInstance();
    $config_producer->setMetadataRefreshIntervalMs(10000);
    $config_producer->setMetadataBrokerList('127.0.0.1:9092');
    $config_producer->setBrokerVersion('0.10.1.0');
    $config_producer->setRequiredAck(1);
    $config_producer->setIsAsyn(FALSE);
    $config_producer->setProduceInterval(500);
    $producer = new \Kafka\Producer(
        function () {
            return [
                [
                    'topic' => 'make',
                    'partId' => 0,
                    'value' => json_encode($GLOBALS['log']),
                    'key' => 'click',
                ],
            ];
        }
    );
    $producer->send(true);
});

This is my code.

@lcobucci
Contributor

@SunnnyGohil setIsAsyn() does not put the producer in synchronous mode. If you compare your code with the one in the link I've sent you, you'll notice that there are differences in:

$producer = new \Kafka\Producer(
    function() {
        return [
            [
                'topic' => 'make',
                'partId'=>0,
                'value' => json_encode($GLOBALS['log']),
                'key' => 'click',
            ],
        ];
    }
);

$producer->send(true);
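
For comparison, here is a minimal sketch of the synchronous form as I understand it from the linked README section: the producer is constructed without a callback and the messages are passed directly to send(). `buildMessages()` is a hypothetical helper and the sample payload is made up. The send is guarded so the snippet still runs when kafka-php is not installed. Note that later comments in this thread report that the stable release can still fail with an unpack error when this runs inside the consumer callback.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: build the message list for a synchronous send.
 * Topic, partition and key mirror the snippet quoted in this thread.
 */
function buildMessages(array $log): array
{
    return [
        [
            'topic'  => 'make',
            'partId' => 0,
            'value'  => json_encode($log),
            'key'    => 'click',
        ],
    ];
}

// Made-up payload, for illustration only.
$messages = buildMessages(['event' => 'click', 'data' => ['id' => 1]]);

// Only attempt to send when kafka-php is installed; a reachable broker is assumed.
if (class_exists(\Kafka\Producer::class)) {
    $config = \Kafka\ProducerConfig::getInstance();
    $config->setMetadataBrokerList('127.0.0.1:9092');
    $config->setBrokerVersion('0.10.1.0');
    $config->setRequiredAck(1);
    $config->setIsAsyn(false);

    // Synchronous mode: no callback in the constructor, messages go to send().
    $producer = new \Kafka\Producer();
    $producer->send($messages);
}
```

The difference from the quoted code is only in the last two statements: no closure is handed to the constructor, and send() receives the message array instead of `true`.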

@SunnnyGohil
Author

Thanks, @lcobucci.
I changed my code, but now I get a new exception:
"Exception 'Kafka\Exception\Protocol' with message 'unpack failed. string(raw) length is 0 , TO N"

@lcobucci
Contributor

@SunnnyGohil are you using the stable release? If so, could you try with dev-master?

@SunnnyGohil
Author

SunnnyGohil commented Apr 24, 2018

Yes @lcobucci, I am using the stable release.
My project depends on it, so I can't change it.
Can you give me another solution?

@SunnnyGohil
Author

i solv this by calling self API

@Sevavietl

@SunnnyGohil I ran into the same problem. Could you please explain what this means:

i solv this by calling self API

My error message is

Argument 2 passed to Kafka\\Protocol\\Protocol::unpack() must be of the type string, null given, called in /var/www/ads-export/vendor/nmred/kafka-php/src/Producer/SyncProcess.php on line 135

From what I was able to figure out, the problem is that if you configure ConsumerConfig and ProducerConfig with the same metadata broker, one of them ends up overwritten. In my case, I end up with a Kafka\Socket inside Kafka\Producer\SyncProcess. So when syncMeta() is called, $socket->read(4) returns null instead of a string (Kafka\SocketSync returns a string in this case):

$dataLen       = Protocol::unpack(Protocol::BIT_B32, $socket->read(4));

Since unpack() expects a string, not null, the script fails.
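
The failure mode can be illustrated without the library. This is only a sketch using PHP's built-in unpack(), where the `N` format (the same letter that appears in the error message earlier in this thread) is an unsigned 32-bit big-endian integer. `readFrameLength()` is a hypothetical helper, not part of kafka-php; it shows the kind of guard that turns a null or short read into an explicit "no data" result instead of a type error.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: decode a 4-byte big-endian length prefix.
 * Returns null when the read produced no usable bytes
 * (null, an empty string, or fewer than 4 bytes).
 */
function readFrameLength($raw): ?int
{
    if (!is_string($raw) || strlen($raw) < 4) {
        return null;
    }

    // unpack() returns a 1-indexed array for unnamed formats.
    $parts = unpack('N', substr($raw, 0, 4));

    return $parts[1];
}

var_dump(readFrameLength(null));               // NULL
var_dump(readFrameLength("\x00\x00\x00\x2A")); // int(42)
```

A caller can then decide whether to retry the read or raise a clearer exception when null comes back.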

@lcobucci, could you please advise how I can overcome this problem?

Thank you in advance.

@kevinmiao

Does partId work?

@dawood

dawood commented Oct 20, 2018

@SunnnyGohil can you please tell me how you solved this issue?

@djklim87

I have this problem too, and I have no idea how to solve it.

@Sevavietl

@dawood @djklim87 it was solved in the aforementioned PR, but that PR was never merged.

@dawood

dawood commented Nov 14, 2018

@Sevavietl yes, I saw that. Too bad that PR did not get merged.

@djklim87 I was able to use a producer from the consumer using the low-level API provided. Look at https://github.com/weiboad/kafka-php/blob/master/example/protocol/Produce.php

So you need to use the low-level API, which is used internally anyway to communicate. Let me know if you need more help.

@lijunchen22

Running a producer inside a consumer: can anybody solve this issue?

@dawood

dawood commented Mar 11, 2020

@lijunchen22 refer to https://github.com/weiboad/kafka-php/blob/master/example/protocol/Produce.php

You would need to use the low-level API, as shown in the file above.

@lijunchen22

@dawood I tried, but it doesn't work. Can you show me your code here?

@dawood

dawood commented Mar 11, 2020

@lijunchen22

<?php
// Assumed imports, matching the bare class names used below.
use Kafka\Protocol;
use Kafka\Socket;

// $eventsToSend, $kafkaBrokerHost and $kafkaBrokerPort must be defined by the caller.
$data = [
    'required_ack' => 1,
    'timeout'      => '1000',
    'data'         => [
        [
            'topic_name' => 'SOME_TOPIC',
            'partitions' => [
                [
                    'partition_id' => 0,
                    'messages'     => $eventsToSend,
                ],
            ],
        ],
    ],
];

Protocol::init('1.0.0');
$requestData = Protocol::encode(Protocol::PRODUCE_REQUEST, $data);

$socket = new Socket($kafkaBrokerHost, $kafkaBrokerPort);
$socket->setOnReadable(function ($data): void {
    // The first 4 bytes are unpacked separately; the rest is decoded as the produce response.
    $coodid = \Kafka\Protocol\Protocol::unpack(\Kafka\Protocol\Protocol::BIT_B32, substr($data, 0, 4));
    $result = Protocol::decode(Protocol::PRODUCE_REQUEST, substr($data, 4));
    echo json_encode($result);
});
$socket->connect();
$socket->write($requestData);

@lijunchen22

@dawood It works, thanks a million.
