Skip to content

Commit

Permalink
Fix the cancellation function incorrect call (heremaps#1142)
Browse files Browse the repository at this point in the history
The CancellationContext was incorrectly reset. The user could store the
cancellation token for the operation that is already finished and call
the Cancel method. Change the prefetch methods to share the same
context for all operations.

Relates-To: OAM-870

Signed-off-by: Mykhailo Kuchma <[email protected]>
  • Loading branch information
mykhailo-kuchma authored Jan 15, 2021
1 parent 9ed81dd commit 683ff4c
Show file tree
Hide file tree
Showing 8 changed files with 208 additions and 227 deletions.
6 changes: 3 additions & 3 deletions olp-cpp-sdk-core/include/olp/core/client/TaskContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,11 @@ class CORE_API TaskContext {
response.GetError().GetErrorCode() == ErrorCode::RequestTimeout)) {
user_response = std::move(response);
}

// Reset the context after the task is finished.
context_ = CancellationContext();
}

// Reset the context after the task is finished.
context_.ExecuteOrCancelled([]() { return CancellationToken(); });

if (callback) {
callback(std::move(user_response));
}
Expand Down
24 changes: 24 additions & 0 deletions olp-cpp-sdk-core/tests/client/TaskContextTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,30 @@ TEST(TaskContextTest, CancelToken) {
EXPECT_EQ(response.GetError().GetErrorCode(), ErrorCode::Cancelled);
}

TEST(TaskContextTest, LateCancel) {
ExecuteFunc func = [&](CancellationContext c) -> Response {
// Late cancel should not trigger the cancellation function.
c.ExecuteOrCancelled([]() -> CancellationToken {
return CancellationToken([]() { FAIL(); });
});
return std::string("Success");
};

Response response;

Callback callback = [&](Response r) { response = std::move(r); };

CancellationToken token;

{
TaskContext context = TaskContext::Create(func, callback);
token = context.CancelToken();
context.Execute();
}

token.Cancel();
}

TEST(TaskContextTest, OLPSUP_10456) {
// Cancel should not be triggered from the inside of Execute function.
// This happens when the execution function is keeping the last owning
Expand Down
25 changes: 14 additions & 11 deletions olp-cpp-sdk-dataservice-read/src/PrefetchPartitionsHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,11 @@ class PrefetchPartitionsHelper {
using QueryFunc = QueryItemsFunc<std::string, std::vector<std::string>,
PartitionsDataHandleExtendedResponse>;

static client::CancellationToken Prefetch(
std::shared_ptr<DownloadJob> download_job,
const std::vector<std::string>& roots, QueryFunc query,
TaskSink& task_sink, size_t query_max_size, uint32_t priority) {
client::CancellationContext execution_context;

static void Prefetch(std::shared_ptr<DownloadJob> download_job,
const std::vector<std::string>& roots, QueryFunc query,
TaskSink& task_sink, size_t query_max_size,
uint32_t priority,
client::CancellationContext execution_context) {
auto query_job = std::make_shared<QueryPartitionsJob>(
std::move(query), nullptr, download_job, task_sink, execution_context,
priority);
Expand All @@ -79,15 +78,22 @@ class PrefetchPartitionsHelper {
auto query_element = std::vector<std::string>(
roots.begin() + start, roots.begin() + start + size);

tokens.emplace_back(task_sink.AddTask(
auto token = task_sink.AddTaskChecked(
[query_element,
query_job](client::CancellationContext context) {
return query_job->Query(std::move(query_element), context);
},
[query_job](PartitionsDataHandleExtendedResponse response) {
query_job->CompleteQuery(std::move(response));
},
priority));
priority);

if (!token) {
query_job->CompleteQuery(
client::ApiError(client::ErrorCode::Cancelled, "Cancelled"));
} else {
tokens.emplace_back(*token);
}

start += size;
}
Expand All @@ -98,9 +104,6 @@ class PrefetchPartitionsHelper {
download_job->OnPrefetchCompleted(
{{client::ErrorCode::Cancelled, "Cancelled"}});
});

return client::CancellationToken(
[execution_context]() mutable { execution_context.CancelOperation(); });
}
};

Expand Down
50 changes: 27 additions & 23 deletions olp-cpp-sdk-dataservice-read/src/PrefetchTilesHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,15 @@ class PrefetchTilesHelper {
using QueryFunc =
QueryItemsFunc<geo::TileKey, geo::TileKey, repository::SubQuadsResponse>;

static client::CancellationToken Prefetch(
std::shared_ptr<DownloadJob> download_job,
const std::vector<geo::TileKey>& roots, QueryFunc query,
FilterItemsFunc<repository::SubQuadsResult> filter, TaskSink& task_sink,
uint32_t priority) {
client::CancellationContext execution_context;
static client::ApiError Canceled() {
return client::ApiError(client::ErrorCode::Cancelled, "Cancelled");
}

static void Prefetch(std::shared_ptr<DownloadJob> download_job,
const std::vector<geo::TileKey>& roots, QueryFunc query,
FilterItemsFunc<repository::SubQuadsResult> filter,
TaskSink& task_sink, uint32_t priority,
client::CancellationContext execution_context) {
auto query_job = std::make_shared<
QueryMetadataJob<geo::TileKey, geo::TileKey, PrefetchTilesResult,
repository::SubQuadsResponse, PrefetchStatus>>(
Expand All @@ -67,26 +69,28 @@ class PrefetchTilesHelper {
execution_context.ExecuteOrCancelled(
[&]() {
VectorOfTokens tokens;

auto transform_func = [&](geo::TileKey root) {
auto token = task_sink.AddTaskChecked(
[=](client::CancellationContext context) {
return query_job->Query(root, context);
},
[=](repository::SubQuadsResponse response) {
query_job->CompleteQuery(std::move(response));
},
priority);
if (!token) {
query_job->CompleteQuery(Canceled());
return client::CancellationToken();
}
return *token;
};

std::transform(std::begin(roots), std::end(roots),
std::back_inserter(tokens), [&](geo::TileKey root) {
return task_sink.AddTask(
[=](client::CancellationContext context) {
return query_job->Query(root, context);
},
[=](repository::SubQuadsResponse response) {
query_job->CompleteQuery(std::move(response));
},
priority);
});
std::back_inserter(tokens), std::move(transform_func));
return CreateToken(std::move(tokens));
},
[&]() {
download_job->OnPrefetchCompleted(
{{client::ErrorCode::Cancelled, "Cancelled"}});
});

return client::CancellationToken(
[execution_context]() mutable { execution_context.CancelOperation(); });
[&]() { download_job->OnPrefetchCompleted(Canceled()); });
}
};

Expand Down
14 changes: 14 additions & 0 deletions olp-cpp-sdk-dataservice-read/src/TaskSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,20 @@ TaskSink::~TaskSink() {

void TaskSink::CancelTasks() { pending_requests_->CancelAll(); }

client::CancellationToken TaskSink::AddTask(
std::function<void(client::CancellationContext)> func, uint32_t priority,
client::CancellationContext context) {
auto task = client::TaskContext::Create(
[](client::CancellationContext)
-> client::ApiResponse<bool, client::ApiError> {
return client::ApiError();
},
[=](client::ApiResponse<bool, client::ApiError>) { func(context); },
context);
AddTaskImpl(task, priority);
return task.CancelToken();
}

bool TaskSink::AddTaskImpl(client::TaskContext task, uint32_t priority) {
if (task_scheduler_) {
return ScheduleTask(std::move(task), priority);
Expand Down
4 changes: 4 additions & 0 deletions olp-cpp-sdk-dataservice-read/src/TaskSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ class TaskSink {

void CancelTasks();

client::CancellationToken AddTask(
std::function<void(client::CancellationContext)> func, uint32_t priority,
client::CancellationContext context);

template <typename Function, typename Callback, typename... Args>
client::CancellationToken AddTask(Function task, Callback callback,
uint32_t priority, Args&&... args) {
Expand Down
Loading

0 comments on commit 683ff4c

Please sign in to comment.