Skip to content

Commit

Permalink
3.1 enhancesments and message handling refinement
Browse files Browse the repository at this point in the history
- More message properties and attributes now supported
- Body/Message encoding is now configurable at runtime at construction
  - Supported Encodings:
    - base64
    - utf-8
  - This provides and future entry point for mesage encryption support
- routing_key now works
- added new message properties:
  - correlation_id
  - reply_to
  • Loading branch information
zircote committed Dec 8, 2013
1 parent 7dc7b86 commit c7e5935
Show file tree
Hide file tree
Showing 12 changed files with 238 additions and 80 deletions.
18 changes: 18 additions & 0 deletions docs/example.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"body": "eyJleHBpcmVzIjogbnVsbCwgInV0YyI6IHRydWUsICJhcmdzIjogWzIsIDJdLCAiY2hvcmQiOiBudWxsLCAiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgInRhc2tzZXQiOiBudWxsLCAiaWQiOiAiNjJlODMzMTYtMTg1Ny00N2ZmLTkyNzYtNmQ5MzRiZjhhNjBlIiwgInJldHJpZXMiOiAwLCAidGFzayI6ICJwcmVkaXMuYWRkIiwgInRpbWVsaW1pdCI6IFtudWxsLCBudWxsXSwgImV0YSI6IG51bGwsICJrd2FyZ3MiOiB7fX0=",
"headers": {},
"content-type": "application/json",
"properties": {
"body_encoding": "base64",
"correlation_id": "62e83316-1857-47ff-9276-6d934bf8a60e",
"reply_to": "d6422edb-425a-3b86-86d8-fb1112a10a54",
"delivery_info": {
"priority": 0,
"routing_key": "celery",
"exchange": "celery"
},
"delivery_mode": 2,
"delivery_tag": 2
},
"content-encoding": "utf-8"
}
18 changes: 18 additions & 0 deletions docs/notworking.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"body": "ddd==",
"headers": [],
"content_type": "application/json",
"properties": {
"correlation_id": "6cca25f2-5fa8-11e3-904c-e4ce8f2261f0",
"reply_to": "6cca25f2-5fa8-11e3-904c-e4ce8f2261f0",
"body_encoding": "base64",
"name": "celery",
"delivery_info": {
"priority": 0,
"routing_key": "celery",
"exchange": "celery"
},
"durable": true,
"delivery_mode": 2,
"delivery_tag": 2
}}
34 changes: 17 additions & 17 deletions library/Rhubarb/Broker/Amqp.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,42 +38,42 @@ class Amqp extends AmqpConnector implements BrokerInterface
*/
public function publishTask(\Rhubarb\Task $task)
{
if (!$task->getMessage()->getPropRoutingKey()) {
$task->getMessage()->setPropRoutingKey($task->getId());
}
$channel = $this->getConnection()->channel();
$channel->queueDeclare(
array(
'queue' => $task->message->getQueue(),
'durable' => true,
'auto_delete' => false,
'queue' => $task->getMessage()->getQueue(),
'durable' => $task->getMessage()->getPropDurable(),
'auto_delete' => $task->getMessage()->getPropAutoDelete(),
'arguments' => $task->getMessage()->getPropQueueArgs()
)
);

$channel->exchangeDeclare(
$task->message->getPropExchange(),
$task->getMessage()->getPropExchange(),
'direct',
array('passive' => true, 'durable' => true)
);

$channel->queueBind(
$task->getMessage()->getQueue(),
$task->message->getPropExchange(),
array('routing_key' => $task->getId())
$task->getMessage()->getPropExchange(),
array('routing_key' => $task->getMessage()->getPropRoutingKey())
);

$msgProperties = array('content_type' => Rhubarb::RHUBARB_CONTENT_TYPE);
if($task->getPriority()){
$msgProperties['priority'] = $task->getPriority();
}
$msgProperties = array(
'content_type' => Rhubarb::RHUBARB_CONTENT_TYPE,
'content_encoding' => $task->getMessage()->getContentEncoding(),
'priority' => $task->getMessage()->getPropPriority()
);

$task->message->setPropBodyEncoding(null);

$taskArray = $task->toArray();

$channel->basicPublish(
new AmqpMessage(json_encode($taskArray['body']), $msgProperties),
new AmqpMessage((string) $task, $msgProperties),
array(
'exchange' => $task->message->getPropExchange(),
'routing_key' => $task->getId()
'exchange' => $task->getMessage()->getPropExchange(),
'routing_key' => $task->getMessage()->getPropRoutingKey()
)
);

Expand Down
8 changes: 4 additions & 4 deletions library/Rhubarb/Broker/PhpAmqp.php
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,16 @@ public function publishTask(\Rhubarb\Task $task)
$queue->bind($task->message->getPropExchange(), $task->getId());

$msgProperties = array(
'content_type' => $task->message->getContentType()
'content_type' => $task->getMessage()->getContentType(),
'content_encoding' => $task->getMessage()->getContentEncoding(),
'encoding' => $task->getMessage()->getContentEncoding()
);

if ($task->getPriority()) {
$msgProperties['priority'] = $task->getPriority();
}
$task->message->setPropBodyEncoding(null);
$taskArray = $task->toArray();
$exchange->publish(
json_encode($taskArray['body']),
(string) $task,
$task->getId(),
AMQP_NOPARAM,
$msgProperties
Expand Down
16 changes: 14 additions & 2 deletions library/Rhubarb/Broker/Predis.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,19 @@ class Predis extends PredisConnection implements BrokerInterface
*/
public function publishTask(\Rhubarb\Task $task)
{
$task->getMessage()->setPropBodyEncoding(Message::BODY_ENCODING_BASE64);
$this->getConnection()->lpush($task->getMessage()->getPropExchange(), (string) $task);
$task->getMessage()->setContentEncoding(Rhubarb::CONTENT_ENCODING_UTF8);
if (!$task->getMessage()->getPropRoutingKey()) {
$task->getMessage()->setPropRoutingKey('celery');
}
if (!$task->getMessage()->getCorrelationId()){
$task->getMessage()->setCorrelationId($task->getId());
}
if (!$task->getMessage()->getReplyTo()){
$task->getMessage()->setReplyTo($task->getId());
}
$task->getMessage()->setPropDeliveryMode(2)->setPropDeliveryTag(2);
$task->getMessage()->setBodyEncoding(Rhubarb::CONTENT_ENCODING_BASE64);
$task->toArray();
$this->getConnection()->lpush($task->getMessage()->getPropExchange(), (string) $task->getMessage());
}
}
2 changes: 1 addition & 1 deletion library/Rhubarb/Broker/Test.php
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public function publishTask(\Rhubarb\Task $task)
{

$taskArray = $task->toArray();
$this->published = json_encode($taskArray['body']);
$this->published = json_encode($taskArray);
}

public function getPublishedValues()
Expand Down
82 changes: 44 additions & 38 deletions library/Rhubarb/Message.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,6 @@
*/
class Message
{
const BODY_ENCODING_BASE64 = 'base64';

/**
* @return array
*/
public function getMessage()
{
return $this->message;
}

/**
* @var array
*/
Expand All @@ -53,10 +43,13 @@ public function getMessage()
'body' => null,
'headers' => array(),
'content-type' => Rhubarb::RHUBARB_CONTENT_TYPE,
'content-encoding' => Rhubarb::RHUBARB_DEFAULT_CONTENT_ENCODING,
'properties' => array(
'correlation_id' => null,
'reply_to' => null,
'body_encoding' => null,
'exclusive' => null,
'name' => Rhubarb::RHUBARB_DEFAULT_TASK_QUEUE,
'body_encoding' => null,
'delivery_info' => array(
'priority' => 0,
'routing_key' => null,
Expand All @@ -69,9 +62,8 @@ public function getMessage()
'queue_arguments' => array(),
'binding_arguments' => array(),
'delivery_tag' => null,
'auto_delete' => null,
),
'content-encoding' => Rhubarb::RHUBARB_DEFAULT_CONTENT_ENCODING
'auto_delete' => false,
)
);

/**
Expand All @@ -82,6 +74,22 @@ public function getMessage()
public function __construct()
{
}
public function setReplyTo($reply_to)
{
$this->message['properties']['reply_to'] = $reply_to;
}
public function getReplyTo()
{
return $this->message['properties']['reply_to'];
}
public function setCorrelationId($correlation_id)
{
$this->message['properties']['correlation_id'] = $correlation_id;
}
public function getCorrelationId()
{
return $this->message['properties']['correlation_id'];
}
/**
*
* @param string $queue
Expand Down Expand Up @@ -281,24 +289,6 @@ public function getPropDeliveryMode()
return $this->message['properties']['delivery_mode'];
}

/**
* @param string $propBodyEncoding
* @return self
*/
public function setPropBodyEncoding($propBodyEncoding)
{
$this->message['properties']['body_encoding'] = $propBodyEncoding;
return $this;
}

/**
* @return string
*/
public function getPropBodyEncoding()
{
return $this->message['properties']['body_encoding'];
}

/**
* @param array $propBindingArgs
* @return self
Expand Down Expand Up @@ -389,6 +379,24 @@ public function getContentType()
return $this->message['content-type'];
}

/**
* @param string $contentEncoding
* @return self
*/
public function setBodyEncoding($contentEncoding)
{
$this->message['properties']['body_encoding'] = $contentEncoding;
return $this;
}

/**
* @return string
*/
public function getBodyEncoding()
{
return $this->message['properties']['body_encoding'];
}

/**
* @param string $contentEncoding
* @return self
Expand Down Expand Up @@ -430,11 +438,9 @@ public function getBody()
*/
public function toArray()
{
$message = $this->message;
switch (strtolower($message['properties']['body_encoding'])) {
case self::BODY_ENCODING_BASE64:
$message['body'] = base64_encode(json_encode($message['body']));
break;
$message = $this->message;
if (isset($message['properties'])) {
$message['properties'] = array_filter($message['properties']);
}
return $message;
}
Expand All @@ -445,6 +451,6 @@ public function toArray()
public function __toString()
{
$message = $this->toArray();
return json_encode($message);
return json_encode($message, JSON_UNESCAPED_SLASHES);
}
}
8 changes: 6 additions & 2 deletions library/Rhubarb/Rhubarb.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,15 @@ class Rhubarb
/**
* @var string
*/
const RHUBARB_DEFAULT_CONTENT_ENCODING = 'utf-8';
const CONTENT_ENCODING_BASE64 = 'base64';
/**
* @var string
*/
const RHUBARB_DEFAULT_BODY_ENCODING = 'base64';
const CONTENT_ENCODING_UTF8 = 'utf-8';
/**
* @var string
*/
const RHUBARB_DEFAULT_CONTENT_ENCODING = null;
/**
* @var string
*/
Expand Down
48 changes: 35 additions & 13 deletions library/Rhubarb/Task.php
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,14 @@ public function __construct($name, $args = array(), Rhubarb $rhubarb, $id = null
* - exchange: Name of exchange (or a kombu.entity.Exchange) to send the message to.
*
* @param array $options
*
* @throws \RuntimeException
* @return Task
*/
public function delay($options = array())
{
if ($this->getTaskSent()) {
throw new \RuntimeException('Task has been sent');
}
if (isset($options['countdown'])) {
$this->setCountdown($options['countdown']);
}
Expand All @@ -159,6 +162,16 @@ public function delay($options = array())
}
if (isset($options['priority'])) {
$this->setPriority($options['priority']);
$this->getMessage()->setPropPriority($options['priority']);
}
if (isset($options['no_ack'])) {
$this->getMessage()->setPropNoAck($options['no_ack']);
}
if (isset($options['delivery_tag'])) {
$this->getMessage()->setPropDeliveryTag($options['delivery_tag']);
}
if (isset($options['delivery_mode'])) {
$this->getMessage()->setPropDeliveryMode($options['delivery_mode']);
}
if (isset($options['utc'])) {
$this->setUtc((bool)$options['utc']);
Expand All @@ -175,12 +188,15 @@ public function delay($options = array())
if (isset($options['queue_args'])) {
$this->message->setPropQueueArgs($options['queue_args']);
}
if (isset($options['routing_key'])) {
$this->message->setPropRoutingKey($options['routing_key']);
}
if (isset($options['exchange'])) {
$this->message->setPropExchange($options['exchange']);
} else {
$this->message->setPropExchange(Rhubarb::RHUBARB_DEFAULT_EXCHANGE_NAME);
}
$this->taskSent = true;
$this->setTaskSent(true);
$this->getRhubarb()->getBroker()->publishTask($this);
return $this;
}
Expand Down Expand Up @@ -471,15 +487,6 @@ public function get($timeout = 10, $interval = 0.5)
return $this->responseBody->result;
}

/**
* @return string
*/
public function __toString()
{
$encodedJson = json_encode($this->toArray());
return (string)$encodedJson;
}

/**
*
* @param int $countdown
Expand Down Expand Up @@ -573,8 +580,23 @@ public function toArray()
'eta' => ($this->eta instanceof \DateTime) ? $this->eta->format(\DateTime::ISO8601) : null,
'errbacks' => $this->errbacks
);
$this->message->setBody($body);
return $this->message->toArray();
$encoding = ($this->getMessage()->getBodyEncoding() || $this->getMessage()->getContentEncoding());
switch ($encoding) {
case Rhubarb::CONTENT_ENCODING_BASE64:
$body = base64_encode(json_encode($body, JSON_UNESCAPED_SLASHES));
break;
}
$this->getMessage()->setBody($body);
return $body;
}

/**
* @return string
*/
public function __toString()
{
$encodedJson = json_encode($this->toArray(), JSON_UNESCAPED_SLASHES);
return (string)$encodedJson;
}

/**
Expand Down
Loading

0 comments on commit c7e5935

Please sign in to comment.