Commit 14f1211
Merge remote-tracking branch 'remotes/vsementsov/tags/pull-jobs-2021-10-07-v2' into staging

mirror: Handle errors after READY cancel
v2: add small fix by Stefano, Hanna's series fixed

# gpg: Signature made Thu 07 Oct 2021 08:25:07 AM PDT
# gpg:                using RSA key 8B9C26CDB2FD147C880E86A1561F24C1F19F79FB
# gpg: Good signature from "Vladimir Sementsov-Ogievskiy <[email protected]>" [unknown]
# gpg: WARNING: This key is not certified with a trusted signature!
# gpg:          There is no indication that the signature belongs to the owner.
# Primary key fingerprint: 8B9C 26CD B2FD 147C 880E  86A1 561F 24C1 F19F 79FB

* remotes/vsementsov/tags/pull-jobs-2021-10-07-v2:
  iotests: Add mirror-ready-cancel-error test
  mirror: Do not clear .cancelled
  mirror: Stop active mirroring after force-cancel
  mirror: Check job_is_cancelled() earlier
  mirror: Use job_is_cancelled()
  job: Add job_cancel_requested()
  job: Do not soft-cancel after a job is done
  jobs: Give Job.force_cancel more meaning
  job: @force parameter for job_cancel_sync()
  job: Force-cancel jobs in a failed transaction
  mirror: Drop s->synced
  mirror: Keep s->synced on error
  job: Context changes in job_completed_txn_abort()
  block/aio_task: assert `max_busy_tasks` is greater than 0
  block/backup: avoid integer overflow of `max-workers`

Signed-off-by: Richard Henderson <[email protected]>
rth7680 committed Oct 7, 2021
2 parents 3c01933 + 2451f72 commit 14f1211
Showing 12 changed files with 316 additions and 92 deletions.
block/aio_task.c (2 changes: 2 additions & 0 deletions)
@@ -98,6 +98,8 @@ AioTaskPool *coroutine_fn aio_task_pool_new(int max_busy_tasks)
 {
     AioTaskPool *pool = g_new0(AioTaskPool, 1);
 
+    assert(max_busy_tasks > 0);
+
     pool->main_co = qemu_coroutine_self();
     pool->max_busy_tasks = max_busy_tasks;
 
block/backup.c (7 changes: 4 additions & 3 deletions)
@@ -327,11 +327,12 @@ static void coroutine_fn backup_set_speed(BlockJob *job, int64_t speed)
     }
 }
 
-static void backup_cancel(Job *job, bool force)
+static bool backup_cancel(Job *job, bool force)
 {
     BackupBlockJob *s = container_of(job, BackupBlockJob, common.job);
 
     bdrv_cancel_in_flight(s->target_bs);
+    return true;
 }
 
 static const BlockJobDriver backup_job_driver = {
@@ -407,8 +408,8 @@ BlockJob *backup_job_create(const char *job_id, BlockDriverState *bs,
         return NULL;
     }
 
-    if (perf->max_workers < 1) {
-        error_setg(errp, "max-workers must be greater than zero");
+    if (perf->max_workers < 1 || perf->max_workers > INT_MAX) {
+        error_setg(errp, "max-workers must be between 1 and %d", INT_MAX);
         return NULL;
     }
 
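Why the new upper bound matters: the QAPI max-workers value is a 64-bit integer, but it eventually reaches int-typed code (for example the max_busy_tasks parameter of aio_task_pool_new(), which the new assertion in block/aio_task.c above guards). The following standalone sketch is illustrative C, not QEMU code, and only demonstrates the narrowing that the perf->max_workers > INT_MAX check rejects:

/* Standalone illustration (not QEMU code): an int64_t "max-workers"
 * value above INT_MAX passes a bare "< 1" check but is truncated when
 * narrowed to an int-typed parameter such as max_busy_tasks. */
#include <inttypes.h>
#include <limits.h>
#include <stdint.h>
#include <stdio.h>

int main(void)
{
    int64_t max_workers = (int64_t)INT_MAX + 1; /* accepted by "< 1" alone */
    int busy_tasks = (int)max_workers;          /* implementation-defined
                                                 * narrowing; typically wraps
                                                 * to INT_MIN */

    printf("max_workers=%" PRId64 " -> busy_tasks=%d\n",
           max_workers, busy_tasks);
    return 0;
}

On common targets this prints a negative busy_tasks value, which is exactly the kind of bogus worker count the combined max-workers bound and the aio_task assertion are meant to rule out.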
block/mirror.c (56 changes: 32 additions & 24 deletions)
@@ -56,7 +56,6 @@ typedef struct MirrorBlockJob {
     bool zero_target;
     MirrorCopyMode copy_mode;
     BlockdevOnError on_source_error, on_target_error;
-    bool synced;
     /* Set when the target is synced (dirty bitmap is clean, nothing
      * in flight) and the job is running in active mode */
     bool actively_synced;
@@ -121,7 +120,6 @@ typedef enum MirrorMethod {
 static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read,
                                             int error)
 {
-    s->synced = false;
     s->actively_synced = false;
     if (read) {
         return block_job_error_action(&s->common, s->on_source_error,
@@ -944,12 +942,10 @@ static int coroutine_fn mirror_run(Job *job, Error **errp)
     if (s->bdev_length == 0) {
         /* Transition to the READY state and wait for complete. */
         job_transition_to_ready(&s->common.job);
-        s->synced = true;
         s->actively_synced = true;
-        while (!job_is_cancelled(&s->common.job) && !s->should_complete) {
+        while (!job_cancel_requested(&s->common.job) && !s->should_complete) {
             job_yield(&s->common.job);
         }
-        s->common.job.cancelled = false;
         goto immediate_exit;
     }
 
@@ -1010,6 +1006,11 @@ static int coroutine_fn mirror_run(Job *job, Error **errp)
 
         job_pause_point(&s->common.job);
 
+        if (job_is_cancelled(&s->common.job)) {
+            ret = 0;
+            goto immediate_exit;
+        }
+
         cnt = bdrv_get_dirty_count(s->dirty_bitmap);
         /* cnt is the number of dirty bytes remaining and s->bytes_in_flight is
          * the number of bytes currently being processed; together those are
@@ -1036,7 +1037,7 @@ static int coroutine_fn mirror_run(Job *job, Error **errp)
         should_complete = false;
         if (s->in_flight == 0 && cnt == 0) {
             trace_mirror_before_flush(s);
-            if (!s->synced) {
+            if (!job_is_ready(&s->common.job)) {
                 if (mirror_flush(s) < 0) {
                     /* Go check s->ret. */
                     continue;
@@ -1047,14 +1048,13 @@
                  * the target in a consistent state.
                  */
                 job_transition_to_ready(&s->common.job);
-                s->synced = true;
                 if (s->copy_mode != MIRROR_COPY_MODE_BACKGROUND) {
                     s->actively_synced = true;
                 }
             }
 
             should_complete = s->should_complete ||
-                job_is_cancelled(&s->common.job);
+                job_cancel_requested(&s->common.job);
             cnt = bdrv_get_dirty_count(s->dirty_bitmap);
         }
 
@@ -1084,24 +1084,17 @@ static int coroutine_fn mirror_run(Job *job, Error **errp)
              * completion.
              */
             assert(QLIST_EMPTY(&bs->tracked_requests));
-            s->common.job.cancelled = false;
             need_drain = false;
             break;
         }
 
         ret = 0;
 
-        if (s->synced && !should_complete) {
+        if (job_is_ready(&s->common.job) && !should_complete) {
             delay_ns = (s->in_flight == 0 &&
                         cnt == 0 ? BLOCK_JOB_SLICE_TIME : 0);
         }
-        trace_mirror_before_sleep(s, cnt, s->synced, delay_ns);
+        trace_mirror_before_sleep(s, cnt, job_is_ready(&s->common.job),
+                                  delay_ns);
         job_sleep_ns(&s->common.job, delay_ns);
-        if (job_is_cancelled(&s->common.job) &&
-            (!s->synced || s->common.job.force_cancel))
-        {
-            break;
-        }
         s->last_pause_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
     }
 
@@ -1111,8 +1104,7 @@ static int coroutine_fn mirror_run(Job *job, Error **errp)
          * or it was cancelled prematurely so that we do not guarantee that
          * the target is a copy of the source.
          */
-        assert(ret < 0 || ((s->common.job.force_cancel || !s->synced) &&
-                           job_is_cancelled(&s->common.job)));
+        assert(ret < 0 || job_is_cancelled(&s->common.job));
         assert(need_drain);
         mirror_wait_for_all_io(s);
     }
@@ -1135,7 +1127,7 @@ static void mirror_complete(Job *job, Error **errp)
 {
     MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
 
-    if (!s->synced) {
+    if (!job_is_ready(job)) {
         error_setg(errp, "The active block job '%s' cannot be completed",
                    job->id);
         return;
@@ -1190,21 +1182,34 @@ static bool mirror_drained_poll(BlockJob *job)
      * from one of our own drain sections, to avoid a deadlock waiting for
      * ourselves.
      */
-    if (!s->common.job.paused && !s->common.job.cancelled && !s->in_drain) {
+    if (!s->common.job.paused && !job_is_cancelled(&job->job) && !s->in_drain) {
         return true;
     }
 
     return !!s->in_flight;
 }
 
-static void mirror_cancel(Job *job, bool force)
+static bool mirror_cancel(Job *job, bool force)
 {
     MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
     BlockDriverState *target = blk_bs(s->target);
 
-    if (force || !job_is_ready(job)) {
+    /*
+     * Before the job is READY, we treat any cancellation like a
+     * force-cancellation.
+     */
+    force = force || !job_is_ready(job);
+
+    if (force) {
         bdrv_cancel_in_flight(target);
     }
+    return force;
 }
 
+static bool commit_active_cancel(Job *job, bool force)
+{
+    /* Same as above in mirror_cancel() */
+    return force || !job_is_ready(job);
+}
+
 static const BlockJobDriver mirror_job_driver = {
@@ -1234,6 +1239,7 @@ static const BlockJobDriver commit_active_job_driver = {
         .abort                  = mirror_abort,
         .pause                  = mirror_pause,
         .complete               = mirror_complete,
+        .cancel                 = commit_active_cancel,
     },
     .drained_poll               = mirror_drained_poll,
 };
@@ -1417,6 +1423,7 @@ static int coroutine_fn bdrv_mirror_top_do_write(BlockDriverState *bs,
     bool copy_to_target;
 
     copy_to_target = s->job->ret >= 0 &&
+                     !job_is_cancelled(&s->job->common.job) &&
                      s->job->copy_mode == MIRROR_COPY_MODE_WRITE_BLOCKING;
 
     if (copy_to_target) {
@@ -1465,6 +1472,7 @@ static int coroutine_fn bdrv_mirror_top_pwritev(BlockDriverState *bs,
     bool copy_to_target;
 
     copy_to_target = s->job->ret >= 0 &&
+                     !job_is_cancelled(&s->job->common.job) &&
                      s->job->copy_mode == MIRROR_COPY_MODE_WRITE_BLOCKING;
 
     if (copy_to_target) {
block/replication.c (4 changes: 2 additions & 2 deletions)
@@ -149,7 +149,7 @@ static void replication_close(BlockDriverState *bs)
     if (s->stage == BLOCK_REPLICATION_FAILOVER) {
         commit_job = &s->commit_job->job;
         assert(commit_job->aio_context == qemu_get_current_aio_context());
-        job_cancel_sync(commit_job);
+        job_cancel_sync(commit_job, false);
     }
 
     if (s->mode == REPLICATION_MODE_SECONDARY) {
@@ -726,7 +726,7 @@ static void replication_stop(ReplicationState *rs, bool failover, Error **errp)
          * disk, secondary disk in backup_job_completed().
          */
         if (s->backup_job) {
-            job_cancel_sync(&s->backup_job->job);
+            job_cancel_sync(&s->backup_job->job, true);
         }
 
         if (!failover) {
blockdev.c (4 changes: 2 additions & 2 deletions)
@@ -1847,7 +1847,7 @@ static void drive_backup_abort(BlkActionState *common)
         aio_context = bdrv_get_aio_context(state->bs);
         aio_context_acquire(aio_context);
 
-        job_cancel_sync(&state->job->job);
+        job_cancel_sync(&state->job->job, true);
 
         aio_context_release(aio_context);
     }
@@ -1948,7 +1948,7 @@ static void blockdev_backup_abort(BlkActionState *common)
         aio_context = bdrv_get_aio_context(state->bs);
         aio_context_acquire(aio_context);
 
-        job_cancel_sync(&state->job->job);
+        job_cancel_sync(&state->job->job, true);
 
         aio_context_release(aio_context);
     }
include/qemu/job.h (29 changes: 22 additions & 7 deletions)
@@ -253,8 +253,17 @@ struct JobDriver {
 
     /**
      * If the callback is not NULL, it will be invoked in job_cancel_async
+     *
+     * This function must return true if the job will be cancelled
+     * immediately without any further I/O (mandatory if @force is
+     * true), and false otherwise. This lets the generic job layer
+     * know whether a job has been truly (force-)cancelled, or whether
+     * it is just in a special completion mode (like mirror after
+     * READY).
+     * (If the callback is NULL, the job is assumed to terminate
+     * without I/O.)
      */
-    void (*cancel)(Job *job, bool force);
+    bool (*cancel)(Job *job, bool force);
 
 
     /** Called when the job is freed */
@@ -427,9 +436,15 @@ const char *job_type_str(const Job *job);
 /** Returns true if the job should not be visible to the management layer. */
 bool job_is_internal(Job *job);
 
-/** Returns whether the job is scheduled for cancellation. */
+/** Returns whether the job is being cancelled. */
 bool job_is_cancelled(Job *job);
 
+/**
+ * Returns whether the job is scheduled for cancellation (at an
+ * indefinite point).
+ */
+bool job_cancel_requested(Job *job);
+
 /** Returns whether the job is in a completed state. */
 bool job_is_completed(Job *job);
 
@@ -506,18 +521,18 @@ void job_user_cancel(Job *job, bool force, Error **errp);
 
 /**
  * Synchronously cancel the @job. The completion callback is called
- * before the function returns. The job may actually complete
- * instead of canceling itself; the circumstances under which this
- * happens depend on the kind of job that is active.
+ * before the function returns. If @force is false, the job may
+ * actually complete instead of canceling itself; the circumstances
+ * under which this happens depend on the kind of job that is active.
  *
  * Returns the return value from the job if the job actually completed
  * during the call, or -ECANCELED if it was canceled.
  *
  * Callers must hold the AioContext lock of job->aio_context.
  */
-int job_cancel_sync(Job *job);
+int job_cancel_sync(Job *job, bool force);
 
-/** Synchronously cancels all jobs using job_cancel_sync(). */
+/** Synchronously force-cancels all jobs using job_cancel_sync(). */
 void job_cancel_sync_all(void);
 
 /**
(Diffs for the remaining 6 changed files were not loaded.)
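The generic half of this contract lives in job.c, whose diff is among the files not shown above. The sketch below is a rough, self-contained model of the intended semantics only (the identifiers are invented for illustration and are not QEMU's): a bool-returning cancel callback lets the job layer distinguish a mere cancel request (job_cancel_requested()) from a true force-cancellation (job_is_cancelled()), with a mirror-like driver that turns any pre-READY cancel into a force-cancel.

#include <stdbool.h>
#include <stdio.h>

typedef struct Job Job;

typedef struct JobDriver {
    /* Modelled contract: return true iff the job is truly (force-)cancelled. */
    bool (*cancel)(Job *job, bool force);
} JobDriver;

struct Job {
    const JobDriver *driver;
    bool ready;         /* the job has transitioned to READY */
    bool cancelled;     /* a cancel was requested */
    bool force_cancel;  /* the job will terminate without further I/O */
};

/* Model of the generic cancel path consuming the callback's return value. */
static void cancel_async_model(Job *job, bool force)
{
    if (job->driver->cancel) {
        force = job->driver->cancel(job, force);
    } else {
        /* No callback: the job is assumed to terminate without I/O. */
        force = true;
    }
    if (force) {
        job->force_cancel = true;
    }
    job->cancelled = true;
}

/* Model of job_cancel_requested(): any pending cancel request counts. */
static bool cancel_requested_model(const Job *job)
{
    return job->cancelled;
}

/* Model of job_is_cancelled(): only "real" (force-)cancellations count. */
static bool is_cancelled_model(const Job *job)
{
    return job->cancelled && job->force_cancel;
}

/* Mirror-like driver: before READY, any cancellation is a force-cancel. */
static bool mirror_like_cancel(Job *job, bool force)
{
    return force || !job->ready;
}

int main(void)
{
    const JobDriver drv = { .cancel = mirror_like_cancel };
    Job job = { .driver = &drv, .ready = true };

    cancel_async_model(&job, false);    /* soft cancel of a READY job */
    printf("requested=%d cancelled=%d\n",
           cancel_requested_model(&job), is_cancelled_model(&job));
    /* Prints "requested=1 cancelled=0": the request is recorded, but the
     * job is not considered truly cancelled and may complete instead,
     * which is the mirror-after-READY case handled by this series. */
    return 0;
}

With that split in place, callers pick the behaviour explicitly through the new job_cancel_sync() parameter, as the hunks above do: replication passes false for its commit job so a READY job can finish cleanly, while the backup-abort paths in blockdev.c pass true to stop the job immediately.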
