diff --git a/app/models/manageiq/providers/workflows/automation_manager/workflow_instance.rb b/app/models/manageiq/providers/workflows/automation_manager/workflow_instance.rb index 5b3b832..eb3f67b 100644 --- a/app/models/manageiq/providers/workflows/automation_manager/workflow_instance.rb +++ b/app/models/manageiq/providers/workflows/automation_manager/workflow_instance.rb @@ -1,9 +1,12 @@ class ManageIQ::Providers::Workflows::AutomationManager::WorkflowInstance < ManageIQ::Providers::EmbeddedAutomationManager::ConfigurationScript - def run_queue(zone: nil, role: "automate", object: nil, deliver_on: nil, server_guid: nil) + def run_queue(zone: nil, role: "automate", object: nil, object_type: nil, object_id: nil, deliver_on: nil, server_guid: nil) args = {:zone => zone, :role => role} if object args[:object_type] = object.class.name args[:object_id] = object.id + elsif object_type && object_id + args[:object_type] = object_type + args[:object_id] = object_id end queue_opts = { diff --git a/lib/manageiq/providers/workflows/runner.rb b/lib/manageiq/providers/workflows/runner.rb index 714fd8f..b4108d6 100644 --- a/lib/manageiq/providers/workflows/runner.rb +++ b/lib/manageiq/providers/workflows/runner.rb @@ -22,7 +22,7 @@ def initialize def start $workflows_log.debug("Runner: Starting workflows runner...") - self.docker_wait_thread = Thread.new { docker_wait } + self.docker_wait_thread = Thread.new { loop { docker_wait } } $workflows_log.debug("Runner: Starting workflows runner...Complete") end @@ -47,23 +47,21 @@ def delete_workflow_instance(workflow_instance) attr_accessor :docker_wait_thread def docker_wait - loop do - docker_runner = Floe::Runner.for_resource("docker") - docker_runner.wait do |event, data| - execution_id, runner_context = data.values_at("execution_id", "runner_context") - $workflows_log.debug("Runner: Caught event [#{event}] for workflow [#{execution_id}] container [#{runner_context["container_ref"]}]") - - workflow_instance, queue_args = workflow_instances[execution_id] - next if workflow_instance.nil? - - $workflows_log.debug("Runner: Queueing update for WorkflowInstance ID: [#{workflow_instance.id}]") - - workflow_instance.run_queue(**queue_args) - end - rescue => err - $workflows_log.warn("Error: [#{err}]") - $workflows_log.log_backtrace(err) + docker_runner = Floe::Runner.for_resource("docker") + docker_runner.wait do |event, data| + execution_id, runner_context = data.values_at("execution_id", "runner_context") + $workflows_log.debug("Runner: Caught event [#{event}] for workflow [#{execution_id}] container [#{runner_context["container_ref"]}]") + + workflow_instance, queue_args = workflow_instances[execution_id] + next if workflow_instance.nil? + + $workflows_log.debug("Runner: Queueing update for WorkflowInstance ID: [#{workflow_instance.id}]") + + workflow_instance.run_queue(**queue_args) end + rescue => err + $workflows_log.warn("Error: [#{err}]") + $workflows_log.log_backtrace(err) end def stop_thread(thread) diff --git a/spec/lib/manageiq/providers/workflows/runner_spec.rb b/spec/lib/manageiq/providers/workflows/runner_spec.rb index d58ff9d..99744d0 100644 --- a/spec/lib/manageiq/providers/workflows/runner_spec.rb +++ b/spec/lib/manageiq/providers/workflows/runner_spec.rb @@ -29,4 +29,58 @@ end end end + + describe "#docker_wait (private)" do + let(:docker_runner) { double("Floe::ContainerRunner::Docker") } + let(:execution_id) { SecureRandom.uuid } + let(:container_ref) { SecureRandom.uuid } + let(:event) { "create" } + let(:data) { {"execution_id" => execution_id, "runner_context" => {"container_ref" => container_ref}} } + + before do + allow(Floe::Runner).to receive(:for_resource).with("docker").and_return(docker_runner) + allow(docker_runner).to receive(:wait).and_yield(event, data) + end + + context "with no workflows in #workflow_instances" do + it "doesn't queue an update for an unrecognized workflow" do + subject.send(:docker_wait) + + expect(MiqQueue.count).to be_zero + end + end + + context "with a workflow_instance registered with this runner" do + let(:workflow_instance) { FactoryBot.create(:workflows_automation_workflow_instance, :manager_ref => execution_id) } + let(:queue_args) { {:zone => nil, :role => "automate"} } + before { subject.add_workflow_instance(workflow_instance, queue_args) } + + it "queues a run for the workflow instance" do + subject.send(:docker_wait) + + expect(MiqQueue.count).to eq(1) + expect(MiqQueue.first).to have_attributes( + :class_name => "ManageIQ::Providers::Workflows::AutomationManager::WorkflowInstance", + :method_name => "run", + :args => [{:zone => nil, :role => "automate"}], + ) + end + + context "with an object in queue_args" do + let(:service) { FactoryBot.create(:service) } + let(:queue_args) { {:zone => nil, :role => "automate", :object_type => service.class.name, :object_id => service.id} } + + it "queues a run for the workflow instance" do + subject.send(:docker_wait) + + expect(MiqQueue.count).to eq(1) + expect(MiqQueue.first).to have_attributes( + :class_name => "ManageIQ::Providers::Workflows::AutomationManager::WorkflowInstance", + :method_name => "run", + :args => [{:zone => nil, :role => "automate", :object_type => service.class.name, :object_id => service.id}], + ) + end + end + end + end end