diff --git a/lib/Mojo/Promise.pm b/lib/Mojo/Promise.pm index 8109bf10f1..b886a0e77e 100644 --- a/lib/Mojo/Promise.pm +++ b/lib/Mojo/Promise.pm @@ -61,21 +61,52 @@ sub finally { shift->_finally(1, @_) } sub map { my ($class, $options, $cb, @items) = (shift, ref $_[0] eq 'HASH' ? shift : {}, @_); - return $class->all(map { $_->$cb } @items) if !$options->{concurrency} || @items <= $options->{concurrency}; + my $start_next; + my $block_next = sub { }; + + my ($akey, $aggregation, $next_after_fullfil, $next_after_reject) + = !defined $options->{aggregation} ? (0, 'all', \$start_next, \$block_next) + : $options->{aggregation} eq 'any' ? (1, 'any', \$block_next, \$start_next) + : $options->{aggregation} eq 'all_settled' ? (2, 'all_settled', \$start_next, \$start_next) + : $options->{aggregation} eq 'race' ? (3, 'race', \$block_next, \$block_next) + : (0, 'all', \$start_next, \$block_next); + + return $class->$aggregation(map { $_->$cb } @items) if !$options->{concurrency} || @items <= $options->{concurrency}; my @start = map { $_->$cb } splice @items, 0, $options->{concurrency}; my @wait = map { $start[0]->clone } 0 .. $#items; - my $start_next = sub { + # N.B. $start_next will never be called for $aggregation eq 'race' + $start_next = sub { return () unless my $item = shift @items; my ($start_next, $chain) = (__SUB__, shift @wait); - $_->$cb->then(sub { $chain->resolve(@_); $start_next->() }, sub { $chain->reject(@_); @items = () }) for $item; + my $exec_next = sub { + $_->$cb->then( + sub { + $chain->resolve(@_); + if ($akey == 1) { @items = () } + else { $start_next->() } + }, + sub { + $chain->reject(@_); + if ($akey == 0) { @items = () } + else { $start_next->() } + } + ) for $item; + return (); + }; + if (!$options->{delay}) { + $exec_next->(); + } + else { + Mojo::IOLoop->timer($options->{delay} => sub { $exec_next->() }); + } return (); }; - $_->then($start_next, sub { }) for @start; + $_->then($$next_after_fullfil, $$next_after_reject) for @start; - return $class->all(@start, @wait); + return $class->$aggregation(@start, @wait); } sub new { @@ -406,10 +437,14 @@ original fulfillment value or rejection reason. my $new = Mojo::Promise->map(sub {...}, @items); my $new = Mojo::Promise->map({concurrency => 3}, sub {...}, @items); + my $new = Mojo::Promise->map({aggregation => 'any', concurrency => 3}, sub {...}, @items); + my $new = Mojo::Promise->map({aggregation => 'all_settled', concurrency => 1, delay => 2.5 }, sub {...}, @items); -Apply a function that returns a L to each item in a list of items while optionally limiting concurrency. -Returns a L that collects the results in the same manner as L. If any item's promise is rejected, -any remaining items which have not yet been mapped will not be. +Apply a function that returns a L to each item in a list of items while optionally limiting concurrency and inserting delays between processing items. +Returns a L that collects the results in the same manner as L. +With the C option, the behaviour can be changed to the same manner as L or L. +If nothing or L is specified, and any item's promise is rejected, any remaining items which have not yet been mapped will not be. +If L is specified and any item's promise is resolved, any remaining items which have not yet been mapped will not be. # Perform 3 requests at a time concurrently Mojo::Promise->map({concurrency => 3}, sub { $ua->get_p($_) }, @urls) @@ -419,12 +454,24 @@ These options are currently available: =over 2 +=item aggregation + + aggregation => 'any' + +Specifies the aggregation behaviour. Supported values are L (default), L, and L. + =item concurrency concurrency => 3 The maximum number of items that are in progress at the same time. +=item delay + + delay => 2.5 + +Insert a delay of 2.5 seconds after each processed items. N.B. delay makes sense only in case concurrency is specified. + =back =head2 new diff --git a/lib/Mojolicious/Guides/Cookbook.pod b/lib/Mojolicious/Guides/Cookbook.pod index 06880db74c..0989dcbbda 100644 --- a/lib/Mojolicious/Guides/Cookbook.pod +++ b/lib/Mojolicious/Guides/Cookbook.pod @@ -676,6 +676,43 @@ that create them for you. app->start; +When processing a number of requests towards a non-robust or external resource, the kind of aggregation, maximum +concurrency and an optional delay can be specified using L. + + use Mojolicious::Lite -signatures; + use Mojo::Promise; + use Mojo::URL; + + # Search MetaCPAN for a larger number of items, with mild concurrency and some throttling + get '/' => sub ($c) { + + my $url = Mojo::URL->new('http://fastapi.metacpan.org/v1/module/_search'); + my @items = (qw(perl mojolicious mojo minion)); + + # Render a response once all promises have been resolved + Mojo::Promise->map( + {concurrency => 2, delay => 0.5}, # average max 4 requests/s + sub { + my $item = $_; + $c->ua->get_p($url->clone->query({q => $item}))->then(sub { my @res = @_; Mojo::Promise->resolve($item, @res); }); + }, + @items + )->then(sub (@results) { + $c->render( + json => { + map { + my ($item, $res) = @$_; + $item => $res->result->json('/hits/hits/0/_source/release'); + } @results + } + ); + })->catch(sub ($err) { + $c->reply->exception($err); + })->wait; + }; + + app->start; + To create promises manually you just wrap your continuation-passing style APIs in functions that return promises. Here's an example for how L works internally. diff --git a/t/mojo/promise.t b/t/mojo/promise.t index 6feddda051..a3de3cd786 100644 --- a/t/mojo/promise.t +++ b/t/mojo/promise.t @@ -509,7 +509,7 @@ subtest 'Map (with concurrency limit)' => sub { is_deeply \@errors, [], 'promise not rejected'; }; -subtest 'Map (with reject)' => sub { +subtest 'Map (with early reject)' => sub { my (@results, @errors, @started); Mojo::Promise->map( {concurrency => 3}, @@ -525,6 +525,165 @@ subtest 'Map (with reject)' => sub { is_deeply \@started, [1, 2, 3], 'only initial batch started'; }; +subtest 'Map (with later reject)' => sub { + my (@results, @errors, @started); + Mojo::Promise->map( + {concurrency => 3}, + sub { + my $n = $_; + push @started, $n; + Mojo::Promise->resolve->then(sub { + if ($n >= 5) { Mojo::Promise->reject($n) } + else { Mojo::Promise->resolve($n) } + }); + }, + 1 .. 8 + )->then(sub { @results = @_ }, sub { @errors = @_ })->wait; + is_deeply \@results, [], 'promise not resolved'; + is_deeply \@errors, [5], 'correct errors'; + is_deeply \@started, [1, 2, 3, 4, 5, 6, 7], 'only maximum concurrent promises started'; +}; + +subtest 'Map (any, with early success)' => sub { + my (@results, @errors, @started); + Mojo::Promise->map( + {concurrency => 3, aggregation => 'any'}, + sub { + my $n = $_; + push @started, $n; + Mojo::Promise->resolve->then(sub { Mojo::Promise->resolve($n) }); + }, + 1 .. 5 + )->then(sub { @results = @_ }, sub { @errors = @_ })->wait; + is_deeply \@results, [1], 'promise resolved'; + is_deeply \@errors, [], 'correct errors'; + is_deeply \@started, [1, 2, 3], 'only initial batch started'; +}; + +subtest 'Map (any, with later success)' => sub { + my (@results, @errors, @started); + Mojo::Promise->map( + {concurrency => 3, aggregation => 'any'}, + sub { + my $n = $_; + push @started, $n; + Mojo::Promise->resolve->then(sub { + if ($n >= 5) { Mojo::Promise->resolve($n) } + else { Mojo::Promise->reject($n) } + }); + }, + 1 .. 7 + )->then(sub { @results = @_ }, sub { @errors = @_ })->wait; + is_deeply \@results, [5], 'promise resolved'; + is_deeply \@errors, [], 'correct errors'; + is_deeply \@started, [1, 2, 3, 4, 5, 6, 7], 'only maximum concurrent promises started'; +}; + +subtest 'Map (any, all rejected)' => sub { + my (@results, @errors, @started); + Mojo::Promise->map( + {aggregation => 'any'}, + sub { + my $n = $_; + push @started, $n; + Mojo::Promise->resolve->then(sub { Mojo::Promise->reject($n) }); + }, + 1 .. 3 + )->then(sub { @results = @_ }, sub { @errors = @_ })->wait; + is_deeply \@results, [], 'promise rejected'; + is_deeply \@errors, [[1], [2], [3]], 'correct errors'; + is_deeply \@started, [1, 2, 3], 'all started without concurrency'; +}; + +subtest 'Map (concurrency, any, all rejected)' => sub { + my (@results, @errors, @started); + Mojo::Promise->map( + {concurrency => 3, aggregation => 'any'}, + sub { + my $n = $_; + push @started, $n; + Mojo::Promise->resolve->then(sub { Mojo::Promise->reject($n) }); + }, + 1 .. 5 + )->then(sub { @results = @_ }, sub { @errors = @_ })->wait; + is_deeply \@results, [], 'promise rejected'; + is_deeply \@errors, [[1], [2], [3], [4], [5]], 'correct errors'; + is_deeply \@started, [1, 2, 3, 4, 5], 'all started with concurrency'; +}; + +subtest 'Map (concurrency, race, 2 of 3 rejected)' => sub { + my (@results, @errors, @started); + Mojo::Promise->map( + {concurrency => 3, aggregation => 'race'}, + sub { + my $n = $_; + push @started, $n; + Mojo::Promise->resolve->then(sub { + if ($n % 2) { Mojo::Promise->reject($n) } + else { Mojo::Promise->resolve($n) } + }); + }, + 1 .. 5 + )->then(sub { @results = @_ }, sub { @errors = @_ })->wait; + is_deeply \@results, [], 'promise rejected'; + is_deeply \@errors, [1], 'correct errors'; + is_deeply \@started, [1, 2, 3], 'only 3 of 5 started with concurrency'; +}; + +subtest 'Map (concurrency, all settled, partially rejected)' => sub { + my (@results, @errors, @started); + Mojo::Promise->map( + {concurrency => 3, aggregation => 'all_settled'}, + sub { + my $n = $_; + push @started, $n; + Mojo::Promise->resolve->then(sub { + if ($n % 2) { Mojo::Promise->resolve($n) } + else { Mojo::Promise->reject($n) } + }); + }, + 1 .. 5 + )->then(sub { @results = @_ }, sub { @errors = @_ })->wait; + my $result = [ + {status => 'fulfilled', value => [1]}, + {status => 'rejected', reason => [2]}, + {status => 'fulfilled', value => [3]}, + {status => 'rejected', reason => [4]}, + {status => 'fulfilled', value => [5]} + ]; + is_deeply \@results, $result, 'promise resolved'; + is_deeply \@errors, [], 'correct errors'; + is_deeply \@started, [1, 2, 3, 4, 5], 'all started with concurrency'; +}; + +subtest 'Map (concurrency, delay, all settled, partially rejected)' => sub { + my (@results, @errors, @started); + Mojo::Promise->map( + {concurrency => 2, delay => 0.1, aggregation => 'all_settled'}, + sub { + my $n = $_; + push @started, $n; + Mojo::Promise->resolve->then(sub { + if ($n % 2) { Mojo::Promise->reject($n) } + else { Mojo::Promise->resolve($n) } + }); + }, + 1 .. 5 + )->then(sub { @results = @_ }, sub { @errors = @_ })->wait; + my $result = [ + {status => 'rejected', reason => [1]}, + {status => 'fulfilled', value => [2]}, + {status => 'rejected', reason => [3]}, + {status => 'fulfilled', value => [4]}, + {status => 'rejected', reason => [5]}, + ]; + is_deeply \@results, $result, 'promise resolved'; + is_deeply \@errors, [], 'correct errors'; + + # is_deeply \@started, [1, 2, 3, 4, 5], 'all started with concurrency'; + is scalar @started, 5, 'all started with concurrency'; +}; + subtest 'Map (custom event loop)' => sub { my $ok; my $loop = Mojo::IOLoop->new;