Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](backup) fix backup conflict with ddl #42872

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions be/src/olap/snapshot_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,22 @@ Status SnapshotManager::make_snapshot(const TSnapshotRequest& request, string* s

DBUG_EXECUTE_IF("SnapshotManager::make_snapshot.inject_failure", { target_tablet = nullptr; })

// Debug point "SnapshotManager::make_snapshot.wait": when its "tablet_id" parameter equals the
// requested tablet, poll once per second (at most 1000 times) until the tablet can no longer be
// fetched from the tablet manager or reports zero rows. At that point target_tablet is cleared,
// so the null check that follows this block fails the snapshot request.
DBUG_EXECUTE_IF("SnapshotManager::make_snapshot.wait", {
    auto tablet_id = dp->param<int64>("tablet_id", -1);
    if (tablet_id != -1 && tablet_id == request.tablet_id) {
        // The value logged here is a tablet id, so label it as such.
        LOG(INFO) << "Debug: SnapshotManager::make_snapshot.wait tablet_id:" << tablet_id;
        for (int i = 0; i < 1000; i++) {
            // Re-fetch on every iteration so a concurrent drop/truncate becomes visible.
            TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(request.tablet_id);
            if (tablet == nullptr || tablet->num_rows() == 0) {
                target_tablet = nullptr;
                // NOTE(review): purpose of this extra 3s pause is not visible here — confirm.
                sleep(3);
                break;
            }
            sleep(1);
        }
    }
});

if (target_tablet == nullptr) {
return Status::Error<TABLE_NOT_FOUND>("failed to get tablet. tablet={}", request.tablet_id);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,10 @@ public synchronized boolean isDone() {
return state == BackupJobState.FINISHED || state == BackupJobState.CANCELLED;
}

/**
 * Reports whether this job has left the snapshot phase.
 *
 * @return {@code false} while the job state is {@code PENDING} or {@code SNAPSHOTING};
 *         {@code true} for every other state
 */
public synchronized boolean isSnapshotDone() {
    boolean snapshotStillInProgress =
            state == BackupJobState.PENDING || state == BackupJobState.SNAPSHOTING;
    return !snapshotStillInProgress;
}

private void prepareAndSendSnapshotTask() {
Database db = env.getInternalCatalog().getDbNullable(dbId);
if (db == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.DynamicPartitionUtil;
import org.apache.doris.common.util.MasterDaemon;
Expand Down Expand Up @@ -233,6 +234,11 @@ public synchronized void getRecycleIds(Set<Long> dbIds, Set<Long> tableIds, Set<

/**
 * Decides whether the recycled object with the given id counts as expired.
 *
 * <p>When the debug point {@code FE.CatalogRecycleBin.isExpire} is enabled the answer is always
 * {@code true}. Otherwise the object is expired only if its time in the recycle bin exceeds
 * {@code Config.catalog_trash_expire_second} seconds and, unless
 * {@code Config.catalog_trash_ignore_min_erase_latency} is set, also exceeds {@code minEraseLatency}.
 *
 * @param id id looked up in {@code idToRecycleTime}
 * @param currentTimeMs current time in milliseconds
 * @return {@code true} if the object is considered expired
 */
private synchronized boolean isExpire(long id, long currentTimeMs) {
    final long latency = currentTimeMs - idToRecycleTime.get(id);

    // Debug point short-circuit: treat every entry as expired.
    if (DebugPointUtil.isEnable("FE.CatalogRecycleBin.isExpire")) {
        LOG.info("DebugPoint set FE.CatalogRecycleBin.isExpire");
        return true;
    }

    // The configured expiry window must always have elapsed.
    if (latency <= Config.catalog_trash_expire_second * 1000L) {
        return false;
    }
    // The minimum erase latency applies only when it is not explicitly ignored.
    return Config.catalog_trash_ignore_min_erase_latency || latency > minEraseLatency;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
import org.apache.doris.analysis.TableRef;
import org.apache.doris.analysis.TruncateTableStmt;
import org.apache.doris.analysis.TypeDef;
import org.apache.doris.backup.AbstractJob;
import org.apache.doris.backup.BackupJob;
import org.apache.doris.backup.RestoreJob;
import org.apache.doris.catalog.BinlogConfig;
import org.apache.doris.catalog.BrokerTable;
Expand Down Expand Up @@ -508,6 +510,9 @@ public void dropDb(DropDbStmt stmt) throws DdlException {

// 2. drop tables in db
Database db = this.fullNameToDb.get(dbName);

checkBuckupRunning(db, null);

db.writeLock();
long recycleTime = 0;
try {
Expand Down Expand Up @@ -880,6 +885,22 @@ public void replayRenameDatabase(String dbName, String newDbName) {
LOG.info("replay rename database {} to {}", dbName, newDbName);
}

public void checkBuckupRunning(Database db, OlapTable olapTable) throws DdlException {
justfortaste marked this conversation as resolved.
Show resolved Hide resolved
if (DebugPointUtil.isEnable("FE.checkBuckupRunning.ignore")) {
LOG.info("FE.checkBuckupRunning.ignore");
return;
}

AbstractJob job = Env.getCurrentEnv().getBackupHandler().getJob(db.getId());
if (job != null job instanceof BackupJob) {
BackupJob backupJob = (BackupJob) job;
if (backupJob.isDone()
&& (olapTable == null || backupJob.getBackupMeta().getTable(olapTable.getId()) != null)) {
LOG.warn("Backup is running on this db {} ", db.getName());
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Backup is running on this db: " + db.getName());
}
}

// Drop table
public void dropTable(DropTableStmt stmt) throws DdlException {
Map<String, Long> costTimes = new TreeMap<String, Long>();
Expand Down Expand Up @@ -956,6 +977,10 @@ public void dropTable(DropTableStmt stmt) throws DdlException {
}
}

if (table instanceof OlapTable) {
checkBuckupRunning(db, (OlapTable) table);
}

dropTableInternal(db, table, stmt.isView(), stmt.isForceDrop(), watch, costTimes);
} catch (UserException e) {
throw new DdlException(e.getMessage(), e.getMysqlErrorCode());
Expand Down Expand Up @@ -1989,6 +2014,7 @@ public void dropPartition(Database db, OlapTable olapTable, DropPartitionClause
}
}

checkBuckupRunning(db, olapTable);
dropPartitionWithoutCheck(db, olapTable, partitionName, isTempPartition, isForceDrop);
}

Expand Down Expand Up @@ -3733,6 +3759,8 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti
throw new DdlException("Table[" + copiedTbl.getName() + "]'s meta has been changed. try again.");
}

checkBuckupRunning(db, olapTable);

//replace
Map<Long, RecyclePartitionParam> recyclePartitionParamMap = new HashMap<>();
oldPartitions = truncateTableInternal(olapTable, newPartitions,
Expand Down
204 changes: 204 additions & 0 deletions regression-test/suites/backup_restore/test_backup_with_ddl.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Regression suite for the interaction between BACKUP and DDL (TRUNCATE TABLE).
//   Test 1: with the FE backup check bypassed by a debug point, truncating a table while its
//           backup is snapshotting succeeds, and the backup ends CANCELLED with
//           "failed to get tablet".
//   Test 2: with the FE backup check active, the same truncate is expected to leave the
//           table's rows untouched.
suite("test_backup_with_ddl") {

def tableName = "test_backup_with_ddl"
String dbName = "test_backup_with_ddl_db"

// Lower the recycle-bin expiry setting to 10 seconds for this run.
// NOTE(review): the previous value is never restored at the end of the suite — confirm
// this does not leak into other suites.
sql """
ADMIN SET FRONTEND CONFIG ("catalog_trash_expire_second" = "10");
"""

def insert_num = 5

// Start from a clean database and a single-bucket, single-replica UNIQUE KEY table.
sql "DROP DATABASE IF EXISTS ${dbName}"
sql "CREATE DATABASE IF NOT EXISTS ${dbName}"
sql "DROP TABLE IF EXISTS ${dbName}.${tableName}"
sql """
CREATE TABLE if NOT EXISTS ${dbName}.${tableName}
(
`test` INT,
`id` INT
)
ENGINE=OLAP
UNIQUE KEY(`test`, `id`)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
)
"""

// Insert insert_num rows with distinct ids and verify they are all visible.
for (int i = 0; i < insert_num; ++i) {
sql """
INSERT INTO ${dbName}.${tableName} VALUES (1, ${i})
"""
}
sql " sync "
def res = sql "SELECT * FROM ${dbName}.${tableName}"
assertEquals(res.size(), insert_num)

// Make sure no debug point left over from an earlier run is still active.
GetDebugPoint().clearDebugPointsForAllFEs()
GetDebugPoint().clearDebugPointsForAllBEs()

// Remember the id of the first tablet listed for the table.
result = sql_return_maparray """show tablets from ${dbName}.${tableName}"""
assertNotNull(result)
def tabletId = null
for (def res1 : result) {
tabletId = res1.TabletId
break
}

logger.info("========= Test 1: ignore checkBuckupRunning when truncate table ===")
def snapshotName1 = "snapshot_test_1"

// Enable the BE debug point "SnapshotManager::make_snapshot.wait" for this tablet id.
GetDebugPoint().enableDebugPointForAllBEs("SnapshotManager::make_snapshot.wait", [tablet_id:"${tabletId}"]);

sql """
BACKUP SNAPSHOT ${dbName}.${snapshotName1}
TO `__keep_on_local__`
ON (${tableName})
PROPERTIES ("type" = "full")
"""

// Enable the FE debug points named for bypassing the backup check and for recycle-bin expiry.
GetDebugPoint().enableDebugPointForAllFEs('FE.checkBuckupRunning.ignore', null)
GetDebugPoint().enableDebugPointForAllFEs('FE.CatalogRecycleBin.isExpire', null)

// Poll SHOW BACKUP every 100 ms until a job is in state SNAPSHOTING.
// NOTE(review): unlike Test 2, the outcome of this poll is not asserted, so the test
// continues even if SNAPSHOTING was never observed.
count = 200 // 20s
for (int i = 0; i < count; ++i) {
def records = sql_return_maparray "SHOW BACKUP FROM ${dbName}"
int found = 0
for (def res2 : records) {
if (res2.State.equals("SNAPSHOTING")) {
found = 1
break
}
}
if (found == 1) {
break
}
Thread.sleep(100)
}

// With the backup check bypassed, the truncate is expected to succeed.
sql """
truncate table ${dbName}.${tableName};
"""

sql "sync"
// The table must now be empty.
res = sql "SELECT * FROM ${dbName}.${tableName}"
assertEquals(res.size(), 0)

// Clean the trash after the truncate.
sql """
ADMIN CLEAN TRASH;
"""

// Poll every 2 s until a backup job is CANCELLED with a "failed to get tablet" task error.
count = 1000 // 2000s
def found = 0
for (int i = 0; i < count; ++i) {
def records = sql_return_maparray "SHOW BACKUP FROM ${dbName}"
found = 0
for (def res2 : records) {
logger.info("row.State is ${res2.State}")
logger.info("row.TaskErrMsg is ${res2.TaskErrMsg}")

if (res2.State.equals("CANCELLED") && (res2.TaskErrMsg as String).contains("failed to get tablet")) {
found = 1
break
}
}
if (found == 1) {
break
}
Thread.sleep(2000)
}

assertEquals(found, 1)

syncer.waitSnapshotFinish(dbName)

// Turn off the three debug points enabled for Test 1.
// NOTE(review): if an assertion above fails, these stay enabled — confirm the framework
// clears them between suites.
GetDebugPoint().disableDebugPointForAllBEs("SnapshotManager::make_snapshot.wait")
GetDebugPoint().disableDebugPointForAllFEs('FE.checkBuckupRunning.ignore')
GetDebugPoint().disableDebugPointForAllFEs('FE.CatalogRecycleBin.isExpire')



logger.info("========= Test 2: checkBuckupRunning when truncate table ===")
def snapshotName2 = "snapshot_test_2"

// Refill the truncated table with insert_num rows that have distinct keys.
for (int i = 0; i < insert_num; ++i) {
sql """
INSERT INTO ${dbName}.${tableName} VALUES (${i}, ${i})
"""
}
sql " sync "

// Look up the tablet id again.
result = sql_return_maparray """show tablets from ${dbName}.${tableName}"""
assertNotNull(result)
tabletId = null
for (def res1 : result) {
tabletId = res1.TabletId
break
}
logger.info("========= show tablet ${tabletId} ===")

sql """
BACKUP SNAPSHOT ${dbName}.${snapshotName2}
TO `__keep_on_local__`
ON (${tableName})
PROPERTIES ("type" = "full")
"""

// Poll SHOW BACKUP every 100 ms until a job is in state SNAPSHOTING.
// NOTE(review): the BE wait debug point was disabled above, so whether SNAPSHOTING is
// observable within a 100 ms polling interval looks timing-dependent — confirm.
count = 200 // 20s
found = 0
for (int i = 0; i < count; ++i) {
def records = sql_return_maparray "SHOW BACKUP FROM ${dbName}"
found = 0
for (def res2 : records) {
if (res2.State .equals("SNAPSHOTING")) {
found = 1
break
}
}
if (found == 1) {
break
}
Thread.sleep(100)
}

assertEquals(found, 1)


// Expected failure: errCode = 2, detailMessage = Backup is running on this db: test_backup_with_ddl_db
// (try_sql presumably tolerates the SQL error instead of failing the suite — verify.)
try_sql """
truncate table ${dbName}.${tableName};
"""
// The rows must still be there, i.e. the truncate did not take effect.
res = sql "SELECT * FROM ${dbName}.${tableName}"
assertEquals(res.size(), insert_num)

syncer.waitSnapshotFinish(dbName)


// Drop the objects created by this suite.
sql "DROP TABLE IF EXISTS ${dbName}.${tableName}"
sql "DROP DATABASE ${dbName} FORCE"

}
Loading