Skip to content
This repository has been archived by the owner on Aug 31, 2021. It is now read-only.

💥 add opentracing support to Remit #84

Merged
merged 13 commits into from
Mar 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# remit

[![Build Status](https://travis-ci.org/jpwilliams/remit.svg?branch=master)](https://travis-ci.org/jpwilliams/remit) [![Coverage Status](https://coveralls.io/repos/github/jpwilliams/remit/badge.svg?branch=master)](https://coveralls.io/github/jpwilliams/remit?branch=master) [![npm downloads per month](https://img.shields.io/npm/dm/remit.svg)](https://www.npmjs.com/package/remit) [![npm version](https://img.shields.io/npm/v/remit.svg)](https://www.npmjs.com/package/remit) [![FOSSA Status](https://app.fossa.io/api/projects/git%2Bgithub.com%2Fjpwilliams%2Fremit.svg?type=shield)](https://app.fossa.io/projects/git%2Bgithub.com%2Fjpwilliams%2Fremit?ref=badge_shield)
[![Build Status](https://travis-ci.org/jpwilliams/remit.svg?branch=master)](https://travis-ci.org/jpwilliams/remit) [![Coverage Status](https://coveralls.io/repos/github/jpwilliams/remit/badge.svg?branch=master)](https://coveralls.io/github/jpwilliams/remit?branch=master) [![npm downloads per month](https://img.shields.io/npm/dm/remit.svg)](https://www.npmjs.com/package/remit) [![npm version](https://img.shields.io/npm/v/remit.svg)](https://www.npmjs.com/package/remit) [![OpenTracing Badge](https://img.shields.io/badge/OpenTracing-enabled-blue.svg)](http://opentracing.io) [![FOSSA Status](https://app.fossa.io/api/projects/git%2Bgithub.com%2Fjpwilliams%2Fremit.svg?type=shield)](https://app.fossa.io/projects/git%2Bgithub.com%2Fjpwilliams%2Fremit?ref=badge_shield)

A wrapper for RabbitMQ for communication between microservices. No service discovery needed.

Expand Down
65 changes: 65 additions & 0 deletions docs/_guide/tracing.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,68 @@ layout: default
title: Tracing
order: 6
---
# OpenTracing

![Jaeger Tracing](https://user-images.githubusercontent.com/1736957/41066405-9bf0808e-69d9-11e8-8d2a-b4704ca2731a.png)

Remit supports [OpenTracing](https://opentracing.io)-compatible tracers, pushing data to any compatible backend. More information on the OpenTracing API can be found at [https://opentracing.io](https://opentracing.io).

Officially supported tracers that provide Node.js clients are currently:

- [Jaeger](https://www.jaegertracing.io)
- [LightStep](http://lightstep.com/)
- [Instana](https://www.instana.com/)
- [Datadog](https://www.datadoghq.com/apm/)

# Adding a tracer

Using a tracer with Remit is exceedingly simple. When instantiating Remit, simply pass in a `tracer` option. The example below uses the popular [jaegertracing/jaeger-client-node](https://github.com/jaegertracing/jaeger-client-node).

{% highlight js %}
const Remit = require('remit')
const { initTracer } = require('jaeger-client')
const serviceName = 'my-traced-service'

// most tracers allow some configuration to choose when to
// take trace samples. This config will ensure traces
// are always created.
const tracer = initTracer({
serviceName,
sampler: {
type: 'const',
param: 1
}
})

const remit = Remit({
name: serviceName,
tracer
})
{% endhighlight %}

All calls for this Remit instance will now be traced! Great!

# Namespaces

If attempting to trace multiple libraries/frameworks, you'll need to have them cooperating with each-other to make relevant traces. While the method to perform this [hasn't yet been nailed down](https://github.com/opentracing/specification/issues/23), Remit will provide a solution that's most likely in line with the resulting OpenTracing specification changes.

We currently use [jeff-lewis/cls-hooked](https://github.com/jeff-lewis/cls-hooked) to infer span contexts between Remit calls. This has worked well even previous to the introduction of OpenTracing, so we'll use it again here.

Remit allows you to pass in a `namespace` upon instantiation, so you can have `get`/`set` access to the namespace providing the relevant contexts. If you don't know how these contexts work, I strongly suggest you read the [jeff-lewis/cls-hooked](https://github.com/jeff-lewis/cls-hooked) docs and get a grip on namespaces and contexts before use.

{% highlight js %}
const Remit = require('remit')
const { Tracer } = require('opentracing')
const cls = require('cls-hooked')

const tracer = new Tracer()
const namespace = cls.createNamespace('tracing')
const remit = Remit({ namespace, tracer })

// Internally, Remit uses the 'context' key to store the current
// span context, so set this to update it.
const span = tracer.startSpan('my-http-request')
namespace.set('context', span.context())
{% endhighlight %}

This `namespace` API is currently seen as _experimental_ and __will change without a major version bump upon the OpenTracing specificaton decision__.
34 changes: 22 additions & 12 deletions lib/Emitter.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const bubble = require('../utils/bubble')
const opentracing = require('opentracing')
const CallableInstance = require('callable-instance')
const EventEmitter = require('eventemitter3')
const genUuid = require('../utils/genUuid')
Expand Down Expand Up @@ -53,11 +53,11 @@ class Emitter extends CallableInstance {
args.push(undefined)
}

if (!bubble.active) args[2] = true
if (!this._remit._namespace.active) args[2] = true

return bubble.active
return this._remit._namespace.active
? this._send(...args)
: bubble.runAndReturn(this._send.bind(this, ...args))
: this._remit._namespace.runAndReturn(this._send.bind(this, ...args))
}

async _send (data = null, opts = {}, extendedCapture = false) {
Expand All @@ -68,21 +68,14 @@ class Emitter extends CallableInstance {
const parsedOptions = this._generateOptions(opts)
const messageId = genUuid()

const originId = bubble.get('originId') || messageId
const bubbleId = bubble.get('bubbleId') || null
bubble.set('originId', originId)
bubble.set('bubbleId', bubbleId)

const message = {
mandatory: false,
messageId: messageId,
appId: this._remit._options.name,
timestamp: now,
headers: {
trace: getStackLine.parse(callsites),
originId: originId,
bubbleId: bubbleId,
fromBubbleId: bubbleId
context: {}
},
persistent: true
}
Expand All @@ -107,6 +100,22 @@ class Emitter extends CallableInstance {
parsedData = 'null'
}

const parentContext = this._remit._namespace.get('context')

const span = this._remit._tracer.startSpan(`Remit Emit: ${parsedOptions.event}`, {
tags: {
'remit.version': this._remit.version,
[opentracing.Tags.SAMPLING_PRIORITY]: 1,
[opentracing.Tags.COMPONENT]: 'remit',
[opentracing.Tags.MESSAGE_BUS_DESTINATION]: parsedOptions.event,
[opentracing.Tags.SPAN_KIND]: opentracing.Tags.SPAN_KIND_MESSAGING_PRODUCER,
'data.outgoing': data
},
childOf: parentContext
})

this._remit._tracer.inject(span.context(), opentracing.FORMAT_TEXT_MAP, message.headers.context)

const demitQueue = await this._setupDemitQueue(parsedOptions, now)
const worker = await this._remit._workers.acquire()

Expand Down Expand Up @@ -136,6 +145,7 @@ class Emitter extends CallableInstance {
}

this._remit._workers.release(worker)
span.finish()

// We do this to make room for multiple emits.
// without this, continued synchronous emissions
Expand Down
49 changes: 30 additions & 19 deletions lib/Endpoint.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
const EventEmitter = require('eventemitter3')
const bubble = require('../utils/bubble')
const { ulid } = require('ulid')
const opentracing = require('opentracing')
const parseEvent = require('../utils/parseEvent')
const waterfall = require('../utils/asyncWaterfall')
const serializeData = require('../utils/serializeData')
Expand Down Expand Up @@ -113,7 +112,7 @@ class Endpoint {
try {
consumeResult = await this._consumer.consume(
this._options.queue,
bubble.bind(this._incoming.bind(this)),
this._remit._namespace.bind(this._incoming.bind(this)),
{
noAck: true,
exclusive: false
Expand Down Expand Up @@ -149,13 +148,23 @@ class Endpoint {
return
}

if (message.properties.headers) {
bubble.set('originId', message.properties.headers.originId)
if (!bubble.get('bubbleId')) bubble.set('bubbleId', ulid())
}
const parentContext = this._remit._tracer.extract(opentracing.FORMAT_TEXT_MAP, (message.properties.headers && message.properties.headers.context) || {}) || null

const span = this._remit._tracer.startSpan(`Remit Endpoint: ${this._options.event}`, {
tags: {
'remit.version': this._remit.version,
[opentracing.Tags.SAMPLING_PRIORITY]: 1,
[opentracing.Tags.COMPONENT]: 'remit',
[opentracing.Tags.MESSAGE_BUS_DESTINATION]: this._options.event,
[opentracing.Tags.SPAN_KIND]: opentracing.Tags.SPAN_KIND_RPC_SERVER,
'data.incoming': data
},
childOf: parentContext
})

this._remit._namespace.set('context', span.context())

const event = parseEvent(message.properties, message.fields, data, {
flowType: 'entry',
isReceiver: true
})

Expand All @@ -169,27 +178,29 @@ class Endpoint {

const canReply = Boolean(message.properties.replyTo)

if (!canReply) {
await resultOp
let finalData = await resultOp
const [ resErr, resData ] = finalData

if (resErr) {
span.setTag(opentracing.Tags.ERROR, true)
span.setTag('data.outgoing', resErr)
} else {
let finalData = await resultOp
span.setTag('data.outgoing', resData)
}

span.finish()

// if a cold pause has been requested, don't process this
if (this._cold) return
// if a cold pause has been requested, don't process this
if (this._cold) return

if (canReply) {
finalData = serializeData(finalData)

const worker = await this
._remit
._workers
.acquire()

if (message.properties.headers) {
message.properties.headers.originId = message.properties.headers.originId || bubble.get('originId')
message.properties.headers.fromBubbleId = message.properties.headers.bubbleId
message.properties.headers.bubbleId = bubble.get('bubbleId')
}

try {
await worker.sendToQueue(
message.properties.replyTo,
Expand Down
26 changes: 18 additions & 8 deletions lib/Listener.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
const bubble = require('../utils/bubble')
const { ulid } = require('ulid')
const opentracing = require('opentracing')
const EventEmitter = require('eventemitter3')
const parseEvent = require('../utils/parseEvent')
const waterfall = require('../utils/asyncWaterfall')
Expand Down Expand Up @@ -112,7 +111,7 @@ class Listener {
try {
consumeResult = await this._consumer.consume(
this._consumerQueueName,
bubble.bind(this._incoming.bind(this)),
this._remit._namespace.bind(this._incoming.bind(this)),
{
noAck: shouldSubscribe,
exclusive: shouldSubscribe
Expand Down Expand Up @@ -149,13 +148,23 @@ class Listener {
return
}

if (message.properties.headers) {
bubble.set('originId', message.properties.headers.originId)
if (!bubble.get('bubbleId')) bubble.set('bubbleId', ulid())
}
const parentContext = this._remit._tracer.extract(opentracing.FORMAT_TEXT_MAP, (message.properties.headers && message.properties.headers.context) || {}) || null

const span = this._remit._tracer.startSpan(`Remit Listener: ${this._options.event}`, {
tags: {
'remit.version': this._remit.version,
[opentracing.Tags.SAMPLING_PRIORITY]: 1,
[opentracing.Tags.COMPONENT]: 'remit',
[opentracing.Tags.MESSAGE_BUS_DESTINATION]: this._options.event,
[opentracing.Tags.SPAN_KIND]: opentracing.Tags.SPAN_KIND_MESSAGING_CONSUMER,
'data.incoming': data
},
references: [opentracing.followsFrom(parentContext)]
})

this._remit._namespace.set('context', span.context())

const event = parseEvent(message.properties, message.fields, data, {
flowType: 'entry',
isReceiver: true
})

Expand All @@ -168,6 +177,7 @@ class Listener {
}

await resultOp
span.finish()

// if a cold pause has been requested, don't process this
if (this._cold) return
Expand Down
14 changes: 13 additions & 1 deletion lib/Remit.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const url = require('url')
const amqplib = require('amqplib')
const { Tracer } = require('opentracing')
const EventEmitter = require('eventemitter3')
const packageJson = require('../package.json')
const parseAmqpUrl = require('../utils/parseAmqpUrl')
Expand All @@ -11,6 +12,7 @@ const Endpoint = require('./Endpoint')
const Listener = require('./Listener')
const Request = require('./Request')
const Emitter = require('./Emitter')
const { createNamespace } = require('cls-hooked')

class Remit {
constructor (options = {}) {
Expand All @@ -24,14 +26,24 @@ class Remit {
this._options = {}

this._options.exchange = options.exchange || 'remit'
this._options.name = options.name || process.env.REMIT_NAME || ''
this._options.name = options.name || process.env.REMIT_NAME || 'remit'
this._options.url = options.url || process.env.REMIT_URL || 'amqp://localhost'

this._emitter = new EventEmitter()
this._connection = this._connect(this._options).catch(throwAsException)
this._workers = ChannelPool(this._connection)
this._publishChannels = {}

this._tracer = options.tracer || new Tracer({
serviceName: this._options.name,
reporter: {
jpwilliams marked this conversation as resolved.
Show resolved Hide resolved
logSpans: true,
flushIntervalMs: 10
}
})

this._namespace = options.namespace || createNamespace('remit')

// TODO make this better
this._eventCounters = {}
}
Expand Down
Loading