Skip to content

Commit

Permalink
update publisher to accept dest exchange id
Browse files Browse the repository at this point in the history
  • Loading branch information
msawka committed Jul 8, 2015
1 parent 6993e01 commit 4b91465
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 9 deletions.
34 changes: 27 additions & 7 deletions lib/overseer_api/publisher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,22 @@ defmodule OpenAperture.OverseerApi.Publisher do
GenServer.cast(__MODULE__, {:publish_event, event})
end

def publish_request(request) do
GenServer.cast(__MODULE__, {:publish_request, request})
@doc """
Method to publish Requests to an Overseer in a specific exchange
## Option Values
The `request` options represents the Request to publish
The `dest_exchange_id` represents the exchange in which the Overseer should exist
## Return Value
:ok
"""
@spec publish_request(Request.t, term) :: :ok
def publish_request(request, dest_exchange_id) do
GenServer.cast(__MODULE__, {:publish_request, request, dest_exchange_id})
end

@doc """
Expand Down Expand Up @@ -96,16 +110,22 @@ defmodule OpenAperture.OverseerApi.Publisher do
GenServer callback for handling the :publish_request event. This method
will publish requests to the Overseer system module
## Option Values
The `request` option defines the Request to be processed
The `dest_exchange_id` represents the exchange in which the Overseer should exist
{:noreply, state}
"""
@spec handle_cast({:publish_request, Request.t}, Map) :: {:noreply, Map}
def handle_cast({:publish_request, request}, state) do
Logger.debug("#{@logprefix} Publishing request to Overseer...")
@spec handle_cast({:publish_request, Request.t, String.t}, Map) :: {:noreply, Map}
def handle_cast({:publish_request, request, dest_exchange_id}, state) do
Logger.debug("#{@logprefix} Publishing request to Overseer in exchange #{inspect dest_exchange_id}...")

payload = Request.to_payload(request)

options = ConnectionOptionsResolver.get_for_broker(ManagerApi.get_api, state[:broker_id])
queue = QueueBuilder.build(ManagerApi.get_api, "overseer", state[:exchange_id])
options = ConnectionOptionsResolver.resolve(ManagerApi.get_api, state[:broker_id], state[:exchange_id], dest_exchange_id)
queue = QueueBuilder.build(ManagerApi.get_api, "overseer", dest_exchange_id)

case publish(options, queue, payload) do
:ok -> Logger.debug("[#{@logprefix} Successfully published request to Overseer")
Expand Down
4 changes: 2 additions & 2 deletions test/overseer_api/publisher_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ defmodule OpenAperture.OverseerApi.PublisherTest do
:meck.expect(QueueBuilder, :build, fn _,_,_ -> %OpenAperture.Messaging.Queue{name: ""} end)

:meck.new(ConnectionOptionsResolver, [:passthrough])
:meck.expect(ConnectionOptionsResolver, :get_for_broker, fn _, _ -> %AMQPConnectionOptions{} end)
:meck.expect(ConnectionOptionsResolver, :resolve, fn _,_,_,_ -> %AMQPConnectionOptions{} end)

state = %{
}
Expand All @@ -66,7 +66,7 @@ defmodule OpenAperture.OverseerApi.PublisherTest do
action: :upgrade_request,
options: %{force: true}
}
assert Publisher.handle_cast({:publish_request, request}, state) == {:noreply, state}
assert Publisher.handle_cast({:publish_request, request, 1}, state) == {:noreply, state}
after
:meck.unload(ConnectionPool)
:meck.unload(ConnectionPools)
Expand Down

0 comments on commit 4b91465

Please sign in to comment.