Skip to content

Commit

Permalink
Merge pull request #121 from agrare/fix_object_type_id_argument_error
Browse files Browse the repository at this point in the history
Add object_type,_id to WorkflowInstance#run_queue

(cherry picked from commit 8cacfab)
  • Loading branch information
Fryguy committed Nov 19, 2024
1 parent 44d7fcf commit 71307dd
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
class ManageIQ::Providers::Workflows::AutomationManager::WorkflowInstance < ManageIQ::Providers::EmbeddedAutomationManager::ConfigurationScript
def run_queue(zone: nil, role: "automate", object: nil, deliver_on: nil, server_guid: nil)
def run_queue(zone: nil, role: "automate", object: nil, object_type: nil, object_id: nil, deliver_on: nil, server_guid: nil)
args = {:zone => zone, :role => role}
if object
args[:object_type] = object.class.name
args[:object_id] = object.id
elsif object_type && object_id
args[:object_type] = object_type
args[:object_id] = object_id
end

queue_opts = {
Expand Down
32 changes: 15 additions & 17 deletions lib/manageiq/providers/workflows/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def initialize

def start
$workflows_log.debug("Runner: Starting workflows runner...")
self.docker_wait_thread = Thread.new { docker_wait }
self.docker_wait_thread = Thread.new { loop { docker_wait } }
$workflows_log.debug("Runner: Starting workflows runner...Complete")
end

Expand All @@ -47,23 +47,21 @@ def delete_workflow_instance(workflow_instance)
attr_accessor :docker_wait_thread

def docker_wait
loop do
docker_runner = Floe::Runner.for_resource("docker")
docker_runner.wait do |event, data|
execution_id, runner_context = data.values_at("execution_id", "runner_context")
$workflows_log.debug("Runner: Caught event [#{event}] for workflow [#{execution_id}] container [#{runner_context["container_ref"]}]")

workflow_instance, queue_args = workflow_instances[execution_id]
next if workflow_instance.nil?

$workflows_log.debug("Runner: Queueing update for WorkflowInstance ID: [#{workflow_instance.id}]")

workflow_instance.run_queue(**queue_args)
end
rescue => err
$workflows_log.warn("Error: [#{err}]")
$workflows_log.log_backtrace(err)
docker_runner = Floe::Runner.for_resource("docker")
docker_runner.wait do |event, data|
execution_id, runner_context = data.values_at("execution_id", "runner_context")
$workflows_log.debug("Runner: Caught event [#{event}] for workflow [#{execution_id}] container [#{runner_context["container_ref"]}]")

workflow_instance, queue_args = workflow_instances[execution_id]
next if workflow_instance.nil?

$workflows_log.debug("Runner: Queueing update for WorkflowInstance ID: [#{workflow_instance.id}]")

workflow_instance.run_queue(**queue_args)
end
rescue => err
$workflows_log.warn("Error: [#{err}]")
$workflows_log.log_backtrace(err)
end

def stop_thread(thread)
Expand Down
54 changes: 54 additions & 0 deletions spec/lib/manageiq/providers/workflows/runner_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,58 @@
end
end
end

describe "#docker_wait (private)" do
let(:docker_runner) { double("Floe::ContainerRunner::Docker") }
let(:execution_id) { SecureRandom.uuid }
let(:container_ref) { SecureRandom.uuid }
let(:event) { "create" }
let(:data) { {"execution_id" => execution_id, "runner_context" => {"container_ref" => container_ref}} }

before do
allow(Floe::Runner).to receive(:for_resource).with("docker").and_return(docker_runner)
allow(docker_runner).to receive(:wait).and_yield(event, data)
end

context "with no workflows in #workflow_instances" do
it "doesn't queue an update for an unrecognized workflow" do
subject.send(:docker_wait)

expect(MiqQueue.count).to be_zero
end
end

context "with a workflow_instance registered with this runner" do
let(:workflow_instance) { FactoryBot.create(:workflows_automation_workflow_instance, :manager_ref => execution_id) }
let(:queue_args) { {:zone => nil, :role => "automate"} }
before { subject.add_workflow_instance(workflow_instance, queue_args) }

it "queues a run for the workflow instance" do
subject.send(:docker_wait)

expect(MiqQueue.count).to eq(1)
expect(MiqQueue.first).to have_attributes(
:class_name => "ManageIQ::Providers::Workflows::AutomationManager::WorkflowInstance",
:method_name => "run",
:args => [{:zone => nil, :role => "automate"}],
)
end

context "with an object in queue_args" do
let(:service) { FactoryBot.create(:service) }
let(:queue_args) { {:zone => nil, :role => "automate", :object_type => service.class.name, :object_id => service.id} }

it "queues a run for the workflow instance" do
subject.send(:docker_wait)

expect(MiqQueue.count).to eq(1)
expect(MiqQueue.first).to have_attributes(
:class_name => "ManageIQ::Providers::Workflows::AutomationManager::WorkflowInstance",
:method_name => "run",
:args => [{:zone => nil, :role => "automate", :object_type => service.class.name, :object_id => service.id}],
)
end
end
end
end
end

0 comments on commit 71307dd

Please sign in to comment.