Skip to content

Commit

Permalink
fix random terminate without exception error when deconstruct MPPTask (
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot authored Jul 23, 2021
1 parent 1df6b91 commit 7388e5a
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 3 deletions.
3 changes: 2 additions & 1 deletion dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(force_set_segment_ingest_packs_fail) \
M(segment_merge_after_ingest_packs) \
M(force_formal_page_file_not_exists) \
M(force_legacy_or_checkpoint_page_file_exists)
M(force_legacy_or_checkpoint_page_file_exists) \
M(exception_in_creating_set_input_stream)

#define APPLY_FOR_FAILPOINTS(M) \
M(force_set_page_file_write_errno) \
Expand Down
9 changes: 7 additions & 2 deletions dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <Common/FailPoint.h>
#include <DataStreams/CreatingSetsBlockInputStream.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/materializeBlock.h>
Expand All @@ -11,6 +12,10 @@
namespace DB
{

namespace FailPoints
{
extern const char exception_in_creating_set_input_stream[];
}
namespace ErrorCodes
{
extern const int SET_SIZE_LIMIT_EXCEEDED;
Expand Down Expand Up @@ -99,8 +104,8 @@ void CreatingSetsBlockInputStream::createAll()
{
if (isCancelledOrThrowIfKilled())
return;

workers.push_back(std::thread(&CreatingSetsBlockInputStream::createOne, this, std::ref(elem.second), current_memory_tracker));
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_in_creating_set_input_stream);
}
}
}
Expand All @@ -122,7 +127,7 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery, MemoryTr
{

current_memory_tracker = memory_tracker;
LOG_TRACE(log,
LOG_DEBUG(log,
(subquery.set ? "Creating set. " : "")
<< (subquery.join ? "Creating join. " : "") << (subquery.table ? "Filling temporary table. " : "") << " for task "
<< std::to_string(mpp_task_id));
Expand Down
8 changes: 8 additions & 0 deletions dbms/src/DataStreams/CreatingSetsBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ class CreatingSetsBlockInputStream : public IProfilingBlockInputStream
CreatingSetsBlockInputStream(const BlockInputStreamPtr & input,
std::vector<SubqueriesForSets> && subqueries_for_sets_list_,
const SizeLimits & network_transfer_limits, Int64 mpp_task_id_);
~CreatingSetsBlockInputStream()
{
for (auto & worker : workers)
{
if (worker.joinable())
worker.join();
}
}

String getName() const override { return "CreatingSets"; }

Expand Down
43 changes: 43 additions & 0 deletions tests/fullstack-test/mpp/issue_2471.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Preparation.
=> DBGInvoke __init_fail_point()

mysql> drop table if exists test.a
mysql> create table test.a (pk int not null, id int, value varchar(64))
mysql> insert into test.a values(0,1,'a'),(1,2,'b')

mysql> alter table test.a set tiflash replica 1

func> wait_table test a


mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_opt_broadcast_cartesian_join=2; select * from a as t1 left join a as t2 on t1.id = t2.id;
+----+------+-------+------+------+-------+
| pk | id | value | pk | id | value |
+----+------+-------+------+------+-------+
| 0 | 1 | a | 0 | 1 | a |
| 1 | 2 | b | 1 | 2 | b |
+----+------+-------+------+------+-------+

=> DBGInvoke __enable_fail_point(exception_in_creating_set_input_stream)

mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_opt_broadcast_cartesian_join=2; select * from a as t1 left join a as t2 on t1.id = t2.id;
ERROR 1105 (HY000) at line 1: other error for mpp stream: DB::Exception: Fail point FailPoints::exception_in_creating_set_input_stream is triggered.

=> DBGInvoke __disable_fail_point(exception_in_creating_set_input_stream)

mysql> use test; select sleep(5);
+----------+
| sleep(5) |
+----------+
| 0 |
+----------+
mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_opt_broadcast_cartesian_join=2; select * from a as t1 left join a as t2 on t1.id = t2.id;
+----+------+-------+------+------+-------+
| pk | id | value | pk | id | value |
+----+------+-------+------+------+-------+
| 0 | 1 | a | 0 | 1 | a |
| 1 | 2 | b | 1 | 2 | b |
+----+------+-------+------+------+-------+

# Clean up.
# mysql> drop table if exists test.a

0 comments on commit 7388e5a

Please sign in to comment.