Skip to content

Commit

Permalink
Merge branch 'add_elixir_tutorials' of https://github.com/jeffweiss/r…
Browse files Browse the repository at this point in the history
…abbitmq-tutorials into jeffweiss-add_elixir_tutorials
  • Loading branch information
michaelklishin committed Jan 9, 2016
2 parents 55374bd + 5c7c726 commit 7ba330d
Show file tree
Hide file tree
Showing 14 changed files with 319 additions and 0 deletions.
4 changes: 4 additions & 0 deletions elixir/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
.mix_tasks
mix.lock
/_build
/deps
14 changes: 14 additions & 0 deletions elixir/emit_log.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{:ok, connection} = AMQP.Connection.open
{:ok, channel} = AMQP.Channel.open(connection)

message =
case System.argv do
[] -> "Hello World!"
words -> Enum.join(words, " ")
end

AMQP.Exchange.declare(channel, "logs", :fanout)
AMQP.Basic.publish(channel, "logs", "", message)
IO.puts " [x] Sent '#{message}'"

AMQP.Connection.close(connection)
28 changes: 28 additions & 0 deletions elixir/emit_log_direct.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{:ok, connection} = AMQP.Connection.open
{:ok, channel} = AMQP.Channel.open(connection)

{severities, raw_message, _} =
System.argv
|> OptionParser.parse(strict: [info: :boolean,
warning: :boolean,
error: :boolean])
|> case do
{[], msg, _} -> {[info: true], msg, []}
other -> other
end

message =
case raw_message do
[] -> "Hello World!"
words -> Enum.join(words, " ")
end

AMQP.Exchange.declare(channel, "direct_logs", :direct)

for {severity, true} <- severities do
severity = severity |> to_string
AMQP.Basic.publish(channel, "direct_logs", severity, message)
IO.puts " [x] Sent '[#{severity}] #{message}'"
end

AMQP.Connection.close(connection)
17 changes: 17 additions & 0 deletions elixir/emit_log_topic.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{:ok, connection} = AMQP.Connection.open
{:ok, channel} = AMQP.Channel.open(connection)

{topic, message} =
System.argv
|> case do
[] -> {"anonymous.info", "Hello World!"}
[message] -> {"anonymous.info", message}
[topic|words] -> {topic, Enum.join(words, " ")}
end

AMQP.Exchange.declare(channel, "topic_logs", :topic)

AMQP.Basic.publish(channel, "topic_logs", topic, message)
IO.puts " [x] Sent '[#{topic}] #{message}'"

AMQP.Connection.close(connection)
34 changes: 34 additions & 0 deletions elixir/mix.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
defmodule RabbitmqTutorials.Mixfile do
use Mix.Project

def project do
[app: :rabbitmq_tutorials,
version: "0.0.1",
elixir: "~> 1.1",
build_embedded: Mix.env == :prod,
start_permanent: Mix.env == :prod,
deps: deps]
end

# Configuration for the OTP application
#
# Type "mix help compile.app" for more information
def application do
[applications: [:logger, :amqp]]
end

# Dependencies can be Hex packages:
#
# {:mydep, "~> 0.3.0"}
#
# Or git/path repositories:
#
# {:mydep, git: "https://github.com/elixir-lang/mydep.git", tag: "0.1.0"}
#
# Type "mix help deps" for more examples and options
defp deps do
[
{:amqp, "~> 0.1.4"},
]
end
end
15 changes: 15 additions & 0 deletions elixir/new_task.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{:ok, connection} = AMQP.Connection.open
{:ok, channel} = AMQP.Channel.open(connection)

AMQP.Queue.declare(channel, "task_queue", durable: true)

message =
case System.argv do
[] -> "Hello World!"
words -> Enum.join(words, " ")
end

AMQP.Basic.publish(channel, "", "task_queue", message, persistent: true)
IO.puts " [x] Sent '#{message}'"

AMQP.Connection.close(connection)
17 changes: 17 additions & 0 deletions elixir/receive.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
defmodule Receive do
def wait_for_messages do
receive do
{:basic_deliver, payload, _meta} ->
IO.puts " [x] Received #{payload}"
wait_for_messages
end
end
end

{:ok, connection} = AMQP.Connection.open
{:ok, channel} = AMQP.Channel.open(connection)
AMQP.Queue.declare(channel, "hello")
AMQP.Basic.consume(channel, "hello", nil, no_ack: true)
IO.puts " [*] Waiting for messages. To exit press CTRL+C, CTRL+C"

Receive.wait_for_messages
21 changes: 21 additions & 0 deletions elixir/receive_logs.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
defmodule ReceiveLogs do
def wait_for_messages(channel) do
receive do
{:basic_deliver, payload, _meta} ->
IO.puts " [x] Received #{payload}"

wait_for_messages(channel)
end
end
end

{:ok, connection} = AMQP.Connection.open
{:ok, channel} = AMQP.Channel.open(connection)

AMQP.Exchange.declare(channel, "logs", :fanout)
{:ok, %{queue: queue_name}} = AMQP.Queue.declare(channel, "", exclusive: true)
AMQP.Queue.bind(channel, queue_name, "logs")
AMQP.Basic.consume(channel, queue_name, nil, no_ack: true)
IO.puts " [*] Waiting for messages. To exit press CTRL+C, CTRL+C"

ReceiveLogs.wait_for_messages(channel)
35 changes: 35 additions & 0 deletions elixir/receive_logs_direct.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
defmodule ReceiveLogsDirect do
def wait_for_messages(channel) do
receive do
{:basic_deliver, payload, meta} ->
IO.puts " [x] Received [#{meta.routing_key}] #{payload}"

wait_for_messages(channel)
end
end
end

{:ok, connection} = AMQP.Connection.open
{:ok, channel} = AMQP.Channel.open(connection)

{severities, _, _} =
System.argv
|> OptionParser.parse(strict: [info: :boolean,
warning: :boolean,
error: :boolean])

AMQP.Exchange.declare(channel, "direct_logs", :direct)

{:ok, %{queue: queue_name}} = AMQP.Queue.declare(channel, "", exclusive: true)

for {severity, true} <- severities do
binding_key = severity |> to_string
AMQP.Queue.bind(channel, queue_name, "direct_logs", routing_key: binding_key)
end

AMQP.Basic.consume(channel, queue_name, nil, no_ack: true)

IO.puts " [*] Waiting for messages. To exist press CTRL+C, CTRL+C"


ReceiveLogsDirect.wait_for_messages(channel)
31 changes: 31 additions & 0 deletions elixir/receive_logs_topic.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
defmodule ReceiveLogsTopic do
def wait_for_messages(channel) do
receive do
{:basic_deliver, payload, meta} ->
IO.puts " [x] Received [#{meta.routing_key}] #{payload}"

wait_for_messages(channel)
end
end
end

{:ok, connection} = AMQP.Connection.open
{:ok, channel} = AMQP.Channel.open(connection)

AMQP.Exchange.declare(channel, "topic_logs", :topic)

{:ok, %{queue: queue_name}} = AMQP.Queue.declare(channel, "", exclusive: true)

if length(System.argv) == 0 do
IO.puts "Usage: mix run receive_logs_topic.exs [binding_key]..."
System.halt(1)
end
for binding_key <- System.argv do
AMQP.Queue.bind(channel, queue_name, "topic_logs", routing_key: binding_key)
end

AMQP.Basic.consume(channel, queue_name, nil, no_ack: true)

IO.puts " [*] Waiting for messages. To exist press CTRL+C, CTRL+C"

ReceiveLogsTopic.wait_for_messages(channel)
36 changes: 36 additions & 0 deletions elixir/rpc_client.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
defmodule FibonacciRpcClient do
def wait_for_messages(_channel, correlation_id) do
receive do
{:basic_deliver, payload, %{correlation_id: ^correlation_id}} ->
{n, _} = Integer.parse(payload)
n
end
end
def call(n) do
{:ok, connection} = AMQP.Connection.open
{:ok, channel} = AMQP.Channel.open(connection)

{:ok, %{queue: queue_name}} = AMQP.Queue.declare(channel, "", exclusive: true)
AMQP.Basic.consume(channel, queue_name, nil, no_ack: true)
correlation_id = :erlang.unique_integer |> :erlang.integer_to_binary |> Base.encode64
request = to_string(n)
AMQP.Basic.publish(channel, "", "rpc_queue", request, reply_to: queue_name, correlation_id: correlation_id)

FibonacciRpcClient.wait_for_messages(channel, correlation_id)
end
end

num =
case System.argv do
[] -> 30
param ->
{x, _} =
param
|> Enum.join(" ")
|> Integer.parse
x
end

IO.puts " [x] Requesting fib(#{num})"
response = FibonacciRpcClient.call(num)
IO.puts " [.] Got #{response}"
33 changes: 33 additions & 0 deletions elixir/rpc_server.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
defmodule FibServer do
def fib(0), do: 0
def fib(1), do: 1
def fib(n) when n > 1, do: fib(n-1) + fib(n-2)

def wait_for_messages(channel) do
receive do
{:basic_deliver, payload, meta} ->
{n, _} = Integer.parse(payload)
IO.puts " [.] fib(#{n})"
response = fib(n)

AMQP.Basic.publish(channel, "", meta.reply_to, "#{response}", correlation_id: meta.correlation_id)
AMQP.Basic.ack(channel, meta.delivery_tag)

wait_for_messages(channel)
end
end
end

{:ok, connection} = AMQP.Connection.open
{:ok, channel} = AMQP.Channel.open(connection)

AMQP.Queue.declare(channel, "rpc_queue")

AMQP.Basic.qos(channel, prefetch_count: 1)

AMQP.Basic.consume(channel, "rpc_queue")

IO.puts " [x] Awaiting RPC requests"

FibServer.wait_for_messages(channel)

6 changes: 6 additions & 0 deletions elixir/send.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{:ok, connection} = AMQP.Connection.open
{:ok, channel} = AMQP.Channel.open(connection)
AMQP.Queue.declare(channel, "hello")
AMQP.Basic.publish(channel, "", "hello", "Hello World!")
IO.puts " [x] Sent 'Hello World!'"
AMQP.Connection.close(connection)
28 changes: 28 additions & 0 deletions elixir/worker.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
defmodule Worker do
def wait_for_messages(channel) do
receive do
{:basic_deliver, payload, meta} ->
IO.puts " [x] Received #{payload}"
payload
|> to_char_list
|> Enum.count(fn x -> x == ?. end)
|> Kernel.*(1000)
|> :timer.sleep
IO.puts " [x] Done."
AMQP.Basic.ack(channel, meta.delivery_tag)

wait_for_messages(channel)
end
end
end

{:ok, connection} = AMQP.Connection.open
{:ok, channel} = AMQP.Channel.open(connection)

AMQP.Queue.declare(channel, "task_queue", durable: true)
AMQP.Basic.qos(channel, prefetch_count: 1)

AMQP.Basic.consume(channel, "hello")
IO.puts " [*] Waiting for messages. To exit press CTRL+C, CTRL+C"

Worker.wait_for_messages(channel)

0 comments on commit 7ba330d

Please sign in to comment.