Skip to content
This repository has been archived by the owner on Jan 21, 2020. It is now read-only.

Commit

Permalink
Merge pull request #1965 from tomaszdurka/issue-1965
Browse files Browse the repository at this point in the history
Add cm-janus service
  • Loading branch information
tomaszdurka committed Nov 6, 2015
2 parents 44c206a + 2813864 commit d0725df
Show file tree
Hide file tree
Showing 13 changed files with 612 additions and 0 deletions.
57 changes: 57 additions & 0 deletions library/CM/Janus/Configuration.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
<?php

class CM_Janus_Configuration {

/** @var CM_Janus_Server[] */
protected $_servers;

/**
* @param array|null $servers
*/
public function __construct(array $servers = null) {
foreach ((array) $servers as $server) {
$this->addServer($server);
}
}

/**
* @param CM_Janus_Server $server
*/
public function addServer(CM_Janus_Server $server) {
$this->_servers[] = $server;
}

/**
* @return CM_Janus_Server[]
*/
public function getServers() {
return $this->_servers;
}

/**
* @param string $key
* @return CM_Janus_Server|null
*/
public function findServerByKey($key) {
foreach ($this->_servers as $server) {
if ($server->getKey() === $key) {
return $server;
}
}
return null;
}

/**
* @param int $id
* @return CM_Janus_Server
* @throws CM_Exception_Invalid
*/
public function getServer($id) {
foreach ($this->_servers as $server) {
if ($server->getId() === $id) {
return $server;
}
}
throw new CM_Exception_Invalid('Cannot find server with id `' . $id . '`');
}
}
25 changes: 25 additions & 0 deletions library/CM/Janus/Factory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?php

class CM_Janus_Factory {

/**
* @param array $servers
* @return CM_Janus_Service
*/
public function createService(array $servers) {
$configuration = new CM_Janus_Configuration();
foreach ($servers as $serverId => $serverConfig) {
$configuration->addServer(new CM_Janus_Server(
$serverId,
$serverConfig['key'],
$serverConfig['httpAddress'],
$serverConfig['webSocketAddress']
));
}

$httpClient = new GuzzleHttp\Client();
$httpApiClient = new CM_Janus_HttpApiClient($httpClient);
$janus = new CM_Janus_Service($configuration, $httpApiClient);
return $janus;
}
}
58 changes: 58 additions & 0 deletions library/CM/Janus/HttpApiClient.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
<?php

class CM_Janus_HttpApiClient {

/** @var \GuzzleHttp\Client */
protected $_httpClient;

/**
* @param \GuzzleHttp\Client $httpClient
*/
public function __construct(GuzzleHttp\Client $httpClient) {
$this->_httpClient = $httpClient;
}

/**
* @param CM_Janus_Server $server
* @param string $clientKey
* @return string
* @throws CM_Exception_Invalid
*/
public function stopStream(CM_Janus_Server $server, $clientKey) {
return $this->_request('POST', $server, '/stopStream', ['streamId' => (string) $clientKey]);
}

/**
* @param CM_Janus_Server $server
* @return array
* @throws CM_Exception_Invalid
*/
public function fetchStatus(CM_Janus_Server $server) {
$encodedStatus = $this->_request('GET', $server, '/status');
return CM_Params::jsonDecode($encodedStatus);
}

/**
* @param string $method
* @param CM_Janus_Server $server
* @param string $path
* @param array|null $body
* @return string
* @throws CM_Exception_Invalid
*/
protected function _request($method, CM_Janus_Server $server, $path, array $body = null) {
$url = $server->getHttpAddress() . $path;
$body = (array) $body;
$options = [
'body' => $body,
'headers' => ['Server-Key' => $server->getKey()],
];
$request = $this->_httpClient->createRequest($method, $url, $options);
try {
$response = $this->_httpClient->send($request);
} catch (GuzzleHttp\Exception\TransferException $e) {
throw new CM_Exception_Invalid('Fetching contents from `' . $url . '` failed: `' . $e->getMessage());
}
return $response->getBody();
}
}
115 changes: 115 additions & 0 deletions library/CM/Janus/RpcEndpoints.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
<?php

class CM_Janus_RpcEndpoints {

/**
* @param string $serverKey
* @param string $streamChannelKey
* @param string $streamKey
* @param int $start
* @param string $data
* @return array
* @throws CM_Exception_AuthFailed
* @throws CM_Exception_AuthRequired
* @throws CM_Exception_Invalid
* @throws CM_Exception_NotAllowed
* @throws Exception
*/
public static function rpc_publish($serverKey, $streamChannelKey, $streamKey, $start, $data) {
$janus = CM_Service_Manager::getInstance()->getJanus('janus');
self::_authenticate($janus, $serverKey);

$params = CM_Params::factory(CM_Params::jsonDecode($data), true);
$server = $janus->getConfiguration()->findServerByKey($serverKey);

if (!$server) {
throw new CM_Exception_Invalid('Server `' . $serverKey . '` not found');
}

$streamChannelType = $params->getInt('streamChannelType');
$session = new CM_Session($params->getString('sessionId'));
$user = $session->getUser(true);

$streamRepository = $janus->getStreamRepository();
$streamChannel = $streamRepository->createStreamChannel($streamChannelKey, $streamChannelType, $server->getId(), 0);
try {
$streamRepository->createStreamPublish($streamChannel, $user, $streamKey, $start);
} catch (CM_Exception_NotAllowed $ex) {
$streamChannel->delete();
throw $ex;
}
return ['streamChannelId' => $streamChannel->getId()];
}

/**
* @param string $serverKey
* @param string $streamChannelKey
* @param string $streamKey
* @param string $start
* @param string $data
* @return bool
* @throws CM_Exception_AuthFailed
* @throws CM_Exception_AuthRequired
* @throws CM_Exception_Invalid
* @throws CM_Exception_Nonexistent
* @throws CM_Exception_NotAllowed
*/
public static function rpc_subscribe($serverKey, $streamChannelKey, $streamKey, $start, $data) {

$janus = CM_Service_Manager::getInstance()->getJanus('janus');
self::_authenticate($janus, $serverKey);

$params = CM_Params::factory(CM_Params::jsonDecode($data), true);
$session = new CM_Session($params->getString('sessionId'));
$user = $session->getUser(true);

$streamRepository = $janus->getStreamRepository();
$streamChannel = $streamRepository->findStreamChannelByKey($streamChannelKey);
if (!$streamChannel) {
throw new CM_Exception_Nonexistent("Stream channel `{$streamChannelKey}` does not exists");
}
try {
$streamRepository->createStreamSubscribe($streamChannel, $user, $streamKey, $start);
} catch (CM_Exception_NotAllowed $exception) {
throw new CM_Exception_NotAllowed('Cannot subscribe: ' . $exception->getMessage());
}
return true;
}

/**
* @param string $serverKey
* @param string $streamChannelKey
* @param string $streamKey
* @return bool
* @throws CM_Exception_AuthFailed
* @throws CM_Exception_Invalid
*/
public static function rpc_removeStream($serverKey, $streamChannelKey, $streamKey) {
$janus = CM_Service_Manager::getInstance()->getJanus('janus');
self::_authenticate($janus, $serverKey);

$streamRepository = $janus->getStreamRepository();
$streamChannel = $streamRepository->findStreamChannelByKey($streamChannelKey);

$streamSubscribe = $streamChannel->getStreamSubscribes()->findKey($streamKey);
if ($streamSubscribe) {
$streamRepository->removeStream($streamSubscribe);
}
$streamPublish = $streamChannel->getStreamPublishs()->findKey($streamKey);
if ($streamPublish) {
$streamRepository->removeStream($streamSubscribe);
}
return true;
}

/**
* @param CM_Janus_Service $janus
* @param string $serverKey
* @throws CM_Exception_AuthFailed
*/
protected static function _authenticate(CM_Janus_Service $janus, $serverKey) {
if (!$janus->getConfiguration()->findServerByKey($serverKey)) {
throw new CM_Exception_AuthFailed('Invalid serverKey');
}
}
}
57 changes: 57 additions & 0 deletions library/CM/Janus/Server.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
<?php

class CM_Janus_Server {

/** @var int */
protected $_id;

/** @var string */
protected $_httpAddress;

/** @var string */
protected $_webSocketAddress;

/** @var string */
protected $_key;

/**
* @param int $serverId
* @param string $key
* @param string $httpAddress
* @param string $webSocketAddress
*/
public function __construct($serverId, $key, $httpAddress, $webSocketAddress) {
$this->_id = (int) $serverId;
$this->_key = (string) $key;
$this->_httpAddress = (string) $httpAddress;
$this->_webSocketAddress = (string) $webSocketAddress;
}

/**
* @return int
*/
public function getId() {
return $this->_id;
}

/**
* @return string
*/
public function getKey() {
return $this->_key;
}

/**
* @return string
*/
public function getHttpAddress() {
return $this->_httpAddress;
}

/**
* @return string
*/
public function getWebSocketAddress() {
return $this->_webSocketAddress;
}
}
61 changes: 61 additions & 0 deletions library/CM/Janus/Service.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
<?php

class CM_Janus_Service extends CM_MediaStreams_Service {

/** @var CM_Janus_Configuration */
protected $_configuration;

/** @var CM_Janus_HttpApiClient */
protected $_httpApiClient;

/**
* @param CM_Janus_Configuration $configuration
* @param CM_Janus_HttpApiClient $httpClient
* @param CM_MediaStreams_StreamRepository|null $streamRepository
*/
public function __construct(CM_Janus_Configuration $configuration, CM_Janus_HttpApiClient $httpClient, CM_MediaStreams_StreamRepository $streamRepository = null) {
$this->_configuration = $configuration;
$this->_httpApiClient = $httpClient;
parent::__construct($streamRepository);
}

public function synchronize() {
throw new CM_Exception_NotImplemented();
}

/**
* @return CM_Janus_Configuration
*/
public function getConfiguration() {
return $this->_configuration;
}

/**
* @return CM_Janus_Stream[]
* @throws CM_Exception_Invalid
*/
protected function _fetchStatus() {
$status = [];
foreach ($this->_configuration->getServers() as $server) {
foreach ($this->_httpApiClient->fetchStatus($server) as $streamInfo) {
$status[] = new CM_Janus_Stream($streamInfo['streamKey'], $streamInfo['streamChannelKey'], $server);
}
}
return $status;
}

/**
* @param CM_Model_Stream_Abstract $stream
* @throws CM_Exception_Invalid
* @throws CM_Janus_StopStreamError
*/
protected function _stopStream(CM_Model_Stream_Abstract $stream) {
/** @var $streamChannel CM_Model_StreamChannel_Media */
$streamChannel = $stream->getStreamChannel();
$server = $this->_configuration->getServer($streamChannel->getServerId());
$result = $this->_httpApiClient->stopStream($server, $stream->getKey());
if (array_key_exists('error', $result)) {
throw new CM_Janus_StopStreamError($result['error']);
}
}
}
5 changes: 5 additions & 0 deletions library/CM/Janus/StopStreamError.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
<?php

class CM_Janus_StopStreamError extends CM_Exception {

}
Loading

0 comments on commit d0725df

Please sign in to comment.