Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Promise map improved #1775

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 55 additions & 8 deletions lib/Mojo/Promise.pm
Original file line number Diff line number Diff line change
Expand Up @@ -61,21 +61,52 @@ sub finally { shift->_finally(1, @_) }
sub map {
my ($class, $options, $cb, @items) = (shift, ref $_[0] eq 'HASH' ? shift : {}, @_);

return $class->all(map { $_->$cb } @items) if !$options->{concurrency} || @items <= $options->{concurrency};
my $start_next;
my $block_next = sub { };

my ($akey, $aggregation, $next_after_fullfil, $next_after_reject)
= !defined $options->{aggregation} ? (0, 'all', \$start_next, \$block_next)
: $options->{aggregation} eq 'any' ? (1, 'any', \$block_next, \$start_next)
: $options->{aggregation} eq 'all_settled' ? (2, 'all_settled', \$start_next, \$start_next)
: $options->{aggregation} eq 'race' ? (3, 'race', \$block_next, \$block_next)
: (0, 'all', \$start_next, \$block_next);

return $class->$aggregation(map { $_->$cb } @items) if !$options->{concurrency} || @items <= $options->{concurrency};

my @start = map { $_->$cb } splice @items, 0, $options->{concurrency};
my @wait = map { $start[0]->clone } 0 .. $#items;

my $start_next = sub {
# N.B. $start_next will never be called for $aggregation eq 'race'
$start_next = sub {
return () unless my $item = shift @items;
my ($start_next, $chain) = (__SUB__, shift @wait);
$_->$cb->then(sub { $chain->resolve(@_); $start_next->() }, sub { $chain->reject(@_); @items = () }) for $item;
my $exec_next = sub {
$_->$cb->then(
sub {
$chain->resolve(@_);
if ($akey == 1) { @items = () }
else { $start_next->() }
},
sub {
$chain->reject(@_);
if ($akey == 0) { @items = () }
else { $start_next->() }
}
) for $item;
return ();
};
if (!$options->{delay}) {
$exec_next->();
}
else {
Mojo::IOLoop->timer($options->{delay} => sub { $exec_next->() });
}
return ();
};

$_->then($start_next, sub { }) for @start;
$_->then($$next_after_fullfil, $$next_after_reject) for @start;

return $class->all(@start, @wait);
return $class->$aggregation(@start, @wait);
}

sub new {
Expand Down Expand Up @@ -406,10 +437,14 @@ original fulfillment value or rejection reason.

my $new = Mojo::Promise->map(sub {...}, @items);
my $new = Mojo::Promise->map({concurrency => 3}, sub {...}, @items);
my $new = Mojo::Promise->map({aggregation => 'any', concurrency => 3}, sub {...}, @items);
my $new = Mojo::Promise->map({aggregation => 'all_settled', concurrency => 1, delay => 2.5 }, sub {...}, @items);

Apply a function that returns a L<Mojo::Promise> to each item in a list of items while optionally limiting concurrency.
Returns a L<Mojo::Promise> that collects the results in the same manner as L</all>. If any item's promise is rejected,
any remaining items which have not yet been mapped will not be.
Apply a function that returns a L<Mojo::Promise> to each item in a list of items while optionally limiting concurrency and inserting delays between processing items.
Returns a L<Mojo::Promise> that collects the results in the same manner as L</all>.
With the C<aggregation> option, the behaviour can be changed to the same manner as L</any> or L</all_settled>.
If nothing or L</all> is specified, and any item's promise is rejected, any remaining items which have not yet been mapped will not be.
If L</any> is specified and any item's promise is resolved, any remaining items which have not yet been mapped will not be.

# Perform 3 requests at a time concurrently
Mojo::Promise->map({concurrency => 3}, sub { $ua->get_p($_) }, @urls)
Expand All @@ -419,12 +454,24 @@ These options are currently available:

=over 2

=item aggregation

aggregation => 'any'

Specifies the aggregation behaviour. Supported values are L</all> (default), L</all_settled>, and L</any>.

=item concurrency

concurrency => 3

The maximum number of items that are in progress at the same time.

=item delay

delay => 2.5

Insert a delay of 2.5 seconds after each processed items. N.B. delay makes sense only in case concurrency is specified.

=back

=head2 new
Expand Down
37 changes: 37 additions & 0 deletions lib/Mojolicious/Guides/Cookbook.pod
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,43 @@ that create them for you.

app->start;

When processing a number of requests towards a non-robust or external resource, the kind of aggregation, maximum
concurrency and an optional delay can be specified using L<Mojo::Promise/"map">.

use Mojolicious::Lite -signatures;
use Mojo::Promise;
use Mojo::URL;

# Search MetaCPAN for a larger number of items, with mild concurrency and some throttling
get '/' => sub ($c) {

my $url = Mojo::URL->new('http://fastapi.metacpan.org/v1/module/_search');
my @items = (qw(perl mojolicious mojo minion));

# Render a response once all promises have been resolved
Mojo::Promise->map(
{concurrency => 2, delay => 0.5}, # average max 4 requests/s
sub {
my $item = $_;
$c->ua->get_p($url->clone->query({q => $item}))->then(sub { my @res = @_; Mojo::Promise->resolve($item, @res); });
},
@items
)->then(sub (@results) {
$c->render(
json => {
map {
my ($item, $res) = @$_;
$item => $res->result->json('/hits/hits/0/_source/release');
} @results
}
);
})->catch(sub ($err) {
$c->reply->exception($err);
})->wait;
};

app->start;

To create promises manually you just wrap your continuation-passing style APIs in functions that return promises.
Here's an example for how L<Mojo::UserAgent/"get_p"> works internally.

Expand Down
161 changes: 160 additions & 1 deletion t/mojo/promise.t
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ subtest 'Map (with concurrency limit)' => sub {
is_deeply \@errors, [], 'promise not rejected';
};

subtest 'Map (with reject)' => sub {
subtest 'Map (with early reject)' => sub {
my (@results, @errors, @started);
Mojo::Promise->map(
{concurrency => 3},
Expand All @@ -525,6 +525,165 @@ subtest 'Map (with reject)' => sub {
is_deeply \@started, [1, 2, 3], 'only initial batch started';
};

subtest 'Map (with later reject)' => sub {
my (@results, @errors, @started);
Mojo::Promise->map(
{concurrency => 3},
sub {
my $n = $_;
push @started, $n;
Mojo::Promise->resolve->then(sub {
if ($n >= 5) { Mojo::Promise->reject($n) }
else { Mojo::Promise->resolve($n) }
});
},
1 .. 8
)->then(sub { @results = @_ }, sub { @errors = @_ })->wait;
is_deeply \@results, [], 'promise not resolved';
is_deeply \@errors, [5], 'correct errors';
is_deeply \@started, [1, 2, 3, 4, 5, 6, 7], 'only maximum concurrent promises started';
};

subtest 'Map (any, with early success)' => sub {
my (@results, @errors, @started);
Mojo::Promise->map(
{concurrency => 3, aggregation => 'any'},
sub {
my $n = $_;
push @started, $n;
Mojo::Promise->resolve->then(sub { Mojo::Promise->resolve($n) });
},
1 .. 5
)->then(sub { @results = @_ }, sub { @errors = @_ })->wait;
is_deeply \@results, [1], 'promise resolved';
is_deeply \@errors, [], 'correct errors';
is_deeply \@started, [1, 2, 3], 'only initial batch started';
};

subtest 'Map (any, with later success)' => sub {
my (@results, @errors, @started);
Mojo::Promise->map(
{concurrency => 3, aggregation => 'any'},
sub {
my $n = $_;
push @started, $n;
Mojo::Promise->resolve->then(sub {
if ($n >= 5) { Mojo::Promise->resolve($n) }
else { Mojo::Promise->reject($n) }
});
},
1 .. 7
)->then(sub { @results = @_ }, sub { @errors = @_ })->wait;
is_deeply \@results, [5], 'promise resolved';
is_deeply \@errors, [], 'correct errors';
is_deeply \@started, [1, 2, 3, 4, 5, 6, 7], 'only maximum concurrent promises started';
};

subtest 'Map (any, all rejected)' => sub {
my (@results, @errors, @started);
Mojo::Promise->map(
{aggregation => 'any'},
sub {
my $n = $_;
push @started, $n;
Mojo::Promise->resolve->then(sub { Mojo::Promise->reject($n) });
},
1 .. 3
)->then(sub { @results = @_ }, sub { @errors = @_ })->wait;
is_deeply \@results, [], 'promise rejected';
is_deeply \@errors, [[1], [2], [3]], 'correct errors';
is_deeply \@started, [1, 2, 3], 'all started without concurrency';
};

subtest 'Map (concurrency, any, all rejected)' => sub {
my (@results, @errors, @started);
Mojo::Promise->map(
{concurrency => 3, aggregation => 'any'},
sub {
my $n = $_;
push @started, $n;
Mojo::Promise->resolve->then(sub { Mojo::Promise->reject($n) });
},
1 .. 5
)->then(sub { @results = @_ }, sub { @errors = @_ })->wait;
is_deeply \@results, [], 'promise rejected';
is_deeply \@errors, [[1], [2], [3], [4], [5]], 'correct errors';
is_deeply \@started, [1, 2, 3, 4, 5], 'all started with concurrency';
};

subtest 'Map (concurrency, race, 2 of 3 rejected)' => sub {
my (@results, @errors, @started);
Mojo::Promise->map(
{concurrency => 3, aggregation => 'race'},
sub {
my $n = $_;
push @started, $n;
Mojo::Promise->resolve->then(sub {
if ($n % 2) { Mojo::Promise->reject($n) }
else { Mojo::Promise->resolve($n) }
});
},
1 .. 5
)->then(sub { @results = @_ }, sub { @errors = @_ })->wait;
is_deeply \@results, [], 'promise rejected';
is_deeply \@errors, [1], 'correct errors';
is_deeply \@started, [1, 2, 3], 'only 3 of 5 started with concurrency';
};

subtest 'Map (concurrency, all settled, partially rejected)' => sub {
my (@results, @errors, @started);
Mojo::Promise->map(
{concurrency => 3, aggregation => 'all_settled'},
sub {
my $n = $_;
push @started, $n;
Mojo::Promise->resolve->then(sub {
if ($n % 2) { Mojo::Promise->resolve($n) }
else { Mojo::Promise->reject($n) }
});
},
1 .. 5
)->then(sub { @results = @_ }, sub { @errors = @_ })->wait;
my $result = [
{status => 'fulfilled', value => [1]},
{status => 'rejected', reason => [2]},
{status => 'fulfilled', value => [3]},
{status => 'rejected', reason => [4]},
{status => 'fulfilled', value => [5]}
];
is_deeply \@results, $result, 'promise resolved';
is_deeply \@errors, [], 'correct errors';
is_deeply \@started, [1, 2, 3, 4, 5], 'all started with concurrency';
};

subtest 'Map (concurrency, delay, all settled, partially rejected)' => sub {
my (@results, @errors, @started);
Mojo::Promise->map(
{concurrency => 2, delay => 0.1, aggregation => 'all_settled'},
sub {
my $n = $_;
push @started, $n;
Mojo::Promise->resolve->then(sub {
if ($n % 2) { Mojo::Promise->reject($n) }
else { Mojo::Promise->resolve($n) }
});
},
1 .. 5
)->then(sub { @results = @_ }, sub { @errors = @_ })->wait;
my $result = [
{status => 'rejected', reason => [1]},
{status => 'fulfilled', value => [2]},
{status => 'rejected', reason => [3]},
{status => 'fulfilled', value => [4]},
{status => 'rejected', reason => [5]},
];
is_deeply \@results, $result, 'promise resolved';
is_deeply \@errors, [], 'correct errors';

# is_deeply \@started, [1, 2, 3, 4, 5], 'all started with concurrency';
is scalar @started, 5, 'all started with concurrency';
};

subtest 'Map (custom event loop)' => sub {
my $ok;
my $loop = Mojo::IOLoop->new;
Expand Down