diff --git a/Tests/Bayeux/BayeuxClientTest.php b/Tests/Bayeux/BayeuxClientTest.php index 13db2bc..40dcff5 100644 --- a/Tests/Bayeux/BayeuxClientTest.php +++ b/Tests/Bayeux/BayeuxClientTest.php @@ -46,6 +46,11 @@ protected function setUp() public function testStream() { + if (getenv('AccountPushTopic') == '') { + $this->fail('Account push topic not named in .env.'); + return; + } + $rand = rand(100, 1000); $name = 'Test Account '.$rand; @@ -73,11 +78,11 @@ function (ChannelInterface $channel, Message $message) use ($name, &$consumer) { $this->client->getChannel(ChannelInterface::META_CONNECT)->subscribe($consumer); - $channel = $this->client->getChannel('/topic/Accounts'); + $channel = $this->client->getChannel(getenv('AccountPushTopic')); $channel->subscribe( Consumer::create( function (ChannelInterface $channel, Message $message) use ($name) { - $this->assertEquals("/topic/Accounts", $channel->getChannelId()); + $this->assertEquals(getenv('AccountPushTopic'), $channel->getChannelId()); $data = $message->getData(); $this->assertNotNull($data); $sobject = $data->getSobject(); @@ -111,6 +116,11 @@ function (ChannelInterface $channel, Message $message) use ($name) { public function testCdc() { + if (getenv('AccountChangeEvent') == '') { + $this->fail('Account push topic not named in .env.'); + return; + } + $rand = rand(100, 1000); $name = 'Test Account '.$rand; @@ -139,11 +149,11 @@ function (ChannelInterface $channel, Message $message) use ($name, &$consumer) { $this->client->getChannel(ChannelInterface::META_CONNECT)->subscribe($consumer); - $channel = $this->client->getChannel('/data/AccountChangeEvent'); + $channel = $this->client->getChannel(getenv('AccountChangeEvent')); $channel->subscribe( Consumer::create( function (ChannelInterface $channel, Message $message) use ($name) { - $this->assertEquals("/data/AccountChangeEvent", $channel->getChannelId()); + $this->assertEquals(getenv('AccountChangeEvent'), $channel->getChannelId()); $data = $message->getData(); $this->assertNotNull($data); $payload = $data->getPayload(); diff --git a/phpunit.xml.dist b/phpunit.xml.dist index f131ca8..b1fc29a 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -15,10 +15,14 @@ --> + + + + Tests - \ No newline at end of file + diff --git a/src/Bayeux/BayeuxClient.php b/src/Bayeux/BayeuxClient.php index 79abc04..e2defd0 100644 --- a/src/Bayeux/BayeuxClient.php +++ b/src/Bayeux/BayeuxClient.php @@ -10,6 +10,7 @@ use AE\SalesforceRestSdk\AuthProvider\AuthProviderInterface; use AE\SalesforceRestSdk\AuthProvider\SessionExpiredOrInvalidException; +use AE\SalesforceRestSdk\Bayeux\BayeuxClientState as State; use AE\SalesforceRestSdk\Bayeux\Extension\ExtensionInterface; use AE\SalesforceRestSdk\Bayeux\Transport\AbstractClientTransport; use AE\SalesforceRestSdk\Bayeux\Transport\HttpClientTransport; @@ -83,9 +84,12 @@ class BayeuxClient private $clientId; /** - * @var string + * @var State\ClientState */ - private $state = self::DISCONNECTED; + private $state; + + /** @var array|string[] */ + private $previousStates = []; /** * @var ArrayCollection|ChannelInterface[] @@ -157,6 +161,23 @@ protected function createClient(string $version = "44.0") ); } + public function start() + { + $this->setState(new State\HandshakeState($this, $this->logger)); + do { + $this->state->handle(); + } while (!$this->state instanceof State\DisconnectState); + } + + public function setState(State\ClientState $state) + { + if (array_search(get_class($state), $this->previousStates) === false) { + $state->init(); + $this->previousStates[] = get_class($state); + } + $this->state = $state; + } + /** * @param string $channelId * @@ -170,290 +191,45 @@ public function getChannel(string $channelId): ChannelInterface $channel = new Channel($channelId); - // If you wanna listen to meta messages, be my guest but we don't need to subscribe to them - if ($channel->isMeta()) { - $this->channels->set($channelId, $channel); - - return $channel; - } - - $this->getChannel(ChannelInterface::META_SUBSCRIBE)->subscribe( - Consumer::create( - function (ChannelInterface $c, Message $message) { - if (!$message->isSuccessful()) { - $this->logger->error( - "Failed to subscribe to channel {channel}", - [ - 'channel' => $c->getChannelId(), - ] - ); - - $c->unsubscribeAll(); - $this->channels->remove($c->getChannelId()); - } - } - ) - ) - ; - $this->channels->set($channelId, $channel); - if (null !== $this->clientId) { - $message = new Message(); - $message->setChannel(ChannelInterface::META_SUBSCRIBE); - $message->setSubscription($channelId); - $this->sendMessages([$message]); - } else { - // If the handshake hasn't taken place, wait for it and connect in bulk - $this->getChannel(ChannelInterface::META_HANDSHAKE)->subscribe( - Consumer::create( - function (ChannelInterface $c, Message $m) use ($channel) { - if ($m->isSuccessful() && !$channel->isMeta()) { - $message = new Message(); - $message->setChannel(ChannelInterface::META_SUBSCRIBE); - $message->setSubscription($channel->getChannelId()); - - $this->sendMessages([$message]); - } - } - ) - ) - ; - } - return $channel; } - /** - * Start the Bayeux Client - * - * @code getChannel('/topic/mytopic'); - * $channel->subscribe(function(ChannelInterface $c, StreamingData $data) { - * ///... - * }); - * $client->start(); - */ - public function start(): void - { - if (!$this->isDisconnected()) { - throw new \RuntimeException("The client must be disconnected before starting."); - } - - $this->getChannel(ChannelInterface::META_HANDSHAKE)->subscribe( - Consumer::create( - function ( - ChannelInterface $c, - Message $message - ) { - if ($message->isSuccessful()) { - $this->listen(); - } else { - $this->logger->critical("Handshake authentication failed with the server."); - throw new \RuntimeException("Handshake authentication failed with the server."); - } - }, - 1000000 - ) - ) - ; - - $this->handshake(); - } - public function isDisconnected(): bool { - return in_array( - $this->state, - [static::DISCONNECTED, static::DISCONNECTING, static::UNCONNECTED, static::TERMINATING] - ); + return $this->state instanceof State\DisconnectState; } public function handshake(): void { - if (in_array( - $this->state, - [ - static::CONNECTING, - static::CONNECTED, - static::HANDSHAKING, - static::HANDSHAKEN, - static::TERMINATING, - ] - )) { - $this->logger->critical("The client must be fully disconnected before handshaking."); - throw new \RuntimeException("The client must be fully disconnected before handshaking."); - } - - if ($this->state !== static::REHANDSHAKING) { - $this->state = static::HANDSHAKING; - } - - $consumer = Consumer::create( - function (ChannelInterface $c, Message $message) use (&$consumer) { - $c->unsubscribe($consumer); - - if ($message->isSuccessful()) { - $this->state = static::HANDSHAKEN; - - return true; - } else { - $advice = $message->getAdvice(); - $this->clientId = null; - - if (null !== $advice && $advice->getReconnect() === 'retry') { - $this->state = static::REHANDSHAKING; - sleep($advice->getInterval() ?: 0); - - $this->handshake(); - } else { - $this->state = static::UNCONNECTED; - } - - return false; - } - }, - -1000 - ); - $this->getChannel(ChannelInterface::META_HANDSHAKE)->subscribe($consumer); - - $message = new Message(); - $message->setChannel(ChannelInterface::META_HANDSHAKE); - $message->setSupportedConnectionTypes([$this->transport->getName()]); - $message->setVersion(static::VERSION); - $message->setMinimumVersion(static::MINIMUM_VERSION); - - $this->sendMessages([$message]); + $this->setState(new State\HandshakeState($this, $this->logger)); + $this->state->handle(); } public function connect(): void { - if ($this->state !== static::HANDSHAKEN && $this->state !== static::CONNECTED) { - $this->logger->critical("Cannot connect to the server without first handshaking with it."); - throw new \RuntimeException("Cannot connect to the server without first handshaking with it."); - } - - $this->state = static::CONNECTING; - - $message = new Message(); - $message->setChannel(ChannelInterface::META_CONNECT); - $message->setConnectionType($this->transport->getName()); - - $consumer = Consumer::create( - function (ChannelInterface $c, Message $message) use (&$consumer) { - $c->unsubscribe($consumer); - - if ($message->isSuccessful()) { - $this->state = static::CONNECTED; - } else { - $this->logger->critical( - 'Failed to connect with Salesforce: {error}', - [ - 'error' => $message->getError(), - ] - ); - - $advice = $message->getAdvice(); - - if (null !== $advice && in_array($advice->getReconnect(), ['retry', 'handshake'])) { - $interval = $advice->getInterval() ?: 0; - sleep($interval); - - if ($advice->getReconnect() === 'retry') { - $this->connect(); - } else { - $this->state = static::REHANDSHAKING; - $this->handshake(); - } - } else { - $this->disconnect(); - } - } - }, - -1000 - ); - $channel = $this->getChannel(ChannelInterface::META_CONNECT); - $channel->subscribe($consumer); - - $this->sendMessages([$message]); + $this->setState(new State\ConnectState($this, $this->logger)); + $this->state->handle(); } + /** + * @deprecated use connect instead. + */ public function listen(): void { - if ($this->state !== static::HANDSHAKEN) { - $this->logger->critical("A handshake connection with the streaming service must occur before listening."); - throw new \RuntimeException( - "A handshake connection with the streaming service must occur before listening." - ); - } - - $channel = $this->getChannel(ChannelInterface::META_CONNECT); - $channel->subscribe( - Consumer::create( - function (ChannelInterface $c, Message $message) { - if ($message->isSuccessful()) { - $advice = $message->getAdvice(); - if (null !== $advice && $advice->getInterval() > 0) { - sleep($advice->getInterval()); - } - - $this->connect(); - } - }, - 1000000 - ) - ); - $this->connect(); } public function disconnect(): void { - if (!in_array($this->state, [static::CONNECTING, static::CONNECTED, static::TERMINATING])) { - $this->logger->notice("The server must be connected before disconnecting."); - throw new \RuntimeException("The server must be connected before disconnecting."); - } - - if ($this->state !== static::TERMINATING) { - $this->state = static::DISCONNECTING; - } - - $unsubscribes = []; - - foreach ($this->channels as $channel) { - if ($channel->isMeta()) { - continue; - } - - $message = new Message(); - $message->setChannel(ChannelInterface::META_UNSUBSCRIBE); - $message->setSubscription($channel->getChannelId()); - $unsubscribes[] = $message; - } - - // Unsubscribe channels - if (count($unsubscribes) > 0) { - $this->sendMessages($unsubscribes); - } - - $message = new Message(); - $message->setChannel(ChannelInterface::META_DISCONNECT); - - $this->sendMessages([$message]); - $this->authProvider->revoke(); - $this->clientId = null; - // Clear all the channels to make room for new subscriptions - $this->channels->clear(); - - $this->state = static::DISCONNECTED; + $this->setState(new State\DisconnectState($this, $this->logger)); + $this->state->handle(); } public function terminate(): void { - $this->state = static::TERMINATING; $this->transport->terminate(); - $this->disconnect(); } @@ -602,6 +378,11 @@ public function getClientId(): string return $this->clientId; } + public function clearClientId() + { + $this->clientId = null; + } + /** * @return string */ diff --git a/src/Bayeux/BayeuxClientState/ClientState.php b/src/Bayeux/BayeuxClientState/ClientState.php new file mode 100644 index 0000000..439bb15 --- /dev/null +++ b/src/Bayeux/BayeuxClientState/ClientState.php @@ -0,0 +1,49 @@ +context = $context; + $this->logger = $logger; + } + + public function getContext(): BayeuxClient + { + return $this->context; + } + + /** + * @param $nextState + * @return mixed + */ + public function transitionToState(string $nextState) : bool + { + switch ($nextState) { + case HandshakeState::class: + case SubscribeState::class: + case UnsubscribeState::class: + case ConnectState::class: + case DisconnectState::class: + $this->getContext()->setState(new $nextState($this->context, $this->logger)); + return true; + } + throw new InvalidArgumentException('Class ' . $nextState . ' is not a valid state.'); + } +} diff --git a/src/Bayeux/BayeuxClientState/ConnectState.php b/src/Bayeux/BayeuxClientState/ConnectState.php new file mode 100644 index 0000000..a5695fb --- /dev/null +++ b/src/Bayeux/BayeuxClientState/ConnectState.php @@ -0,0 +1,59 @@ +getContext()->getChannel(ChannelInterface::META_CONNECT)->subscribe( + Consumer::create( + function (ChannelInterface $c, Message $message) { + + $advice = $message->getAdvice(); + + if (null !== $advice && $advice->getInterval() > 0) { + sleep($advice->getInterval()); + } + + if (!$message->isSuccessful()) { + $this->logger->critical( + 'Failed to connect with Salesforce: {error}', + [ + 'error' => $message->getError(), + ] + ); + + if (null !== $advice && in_array($advice->getReconnect(), ['retry', 'handshake'])) { + + if ($advice->getReconnect() === 'retry') { + //SF wants us to retry, so we just stay in this state + return; + } else { + //We got banished to HANDSHAKING again, lets transition + $this->transitionToState(HandshakeState::class); + } + } else { + // We got owned, give up and disconnect + $this->transitionToState(DisconnectState::class); + } + } + //Any time we successfully connect we will want to just connect again, so no need to transition states here either. + }, + -1000 + ) + ); + } + + public function handle() + { + $message = new Message(); + $message->setChannel(ChannelInterface::META_CONNECT); + $message->setConnectionType($this->getContext()->getTransport()->getName()); + $this->getContext()->sendMessages([$message]); + } +} diff --git a/src/Bayeux/BayeuxClientState/DisconnectState.php b/src/Bayeux/BayeuxClientState/DisconnectState.php new file mode 100644 index 0000000..8fc0df8 --- /dev/null +++ b/src/Bayeux/BayeuxClientState/DisconnectState.php @@ -0,0 +1,30 @@ +getContext(), $this->logger); + $unsubscribe->handle(); + + $message = new Message(); + $message->setChannel(ChannelInterface::META_DISCONNECT); + + $this->getContext()->sendMessages([$message]); + $this->getContext()->getAuthProvider()->revoke(); + $this->getContext()->clearClientId(); + // Clear all the channels to make room for new subscriptions + $this->getContext()->getChannels()->clear(); + } +} diff --git a/src/Bayeux/BayeuxClientState/HandshakeState.php b/src/Bayeux/BayeuxClientState/HandshakeState.php new file mode 100644 index 0000000..2bd11fd --- /dev/null +++ b/src/Bayeux/BayeuxClientState/HandshakeState.php @@ -0,0 +1,52 @@ +getContext()->getChannel(ChannelInterface::META_HANDSHAKE)->subscribe( + Consumer::create( + function ( + ChannelInterface $c, + Message $message + ) { + if ($message->isSuccessful()) { + // If we successfully handshake, we will transition to the Subscribe state. + $this->transitionToState(SubscribeState::class); + } else { + $this->logger->critical("Handshake authentication failed with the server."); + $advice = $message->getAdvice(); + $this->getContext()->clearClientId(); + if (null !== $advice && $advice->getReconnect() === 'retry') { + sleep($advice->getInterval() ?: 0); + // We won't make any state transitions so the Handshake step will re run once we exit + return; + } + // lol we dead + $this->transitionToState(DisconnectState::class); + } + }, + 1000000 + ) + ) + ; + } + + public function handle() + { + $message = new Message(); + $message->setChannel(ChannelInterface::META_HANDSHAKE); + $message->setSupportedConnectionTypes([$this->getContext()->getTransport()->getName()]); + $message->setVersion(BayeuxClient::VERSION); + $message->setMinimumVersion(BayeuxClient::MINIMUM_VERSION); + + $this->getContext()->sendMessages([$message]); + } +} diff --git a/src/Bayeux/BayeuxClientState/SubscribeState.php b/src/Bayeux/BayeuxClientState/SubscribeState.php new file mode 100644 index 0000000..49c33af --- /dev/null +++ b/src/Bayeux/BayeuxClientState/SubscribeState.php @@ -0,0 +1,44 @@ +getContext()->getChannel(ChannelInterface::META_SUBSCRIBE)->subscribe( + Consumer::create( + function (ChannelInterface $c, Message $message) { + if (!$message->isSuccessful()) { + $this->logger->error( + "Failed to subscribe to channel {channel}", + [ + 'channel' => $c->getChannelId(), + ] + ); + } + } + ) + ) + ; + } + + public function handle() + { + foreach ($this->getContext()->getChannels() as $channel) { + /** @var ChannelInterface $channel */ + if ($channel->isMeta()) { + continue; + } + $message = new Message(); + $message->setChannel(ChannelInterface::META_SUBSCRIBE); + $message->setSubscription($channel->getChannelId()); + $this->getContext()->sendMessages([$message]); + } + $this->transitionToState(ConnectState::class); + } +} diff --git a/src/Bayeux/BayeuxClientState/UnsubscribeState.php b/src/Bayeux/BayeuxClientState/UnsubscribeState.php new file mode 100644 index 0000000..ab13dc8 --- /dev/null +++ b/src/Bayeux/BayeuxClientState/UnsubscribeState.php @@ -0,0 +1,36 @@ +getContext()->getChannels() as $channel) { + if ($channel->isMeta()) { + continue; + } + + $message = new Message(); + $message->setChannel(ChannelInterface::META_UNSUBSCRIBE); + $message->setSubscription($channel->getChannelId()); + $unsubscribes[] = $message; + } + + // Unsubscribe channels + if (count($unsubscribes) > 0) { + $this->getContext()->sendMessages($unsubscribes); + } + } +}