Skip to content

Commit

Permalink
[#18]: feat: raw payloads
Browse files Browse the repository at this point in the history
  • Loading branch information
rustatian authored May 17, 2022
2 parents 495c1f7 + 32670e2 commit 4520b8f
Show file tree
Hide file tree
Showing 10 changed files with 219 additions and 135 deletions.
39 changes: 39 additions & 0 deletions .github/workflows/linux.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
name: Linux

on:
push:
branches:
- master
tags-ignore:
- "**"
jobs:
golang:
name: Build (Go ${{ matrix.go }}, OS ${{matrix.os}})
runs-on: ${{ matrix.os }}
timeout-minutes: 60
strategy:
fail-fast: true
matrix:
go: [ "1.18" ]
os: [ "ubuntu-latest" ]
steps:
- name: Set up Go ${{ matrix.go }}
uses: actions/setup-go@v3 # action page: <https://github.com/actions/setup-go>
with:
go-version: ${{ matrix.go }}

- name: Check out code
uses: actions/checkout@v3

- name: Init Go modules Cache # Docs: <https://git.io/JfAKn#go---modules>
uses: actions/cache@v3
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: ${{ runner.os }}-go-

- name: Install Go dependencies
run: go mod download

- name: Run golang tests with coverage
run: make test
119 changes: 1 addition & 118 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,118 +1 @@
### AMQP Driver

Strictly speaking, AMQP (and 0.9.1 version used) is a protocol, not a full-fledged driver, so you can use
any servers that support this protocol (on your own, only rabbitmq was tested) , such as:
[RabbitMQ](https://www.rabbitmq.com/), [Apache Qpid](http://qpid.apache.org/) or
[Apache ActiveMQ](http://activemq.apache.org/). However, it is recommended to
use RabbitMQ as the main implementation, and reliable performance with other
implementations is not guaranteed.

To install and configure the RabbitMQ, use the corresponding
[documentation page](https://www.rabbitmq.com/download.html). After that, you
should configure the connection to the server in the "`amqp`" section. This
configuration section contains exactly one `addr` key with a
[connection DSN](https://www.rabbitmq.com/uri-spec.html).

```yaml
amqp:
addr: amqp://guest:guest@localhost:5672
```
After creating a connection to the server, you can create a new queue that will
use this connection and which will contain the queue settings (including
amqp-specific):
```yaml
amqp:
addr: amqp://guest:guest@localhost:5672


jobs:
pipelines:
# User defined name of the queue.
example:
# Required section.
# Should be "amqp" for the AMQP driver.
driver: amqp

# Optional section.
# Default: 10
priority: 10

# Optional section.
# Default: 100
prefetch: 100

# Optional section.
# Default: "default"
queue: "default"

# Optional section.
# Default: "amqp.default"
exchange: "amqp.default"

# Optional section.
# Default: "direct"
exchange_type: "direct"

# Optional section.
# Default: "" (empty)
routing_key: ""

# Optional section.
# Default: false
exclusive: false

# Optional section.
# Default: false
multiple_ack: false

# Optional section.
# Default: false
requeue_on_fail: false
```
Below is a more detailed description of each of the amqp-specific options:
- `priority` - Queue default priority for for each task pushed into this queue
if the priority value for these tasks was not explicitly set.

- `prefetch` - The client can request that messages be sent in advance so that
when the client finishes processing a message, the following message is
already held locally, rather than needing to be sent down the channel.
Prefetching gives a performance improvement. This field specifies the prefetch
window size in octets. See also ["prefetch-size"](https://www.rabbitmq.com/amqp-0-9-1-reference.html)
in AMQP QoS documentation reference.

- `queue` - AMQP internal (inside the driver) queue name.

- `exchange` - The name of AMQP exchange to which tasks are sent. Exchange
distributes the tasks to one or more queues. It routes tasks to the queue
based on the created bindings between it and the queue. See also
["AMQP model"](https://www.rabbitmq.com/tutorials/amqp-concepts.html#amqp-model)
documentation section.

- `exchange_type` - The type of task delivery. May be one of `direct`, `topics`,
`headers` or `fanout`.
- `direct` - Used when a task needs to be delivered to specific queues. The
task is published to an exchanger with a specific routing key and goes to
all queues that are associated with this exchanger with a similar routing
key.
- `topics` - Similarly, `direct` exchange enables selective routing by
comparing the routing key. But, in this case, the key is set using a
template, like: `user.*.messages`.
- `fanout` - All tasks are delivered to all queues even if a routing key is
specified in the task.
- `headers` - Routes tasks to related queues based on a comparison of the
(key, value) pairs of the headers property of the binding and the similar
property of the message.

- `routing_key` - Queue's routing key.

- `exclusive` - Exclusive queues can't be redeclared. If set to true and
you'll try to declare the same pipeline twice, that will lead to an error.

- `multiple_ack` - This delivery and all prior unacknowledged deliveries on
the same channel will be acknowledged. This is useful for batch processing
of deliveries. Applicable only for the Ack, not for the Nack.

- `requeue_on_fail` - Requeue on Nack.
# Docs: [link](https://roadrunner.dev/docs/plugins-jobs/2.x/en)
2 changes: 2 additions & 0 deletions amqpjobs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const (
exchangeType string = "exchange_type"
queue string = "queue"
routingKey string = "routing_key"
consumeAll string = "consume_all"
prefetch string = "prefetch"
exclusive string = "exclusive"
durable string = "durable"
Expand Down Expand Up @@ -34,6 +35,7 @@ type config struct {
Exchange string `mapstructure:"exchange"`
ExchangeType string `mapstructure:"exchange_type"`
RoutingKey string `mapstructure:"routing_key"`
ConsumeAll bool `mapstructure:"consume_all"`
Exclusive bool `mapstructure:"exclusive"`
Durable bool `mapstructure:"durable"`
DeleteQueueOnStop bool `mapstructure:"delete_queue_on_stop"`
Expand Down
20 changes: 11 additions & 9 deletions amqpjobs/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ const (

type Consumer struct {
sync.Mutex
log *zap.Logger
pq priorityqueue.Queue

pipeline atomic.Value
log *zap.Logger
pq priorityqueue.Queue
pipeline atomic.Value
consumeAll bool

// amqp connection notifiers
notifyCloseConnCh chan *amqp.Error
Expand Down Expand Up @@ -94,11 +94,12 @@ func NewAMQPConsumer(configKey string, log *zap.Logger, cfg cfgPlugin.Configurer
// PARSE CONFIGURATION END -------

jb := &Consumer{
log: log,
pq: pq,
consumeID: uuid.NewString(),
stopCh: make(chan struct{}, 1),
// TODO to config
log: log,
pq: pq,
consumeID: uuid.NewString(),
stopCh: make(chan struct{}, 1),
consumeAll: conf.ConsumeAll,

retryTimeout: time.Minute,
priority: conf.Priority,
delayed: utils.Int64(0),
Expand Down Expand Up @@ -197,6 +198,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log *zap.Logger, cfg cfgPlugin.Co
notifyCloseStatCh: make(chan *amqp.Error, 1),
notifyClosePubCh: make(chan *amqp.Error, 1),

consumeAll: pipeline.Bool(consumeAll, false),
routingKey: pipeline.String(routingKey, ""),
queue: pipeline.String(queue, "default"),
exchangeType: pipeline.String(exchangeType, "direct"),
Expand Down
54 changes: 54 additions & 0 deletions amqpjobs/conv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package amqpjobs

import (
"strconv"

amqp "github.com/rabbitmq/amqp091-go"
)

func convHeaders(h amqp.Table) map[string][]string { //nolint:gocyclo
ret := make(map[string][]string)
for k := range h {
switch t := h[k].(type) {
case int:
ret[k] = []string{strconv.Itoa(t)}
case int8:
ret[k] = []string{strconv.Itoa(int(t))}
case int16:
ret[k] = []string{strconv.Itoa(int(t))}
case int32:
ret[k] = []string{strconv.Itoa(int(t))}
case int64:
ret[k] = []string{strconv.Itoa(int(t))}
case uint:
ret[k] = []string{strconv.FormatUint(uint64(t), 10)}
case uint8:
ret[k] = []string{strconv.FormatUint(uint64(t), 10)}
case uint16:
ret[k] = []string{strconv.FormatUint(uint64(t), 10)}
case uint32:
ret[k] = []string{strconv.FormatUint(uint64(t), 10)}
case uint64:
ret[k] = []string{strconv.FormatUint(t, 10)}
case float32:
ret[k] = []string{strconv.FormatFloat(float64(t), 'f', 5, 64)}
case float64:
ret[k] = []string{strconv.FormatFloat(t, 'f', 5, 64)}
case string:
ret[k] = []string{t}
case []string:
ret[k] = t
case bool:
switch t {
case true:
ret[k] = []string{"true"}
case false:
ret[k] = []string{"false"}
}
case []byte:
ret[k] = []string{string(t)}
}
}

return ret
}
59 changes: 59 additions & 0 deletions amqpjobs/conv_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package amqpjobs

import (
"testing"

amqp "github.com/rabbitmq/amqp091-go"
"github.com/stretchr/testify/require"
)

func TestConv(t *testing.T) {
table := amqp.Table{}
table["a"] = 1
table["b"] = int8(1)
table["c"] = int16(1)
table["d"] = int32(1)
table["e"] = int64(1)

table["f"] = uint(1)
table["j"] = uint8(1)
table["i"] = uint16(1)
table["g"] = uint32(1)
table["k"] = uint64(1)

table["strsl"] = []string{"a", "b", "c"}
table["str"] = "fooooo"

table["bf"] = false
table["bt"] = true

table["bytes"] = []byte("fooooooobbbbbb")

table["foo"] = float32(2.3)
table["foo2"] = 2.3

ret := convHeaders(table)

require.Equal(t, ret["foo"], []string{"2.30000"})
require.Equal(t, ret["foo2"], []string{"2.30000"})

require.Equal(t, ret["a"], []string{"1"})
require.Equal(t, ret["b"], []string{"1"})
require.Equal(t, ret["c"], []string{"1"})
require.Equal(t, ret["d"], []string{"1"})
require.Equal(t, ret["e"], []string{"1"})

require.Equal(t, ret["f"], []string{"1"})
require.Equal(t, ret["j"], []string{"1"})
require.Equal(t, ret["i"], []string{"1"})
require.Equal(t, ret["g"], []string{"1"})
require.Equal(t, ret["k"], []string{"1"})

require.Equal(t, ret["strsl"], []string{"a", "b", "c"})
require.Equal(t, ret["str"], []string{"fooooo"})

require.Equal(t, ret["bf"], []string{"false"})
require.Equal(t, ret["bt"], []string{"true"})

require.Equal(t, ret["bytes"], []string{"fooooooobbbbbb"})
}
Loading

0 comments on commit 4520b8f

Please sign in to comment.