Skip to content

Commit

Permalink
add Elixir source for tutorial 6
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffweiss committed Jan 8, 2016
1 parent 5757dfd commit 5c7c726
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 0 deletions.
36 changes: 36 additions & 0 deletions elixir/rpc_client.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
defmodule FibonacciRpcClient do
def wait_for_messages(_channel, correlation_id) do
receive do
{:basic_deliver, payload, %{correlation_id: ^correlation_id}} ->
{n, _} = Integer.parse(payload)
n
end
end
def call(n) do
{:ok, connection} = AMQP.Connection.open
{:ok, channel} = AMQP.Channel.open(connection)

{:ok, %{queue: queue_name}} = AMQP.Queue.declare(channel, "", exclusive: true)
AMQP.Basic.consume(channel, queue_name, nil, no_ack: true)
correlation_id = :erlang.unique_integer |> :erlang.integer_to_binary |> Base.encode64
request = to_string(n)
AMQP.Basic.publish(channel, "", "rpc_queue", request, reply_to: queue_name, correlation_id: correlation_id)

FibonacciRpcClient.wait_for_messages(channel, correlation_id)
end
end

num =
case System.argv do
[] -> 30
param ->
{x, _} =
param
|> Enum.join(" ")
|> Integer.parse
x
end

IO.puts " [x] Requesting fib(#{num})"
response = FibonacciRpcClient.call(num)
IO.puts " [.] Got #{response}"
33 changes: 33 additions & 0 deletions elixir/rpc_server.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
defmodule FibServer do
def fib(0), do: 0
def fib(1), do: 1
def fib(n) when n > 1, do: fib(n-1) + fib(n-2)

def wait_for_messages(channel) do
receive do
{:basic_deliver, payload, meta} ->
{n, _} = Integer.parse(payload)
IO.puts " [.] fib(#{n})"
response = fib(n)

AMQP.Basic.publish(channel, "", meta.reply_to, "#{response}", correlation_id: meta.correlation_id)
AMQP.Basic.ack(channel, meta.delivery_tag)

wait_for_messages(channel)
end
end
end

{:ok, connection} = AMQP.Connection.open
{:ok, channel} = AMQP.Channel.open(connection)

AMQP.Queue.declare(channel, "rpc_queue")

AMQP.Basic.qos(channel, prefetch_count: 1)

AMQP.Basic.consume(channel, "rpc_queue")

IO.puts " [x] Awaiting RPC requests"

FibServer.wait_for_messages(channel)

0 comments on commit 5c7c726

Please sign in to comment.