require "yaml"

module Croupier
alias TaskProc = -> String? | Array(String)

# A Task is an object that may generate output
# It has a `Proc` which is executed when the task is run
# It can have zero or more inputs
# It has zero or more outputs
# Tasks are connected by dependencies, where one task's output is another's input
class Task
include YAML::Serializable
include YAML::Serializable::Strict

property id : String = ""
property inputs : Set(String) = Set(String).new
property outputs : Array(String) = [] of String
property stale : Bool = true # ameba:disable Naming/QueryBoolMethods
property? always_run : Bool = false
property? no_save : Bool = false
@[YAML::Field(ignore: true)]
property procs : Array(TaskProc) = [] of TaskProc
property? mergeable : Bool = true

# Under what keys should this task be registered with TaskManager
def keys
@outputs.empty? ? [@id] : @outputs

# Create a task with zero or more outputs.
# `output` is an array of files or k/v store keys that the task generates
# `inputs` is an array of filesystem paths, task ids or k/v store keys that the
# task depends on.
# `proc` is a proc that is executed when the task is run
# `no_save` is a boolean that tells croupier that the task will save the files itself
# `id` is a unique identifier for the task. If the task has no outputs,
# it *must* have an id. If not given, it's calculated as a hash of outputs.
# `always_run` is a boolean that tells croupier that the task is always
# stale regardless of its dependencies' state
# `mergeable` is a boolean. If true, the task can be merged
# with others that share an output. Tasks with different
# `mergeable` values can NOT be merged together.
# k/v store keys are of the form `kv://key`, and are used to store
# intermediate data in a key/value store. They are not saved to disk.
# To access k/v data in your proc, you can use ``.
# Important: tasks will be registered in TaskManager. If the new task
# conflicts in id/outputs with others, it will be merged, and the new
# object will NOT be registered. For that reason, keeping references
# to Task objects you create is probably pointless.

def initialize(
outputs : Array(String) = [] of String,
inputs : Array(String) = [] of String,
no_save : Bool = false,
id : String | Nil = nil,
always_run : Bool = false,
mergeable : Bool = true,
&block : TaskProc
initialize(outputs, inputs, block, no_save, id, always_run, mergeable)

def initialize(
outputs : Array(String) = [] of String,
inputs : Array(String) = [] of String,
proc : TaskProc | Nil = nil,
no_save : Bool = false,
id : String | Nil = nil,
always_run : Bool = false,
mergeable : Bool = true
if !(inputs.to_set & outputs.to_set).empty?
raise "Cycle detected"
@always_run = always_run
@procs << proc unless proc.nil?
@outputs = outputs.uniq
raise "Task has no outputs and no id" if id.nil? && @outputs.empty?
@id = id ? id : Digest::SHA1.hexdigest(@outputs.join(","))[..6]
@inputs = inputs
@no_save = no_save
@mergeable = mergeable

# Register with the task manager.
# We should merge every task we have output/id collision with
# into one, and register it on every output/id of every one
# of those tasks
to_merge = ( { |k|
TaskManager.tasks.fetch(k, nil)
to_merge << self
# Refuse to merge if this task or any of the colliding ones
# are not mergeable
raise "Can't merge task #{self} with #{to_merge[..-2].map(&.to_s)}" \
if to_merge.size > 1 && to_merge.any? { |t| !t.mergeable? }
reduced = to_merge.reduce { |t1, t2| t1.merge t2 }
reduced.keys.each { |k| TaskManager.tasks[k] = reduced }

def initialize(
output : String | Nil = nil,
inputs : Array(String) = [] of String,
no_save : Bool = false,
id : String | Nil = nil,
always_run : Bool = false,
mergeable : Bool = true,
&block : TaskProc
initialize(output, inputs, block, no_save, id, always_run, mergeable)

# Create a task with zero or one outputs. Overload for convenience.
def initialize(
output : String | Nil = nil,
inputs : Array(String) = [] of String,
proc : TaskProc | Nil = nil,
no_save : Bool = false,
id : String | Nil = nil,
always_run : Bool = false,
mergeable : Bool = true
outputs: output ? [output] : [] of String,
inputs: inputs,
proc: proc,
no_save: no_save,
id: id,
always_run: always_run,
mergeable: mergeable

# Executes the proc for the task
def run
call_results = Array(String | Nil).new
@procs.each do |proc|
result =
rescue ex
raise "Task #{self} failed: #{ex}"
if result.nil?
call_results << nil
elsif result.is_a?(String)
call_results << result
call_results +=

if @no_save
# The task saved the data so we should not do it
# but we need to update hashes
@outputs.reject(&.empty?).each do |output|
# If the output is a kv:// url, we don't need to check if it exists
next if output.lchop?("kv://")
if !File.exists?(output)
raise "Task #{self} did not generate #{output}"
TaskManager.next_run[output] = Digest::SHA1.hexdigest(
# We have to save the files ourselves
begin do |output, call_result|
raise "Task #{self} did not return any data for output #{output}" if call_result.nil?
if k = output.lchop?("kv://")
# If the output is a kv:// url, we save it in the k/v store
TaskManager.set(k, call_result)
Dir.mkdir_p(File.dirname output), "w") do |io|
io << call_result
TaskManager.next_run[output] = Digest::SHA1.hexdigest(call_result)
rescue IndexError
raise "Task #{self} did not return the correct number of outputs"
@stale = false # Done, not stale anymore

# Tasks are stale if:
# * One of their inputs are stale
# * If one of the output files doesn't exist
# * If any of the inputs are generated by a stale task

# ameba:disable Metrics/CyclomaticComplexity
def stale?
# Tasks without inputs or flagged always_run are always stale
Log.trace { "#{outputs} is stale because @always_run" } if @always_run
Log.trace { "#{outputs} is stale because @inputs.empty?" } if @inputs.empty?
return true if @always_run || @inputs.empty?
# Tasks don't get stale twice
return false unless @stale

file_outputs = @outputs.reject(&.lchop?("kv://"))
kv_outputs ="kv://")).map(&.lchop("kv://"))

result = (missing_file_outputs = file_outputs.any? { |output| !File.exists?(output) }) ||
(missing_kv_outputs = kv_outputs.any? { |output| !TaskManager.get(output) }) ||
(modified_inputs = inputs.any? { |input| TaskManager.modified.includes? input }) ||
(stale_inputs = @inputs.any? { |input| TaskManager.tasks.has_key?(input) && TaskManager.tasks[input].stale? })

if result
Log.trace {
"#{outputs} is stale because of missing_file_outputs"
} if missing_file_outputs
Log.trace {
"#{outputs} is stale because of missing_kv_outputs"
} if missing_kv_outputs
Log.trace {
"#{outputs} is stale because of modified_inputs #{ { |input| TaskManager.modified.includes? input }}"
} if modified_inputs
Log.trace {
"#{outputs} is stale because of stale_inputs"
} if stale_inputs
# p! missing_file_outputs, missing_kv_outputs, modified_inputs, stale_inputs

# For inputs that are tasks, we check if they are stale
# For inputs that are not tasks, they should exist as files
# If any inputs don't fit those criteria, they are being
# waited for.
def waiting_for
@inputs.reject do |input|
if TaskManager.tasks.has_key? input
if input.lchop? "kv://"
File.exists? input

# A task is ready if it is stale and not waiting for anything
def ready?(run_all = false)
(stale? || always_run? || run_all) &&

def to_s(io)
io << @id << "::" << @outputs.join(", ")

# Merge two tasks.
# inputs and outputs are joined
# procs of the second task are added to the 1st
def merge(other : Task)
raise "Cannot merge tasks with different no_save settings" unless no_save? == other.no_save?
raise "Cannot merge tasks with different always_run settings" unless always_run? == other.always_run?

# @outputs is NOT unique! We can save multiple times
# the same file in multiple procs
@outputs += other.@outputs
@inputs += other.@inputs
@procs += other.@procs

