1 /*
2 * QEMU System Emulator block driver
3 *
4 * Copyright (c) 2011 IBM Corp.
5 * Copyright (c) 2012 Red Hat, Inc.
6 *
7 * Permission is hereby granted, free of charge, to any person obtaining a copy
8 * of this software and associated documentation files (the "Software"), to deal
9 * in the Software without restriction, including without limitation the rights
10 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11 * copies of the Software, and to permit persons to whom the Software is
12 * furnished to do so, subject to the following conditions:
13 *
14 * The above copyright notice and this permission notice shall be included in
15 * all copies or substantial portions of the Software.
16 *
17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
20 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23 * THE SOFTWARE.
24 */
25
26 #include "qemu/osdep.h"
27 #include "qemu-common.h"
28 #include "block/block.h"
29 #include "block/blockjob_int.h"
30 #include "block/block_int.h"
31 #include "block/trace.h"
32 #include "sysemu/block-backend.h"
33 #include "qapi/error.h"
34 #include "qapi/qapi-events-block-core.h"
35 #include "qapi/qmp/qerror.h"
36 #include "qemu/coroutine.h"
37 #include "qemu/timer.h"
38
39 /* Right now, this mutex is only needed to synchronize accesses to job->busy
40 * and job->sleep_timer, such as concurrent calls to block_job_do_yield and
41 * block_job_enter. */
42 static QemuMutex block_job_mutex;
43
44 static void block_job_lock(void)
45 {
46 qemu_mutex_lock(&block_job_mutex);
47 }
48
49 static void block_job_unlock(void)
50 {
51 qemu_mutex_unlock(&block_job_mutex);
52 }
53
54 static void __attribute__((__constructor__)) block_job_init(void)
55 {
56 qemu_mutex_init(&block_job_mutex);
57 }
58
59 static void block_job_event_cancelled(BlockJob *job);
60 static void block_job_event_completed(BlockJob *job, const char *msg);
61 static int block_job_event_pending(BlockJob *job);
62 static void block_job_enter_cond(BlockJob *job, bool(*fn)(BlockJob *job));
63
64 /* Transactional group of block jobs */
65 struct BlockJobTxn {
66
67 /* Is this txn being cancelled? */
68 bool aborting;
69
70 /* List of jobs */
71 QLIST_HEAD(, BlockJob) jobs;
72
73 /* Reference count */
74 int refcnt;
75 };
76
77 /*
78 * The block job API is composed of two categories of functions.
79 *
80 * The first includes functions used by the monitor. The monitor is
81 * peculiar in that it accesses the block job list with block_job_get, and
82 * therefore needs consistency across block_job_get and the actual operation
83 * (e.g. block_job_set_speed). The consistency is achieved with
84 * aio_context_acquire/release. These functions are declared in blockjob.h.
85 *
86 * The second includes functions used by the block job drivers and sometimes
87 * by the core block layer. These do not care about locking, because the
88 * whole coroutine runs under the AioContext lock, and are declared in
89 * blockjob_int.h.
90 */
91
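/*
 * Illustrative sketch (not part of this file): how a monitor-side caller in
 * the first category described above typically wraps an operation.  The
 * function name qmp_example_set_speed() is hypothetical; block_job_get(),
 * blk_get_aio_context(), aio_context_acquire()/aio_context_release() and
 * block_job_set_speed() are the real entry points used below.
 *
 *     void qmp_example_set_speed(const char *device, int64_t speed,
 *                                Error **errp)
 *     {
 *         BlockJob *job = block_job_get(device);
 *         AioContext *aio_context;
 *
 *         if (!job) {
 *             error_setg(errp, "Block job '%s' not found", device);
 *             return;
 *         }
 *
 *         aio_context = blk_get_aio_context(job->blk);
 *         aio_context_acquire(aio_context);
 *         block_job_set_speed(job, speed, errp);
 *         aio_context_release(aio_context);
 *     }
 */
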
92 static bool is_block_job(Job *job)
93 {
94 return job_type(job) == JOB_TYPE_BACKUP ||
95 job_type(job) == JOB_TYPE_COMMIT ||
96 job_type(job) == JOB_TYPE_MIRROR ||
97 job_type(job) == JOB_TYPE_STREAM;
98 }
99
100 BlockJob *block_job_next(BlockJob *bjob)
101 {
102 Job *job = bjob ? &bjob->job : NULL;
103
104 do {
105 job = job_next(job);
106 } while (job && !is_block_job(job));
107
108 return job ? container_of(job, BlockJob, job) : NULL;
109 }
110
111 BlockJob *block_job_get(const char *id)
112 {
113 Job *job = job_get(id);
114
115 if (job && is_block_job(job)) {
116 return container_of(job, BlockJob, job);
117 } else {
118 return NULL;
119 }
120 }
121
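/*
 * Illustrative sketch (not part of this file): iterating over all block jobs
 * with block_job_next(), roughly as the query-block-jobs handler does.  Only
 * non-internal jobs have an ID and are reported to the user.
 *
 *     BlockJob *job;
 *
 *     for (job = block_job_next(NULL); job; job = block_job_next(job)) {
 *         if (block_job_is_internal(job)) {
 *             continue;
 *         }
 *         (inspect the job here, e.g. with block_job_query())
 *     }
 */
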
122 BlockJobTxn *block_job_txn_new(void)
123 {
124 BlockJobTxn *txn = g_new0(BlockJobTxn, 1);
125 QLIST_INIT(&txn->jobs);
126 txn->refcnt = 1;
127 return txn;
128 }
129
130 static void block_job_txn_ref(BlockJobTxn *txn)
131 {
132 txn->refcnt++;
133 }
134
135 void block_job_txn_unref(BlockJobTxn *txn)
136 {
137 if (txn && --txn->refcnt == 0) {
138 g_free(txn);
139 }
140 }
141
142 void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job)
143 {
144 if (!txn) {
145 return;
146 }
147
148 assert(!job->txn);
149 job->txn = txn;
150
151 QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
152 block_job_txn_ref(txn);
153 }
154
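/*
 * Illustrative sketch (not part of this file): how a caller groups several
 * jobs into one transaction.  In practice the 'transaction' QMP command
 * creates the BlockJobTxn and passes it down to each job's creation helper,
 * which hands it to block_job_create(); start_backup_job() below is a
 * hypothetical stand-in for such a helper.
 *
 *     BlockJobTxn *txn = block_job_txn_new();
 *
 *     start_backup_job(..., txn, errp);      (job 1 joins the transaction)
 *     start_backup_job(..., txn, errp);      (job 2 joins the transaction)
 *
 *     block_job_txn_unref(txn);      (each job still holds its own reference)
 */
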
155 static void block_job_txn_del_job(BlockJob *job)
156 {
157 if (job->txn) {
158 QLIST_REMOVE(job, txn_list);
159 block_job_txn_unref(job->txn);
160 job->txn = NULL;
161 }
162 }
163
164 /* Assumes the block_job_mutex is held */
165 static bool block_job_timer_pending(BlockJob *job)
166 {
167 return timer_pending(&job->sleep_timer);
168 }
169
170 /* Assumes the block_job_mutex is held */
171 static bool block_job_timer_not_pending(BlockJob *job)
172 {
173 return !block_job_timer_pending(job);
174 }
175
176 static void block_job_pause(BlockJob *job)
177 {
178 job->pause_count++;
179 }
180
181 static void block_job_resume(BlockJob *job)
182 {
183 assert(job->pause_count > 0);
184 job->pause_count--;
185 if (job->pause_count) {
186 return;
187 }
188
189 /* kick only if no timer is pending */
190 block_job_enter_cond(job, block_job_timer_not_pending);
191 }
192
193 void block_job_ref(BlockJob *job)
194 {
195 ++job->refcnt;
196 }
197
198 static void block_job_attached_aio_context(AioContext *new_context,
199 void *opaque);
200 static void block_job_detach_aio_context(void *opaque);
201
202 void block_job_unref(BlockJob *job)
203 {
204 if (--job->refcnt == 0) {
205 assert(job->job.status == JOB_STATUS_NULL);
206 assert(!job->txn);
207 BlockDriverState *bs = blk_bs(job->blk);
208 bs->job = NULL;
209 block_job_remove_all_bdrv(job);
210 blk_remove_aio_context_notifier(job->blk,
211 block_job_attached_aio_context,
212 block_job_detach_aio_context, job);
213 blk_unref(job->blk);
214 error_free(job->blocker);
215 assert(!timer_pending(&job->sleep_timer));
216 job_delete(&job->job);
217 }
218 }
219
220 static void block_job_attached_aio_context(AioContext *new_context,
221 void *opaque)
222 {
223 BlockJob *job = opaque;
224
225 if (job->driver->attached_aio_context) {
226 job->driver->attached_aio_context(job, new_context);
227 }
228
229 block_job_resume(job);
230 }
231
232 static void block_job_drain(BlockJob *job)
233 {
234 /* If the job isn't busy (!job->busy), this kicks it into the next pause point. */
235 block_job_enter(job);
236
237 blk_drain(job->blk);
238 if (job->driver->drain) {
239 job->driver->drain(job);
240 }
241 }
242
243 static void block_job_detach_aio_context(void *opaque)
244 {
245 BlockJob *job = opaque;
246
247 /* In case the job terminates during aio_poll()... */
248 block_job_ref(job);
249
250 block_job_pause(job);
251
252 while (!job->paused && !job->completed) {
253 block_job_drain(job);
254 }
255
256 block_job_unref(job);
257 }
258
259 static char *child_job_get_parent_desc(BdrvChild *c)
260 {
261 BlockJob *job = c->opaque;
262 return g_strdup_printf("%s job '%s'", job_type_str(&job->job), job->job.id);
263 }
264
265 static void child_job_drained_begin(BdrvChild *c)
266 {
267 BlockJob *job = c->opaque;
268 block_job_pause(job);
269 }
270
271 static void child_job_drained_end(BdrvChild *c)
272 {
273 BlockJob *job = c->opaque;
274 block_job_resume(job);
275 }
276
277 static const BdrvChildRole child_job = {
278 .get_parent_desc = child_job_get_parent_desc,
279 .drained_begin = child_job_drained_begin,
280 .drained_end = child_job_drained_end,
281 .stay_at_node = true,
282 };
283
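/*
 * Illustrative sketch (not part of this file): the child_job callbacks above
 * are what tie a job to the drain machinery, so draining a node also pauses
 * the job that uses it.  bdrv_drained_begin()/bdrv_drained_end() are the
 * real block-layer entry points; bs stands for any node the job is attached
 * to.
 *
 *     bdrv_drained_begin(bs);     (child_job_drained_begin pauses the job)
 *     ... the caller can now reconfigure the node or the graph ...
 *     bdrv_drained_end(bs);       (child_job_drained_end resumes the job)
 */
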
284 void block_job_remove_all_bdrv(BlockJob *job)
285 {
286 GSList *l;
287 for (l = job->nodes; l; l = l->next) {
288 BdrvChild *c = l->data;
289 bdrv_op_unblock_all(c->bs, job->blocker);
290 bdrv_root_unref_child(c);
291 }
292 g_slist_free(job->nodes);
293 job->nodes = NULL;
294 }
295
296 int block_job_add_bdrv(BlockJob *job, const char *name, BlockDriverState *bs,
297 uint64_t perm, uint64_t shared_perm, Error **errp)
298 {
299 BdrvChild *c;
300
301 c = bdrv_root_attach_child(bs, name, &child_job, perm, shared_perm,
302 job, errp);
303 if (c == NULL) {
304 return -EPERM;
305 }
306
307 job->nodes = g_slist_prepend(job->nodes, c);
308 bdrv_ref(bs);
309 bdrv_op_block_all(bs, job->blocker);
310
311 return 0;
312 }
313
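/*
 * Illustrative sketch (not part of this file): a job that also writes to a
 * second node (for instance a mirror or backup target) registers it with
 * block_job_add_bdrv() so that op blockers and the permission system cover
 * it for the whole lifetime of the job.  target_bs and the exact permission
 * masks are made up for the example.
 *
 *     ret = block_job_add_bdrv(job, "target", target_bs,
 *                              BLK_PERM_WRITE,
 *                              BLK_PERM_CONSISTENT_READ | BLK_PERM_WRITE_UNCHANGED,
 *                              errp);
 *     if (ret < 0) {
 *         (creation fails; a driver would typically bail out with
 *          block_job_early_fail())
 *     }
 */
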
314 bool block_job_is_internal(BlockJob *job)
315 {
316 return (job->job.id == NULL);
317 }
318
319 static bool block_job_started(BlockJob *job)
320 {
321 return job->co;
322 }
323
324 const BlockJobDriver *block_job_driver(BlockJob *job)
325 {
326 return job->driver;
327 }
328
329 /**
330 * All jobs must allow a pause point before entering their job proper. This
331 * ensures that jobs can be paused prior to being started, then resumed later.
332 */
333 static void coroutine_fn block_job_co_entry(void *opaque)
334 {
335 BlockJob *job = opaque;
336
337 assert(job && job->driver && job->driver->start);
338 block_job_pause_point(job);
339 job->driver->start(job);
340 }
341
342 static void block_job_sleep_timer_cb(void *opaque)
343 {
344 BlockJob *job = opaque;
345
346 block_job_enter(job);
347 }
348
349 void block_job_start(BlockJob *job)
350 {
351 assert(job && !block_job_started(job) && job->paused &&
352 job->driver && job->driver->start);
353 job->co = qemu_coroutine_create(block_job_co_entry, job);
354 job->pause_count--;
355 job->busy = true;
356 job->paused = false;
357 job_state_transition(&job->job, JOB_STATUS_RUNNING);
358 bdrv_coroutine_enter(blk_bs(job->blk), job->co);
359 }
360
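/*
 * Illustrative sketch (not part of this file): the general shape of a
 * driver's .start coroutine.  example_job_run() and do_one_step() are
 * hypothetical; the pause/cancel handling shown is what the real drivers do
 * with the helpers defined further down in this file.
 *
 *     static void coroutine_fn example_job_run(void *opaque)
 *     {
 *         BlockJob *job = opaque;
 *         int ret = 0;
 *
 *         while (!block_job_is_cancelled(job)) {
 *             ret = do_one_step(job);
 *             if (ret <= 0) {
 *                 break;                    (error, or no work left)
 *             }
 *             block_job_pause_point(job);   (honour pause requests)
 *         }
 *
 *         (completion must happen in the main loop, see
 *          block_job_defer_to_main_loop() at the end of this file)
 *     }
 */
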
361 static void block_job_decommission(BlockJob *job)
362 {
363 assert(job);
364 job->completed = true;
365 job->busy = false;
366 job->paused = false;
367 job->deferred_to_main_loop = true;
368 block_job_txn_del_job(job);
369 job_state_transition(&job->job, JOB_STATUS_NULL);
370 block_job_unref(job);
371 }
372
373 static void block_job_do_dismiss(BlockJob *job)
374 {
375 block_job_decommission(job);
376 }
377
378 static void block_job_conclude(BlockJob *job)
379 {
380 job_state_transition(&job->job, JOB_STATUS_CONCLUDED);
381 if (job->auto_dismiss || !block_job_started(job)) {
382 block_job_do_dismiss(job);
383 }
384 }
385
386 static void block_job_update_rc(BlockJob *job)
387 {
388 if (!job->ret && block_job_is_cancelled(job)) {
389 job->ret = -ECANCELED;
390 }
391 if (job->ret) {
392 job_state_transition(&job->job, JOB_STATUS_ABORTING);
393 }
394 }
395
396 static int block_job_prepare(BlockJob *job)
397 {
398 if (job->ret == 0 && job->driver->prepare) {
399 job->ret = job->driver->prepare(job);
400 }
401 return job->ret;
402 }
403
404 static void block_job_commit(BlockJob *job)
405 {
406 assert(!job->ret);
407 if (job->driver->commit) {
408 job->driver->commit(job);
409 }
410 }
411
412 static void block_job_abort(BlockJob *job)
413 {
414 assert(job->ret);
415 if (job->driver->abort) {
416 job->driver->abort(job);
417 }
418 }
419
420 static void block_job_clean(BlockJob *job)
421 {
422 if (job->driver->clean) {
423 job->driver->clean(job);
424 }
425 }
426
427 static int block_job_finalize_single(BlockJob *job)
428 {
429 assert(job->completed);
430
431 /* Ensure abort is called for late-transactional failures */
432 block_job_update_rc(job);
433
434 if (!job->ret) {
435 block_job_commit(job);
436 } else {
437 block_job_abort(job);
438 }
439 block_job_clean(job);
440
441 if (job->cb) {
442 job->cb(job->opaque, job->ret);
443 }
444
445 /* Emit events only if we actually started */
446 if (block_job_started(job)) {
447 if (block_job_is_cancelled(job)) {
448 block_job_event_cancelled(job);
449 } else {
450 const char *msg = NULL;
451 if (job->ret < 0) {
452 msg = strerror(-job->ret);
453 }
454 block_job_event_completed(job, msg);
455 }
456 }
457
458 block_job_txn_del_job(job);
459 block_job_conclude(job);
460 return 0;
461 }
462
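/*
 * Illustrative summary (not part of this file): the optional driver hooks
 * that the completion code above invokes, and when.  A driver implements
 * only the hooks it needs.
 *
 *     .prepare   may fail; on the success path it runs for every job in the
 *                transaction before any of them commits
 *     .commit    runs only if this job succeeded (job->ret == 0)
 *     .abort     runs only if this job (or its transaction) failed
 *     .clean     always runs last, after either commit or abort
 */
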
463 static void block_job_cancel_async(BlockJob *job, bool force)
464 {
465 if (job->iostatus != BLOCK_DEVICE_IO_STATUS_OK) {
466 block_job_iostatus_reset(job);
467 }
468 if (job->user_paused) {
469 /* Do not call block_job_enter here, the caller will handle it. */
470 job->user_paused = false;
471 job->pause_count--;
472 }
473 job->cancelled = true;
474 /* To prevent 'force == false' from overriding a previous 'force == true' */
475 job->force |= force;
476 }
477
478 static int block_job_txn_apply(BlockJobTxn *txn, int fn(BlockJob *), bool lock)
479 {
480 AioContext *ctx;
481 BlockJob *job, *next;
482 int rc = 0;
483
484 QLIST_FOREACH_SAFE(job, &txn->jobs, txn_list, next) {
485 if (lock) {
486 ctx = blk_get_aio_context(job->blk);
487 aio_context_acquire(ctx);
488 }
489 rc = fn(job);
490 if (lock) {
491 aio_context_release(ctx);
492 }
493 if (rc) {
494 break;
495 }
496 }
497 return rc;
498 }
499
500 static int block_job_finish_sync(BlockJob *job,
501 void (*finish)(BlockJob *, Error **errp),
502 Error **errp)
503 {
504 Error *local_err = NULL;
505 int ret;
506
507 assert(blk_bs(job->blk)->job == job);
508
509 block_job_ref(job);
510
511 if (finish) {
512 finish(job, &local_err);
513 }
514 if (local_err) {
515 error_propagate(errp, local_err);
516 block_job_unref(job);
517 return -EBUSY;
518 }
519 /* block_job_drain calls block_job_enter, and it should be enough to
520 * induce progress until the job completes or moves to the main thread.
521 */
522 while (!job->deferred_to_main_loop && !job->completed) {
523 block_job_drain(job);
524 }
525 while (!job->completed) {
526 aio_poll(qemu_get_aio_context(), true);
527 }
528 ret = (job->cancelled && job->ret == 0) ? -ECANCELED : job->ret;
529 block_job_unref(job);
530 return ret;
531 }
532
533 static void block_job_completed_txn_abort(BlockJob *job)
534 {
535 AioContext *ctx;
536 BlockJobTxn *txn = job->txn;
537 BlockJob *other_job;
538
539 if (txn->aborting) {
540 /*
541 * We are cancelled by another job, which will handle everything.
542 */
543 return;
544 }
545 txn->aborting = true;
546 block_job_txn_ref(txn);
547
548 /* We are the first failed job. Cancel other jobs. */
549 QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
550 ctx = blk_get_aio_context(other_job->blk);
551 aio_context_acquire(ctx);
552 }
553
554 /* Other jobs are effectively cancelled by us, so set their cancelled status
555 * now; this job, however, may or may not actually be cancelled, depending
556 * on the caller, so leave its status alone. */
557 QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
558 if (other_job != job) {
559 block_job_cancel_async(other_job, false);
560 }
561 }
562 while (!QLIST_EMPTY(&txn->jobs)) {
563 other_job = QLIST_FIRST(&txn->jobs);
564 ctx = blk_get_aio_context(other_job->blk);
565 if (!other_job->completed) {
566 assert(other_job->cancelled);
567 block_job_finish_sync(other_job, NULL, NULL);
568 }
569 block_job_finalize_single(other_job);
570 aio_context_release(ctx);
571 }
572
573 block_job_txn_unref(txn);
574 }
575
576 static int block_job_needs_finalize(BlockJob *job)
577 {
578 return !job->auto_finalize;
579 }
580
581 static void block_job_do_finalize(BlockJob *job)
582 {
583 int rc;
584 assert(job && job->txn);
585
586 /* prepare the transaction to complete */
587 rc = block_job_txn_apply(job->txn, block_job_prepare, true);
588 if (rc) {
589 block_job_completed_txn_abort(job);
590 } else {
591 block_job_txn_apply(job->txn, block_job_finalize_single, true);
592 }
593 }
594
595 static void block_job_completed_txn_success(BlockJob *job)
596 {
597 BlockJobTxn *txn = job->txn;
598 BlockJob *other_job;
599
600 job_state_transition(&job->job, JOB_STATUS_WAITING);
601
602 /*
603 * Successful completion, see if there are other running jobs in this
604 * txn.
605 */
606 QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
607 if (!other_job->completed) {
608 return;
609 }
610 assert(other_job->ret == 0);
611 }
612
613 block_job_txn_apply(txn, block_job_event_pending, false);
614
615 /* If no jobs need manual finalization, automatically do so */
616 if (block_job_txn_apply(txn, block_job_needs_finalize, false) == 0) {
617 block_job_do_finalize(job);
618 }
619 }
620
621 void block_job_set_speed(BlockJob *job, int64_t speed, Error **errp)
622 {
623 int64_t old_speed = job->speed;
624
625 if (job_apply_verb(&job->job, JOB_VERB_SET_SPEED, errp)) {
626 return;
627 }
628 if (speed < 0) {
629 error_setg(errp, QERR_INVALID_PARAMETER, "speed");
630 return;
631 }
632
633 ratelimit_set_speed(&job->limit, speed, BLOCK_JOB_SLICE_TIME);
634
635 job->speed = speed;
636 if (speed && speed <= old_speed) {
637 return;
638 }
639
640 /* kick only if a timer is pending */
641 block_job_enter_cond(job, block_job_timer_pending);
642 }
643
644 int64_t block_job_ratelimit_get_delay(BlockJob *job, uint64_t n)
645 {
646 if (!job->speed) {
647 return 0;
648 }
649
650 return ratelimit_calculate_delay(&job->limit, n);
651 }
652
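/*
 * Illustrative sketch (not part of this file): how a driver's main loop uses
 * the rate limit together with block_job_sleep_ns().  n_bytes and the copy
 * step are hypothetical.
 *
 *     for (;;) {
 *         int64_t delay_ns = block_job_ratelimit_get_delay(job, n_bytes);
 *
 *         block_job_sleep_ns(job, delay_ns);
 *         if (block_job_is_cancelled(job)) {
 *             break;
 *         }
 *         (copy the next n_bytes here)
 *     }
 */
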
653 void block_job_complete(BlockJob *job, Error **errp)
654 {
655 /* Should not be reachable via external interface for internal jobs */
656 assert(job->job.id);
657 if (job_apply_verb(&job->job, JOB_VERB_COMPLETE, errp)) {
658 return;
659 }
660 if (job->pause_count || job->cancelled || !job->driver->complete) {
661 error_setg(errp, "The active block job '%s' cannot be completed",
662 job->job.id);
663 return;
664 }
665
666 job->driver->complete(job, errp);
667 }
668
669 void block_job_finalize(BlockJob *job, Error **errp)
670 {
671 assert(job && job->job.id);
672 if (job_apply_verb(&job->job, JOB_VERB_FINALIZE, errp)) {
673 return;
674 }
675 block_job_do_finalize(job);
676 }
677
678 void block_job_dismiss(BlockJob **jobptr, Error **errp)
679 {
680 BlockJob *job = *jobptr;
681 /* Similarly to block_job_complete(), this is reachable only via the QMP interface. */
682 assert(job->job.id);
683 if (job_apply_verb(&job->job, JOB_VERB_DISMISS, errp)) {
684 return;
685 }
686
687 block_job_do_dismiss(job);
688 *jobptr = NULL;
689 }
690
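/*
 * Illustrative sketch (not part of this file): how management code drives a
 * job that was created with BLOCK_JOB_MANUAL_FINALIZE and
 * BLOCK_JOB_MANUAL_DISMISS.  The state names refer to the transitions
 * performed in this file.
 *
 *     (job reaches PENDING; a BLOCK_JOB_PENDING event is emitted)
 *     block_job_finalize(job, errp);       PENDING   -> CONCLUDED
 *     block_job_dismiss(&job, errp);       CONCLUDED -> NULL, job is freed
 */
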
691 void block_job_user_pause(BlockJob *job, Error **errp)
692 {
693 if (job_apply_verb(&job->job, JOB_VERB_PAUSE, errp)) {
694 return;
695 }
696 if (job->user_paused) {
697 error_setg(errp, "Job is already paused");
698 return;
699 }
700 job->user_paused = true;
701 block_job_pause(job);
702 }
703
704 bool block_job_user_paused(BlockJob *job)
705 {
706 return job->user_paused;
707 }
708
709 void block_job_user_resume(BlockJob *job, Error **errp)
710 {
711 assert(job);
712 if (!job->user_paused || job->pause_count <= 0) {
713 error_setg(errp, "Can't resume a job that was not paused");
714 return;
715 }
716 if (job_apply_verb(&job->job, JOB_VERB_RESUME, errp)) {
717 return;
718 }
719 block_job_iostatus_reset(job);
720 job->user_paused = false;
721 block_job_resume(job);
722 }
723
724 void block_job_cancel(BlockJob *job, bool force)
725 {
726 if (job->job.status == JOB_STATUS_CONCLUDED) {
727 block_job_do_dismiss(job);
728 return;
729 }
730 block_job_cancel_async(job, force);
731 if (!block_job_started(job)) {
732 block_job_completed(job, -ECANCELED);
733 } else if (job->deferred_to_main_loop) {
734 block_job_completed_txn_abort(job);
735 } else {
736 block_job_enter(job);
737 }
738 }
739
740 void block_job_user_cancel(BlockJob *job, bool force, Error **errp)
741 {
742 if (job_apply_verb(&job->job, JOB_VERB_CANCEL, errp)) {
743 return;
744 }
745 block_job_cancel(job, force);
746 }
747
748 /* A wrapper around block_job_cancel() taking an Error ** parameter so it may be
749 * used with block_job_finish_sync() without the need for (rather nasty)
750 * function pointer casts there. */
751 static void block_job_cancel_err(BlockJob *job, Error **errp)
752 {
753 block_job_cancel(job, false);
754 }
755
756 int block_job_cancel_sync(BlockJob *job)
757 {
758 return block_job_finish_sync(job, &block_job_cancel_err, NULL);
759 }
760
761 void block_job_cancel_sync_all(void)
762 {
763 BlockJob *job;
764 AioContext *aio_context;
765
766 while ((job = block_job_next(NULL))) {
767 aio_context = blk_get_aio_context(job->blk);
768 aio_context_acquire(aio_context);
769 block_job_cancel_sync(job);
770 aio_context_release(aio_context);
771 }
772 }
773
774 int block_job_complete_sync(BlockJob *job, Error **errp)
775 {
776 return block_job_finish_sync(job, &block_job_complete, errp);
777 }
778
779 void block_job_progress_update(BlockJob *job, uint64_t done)
780 {
781 job->offset += done;
782 }
783
784 void block_job_progress_set_remaining(BlockJob *job, uint64_t remaining)
785 {
786 job->len = job->offset + remaining;
787 }
788
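/*
 * Illustrative sketch (not part of this file): a driver first announces how
 * much work is left and then reports progress as it goes.  total_bytes and
 * chunk_bytes are hypothetical.
 *
 *     block_job_progress_set_remaining(job, total_bytes);
 *     ...
 *     block_job_progress_update(job, chunk_bytes);    (after each chunk)
 */
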
789 BlockJobInfo *block_job_query(BlockJob *job, Error **errp)
790 {
791 BlockJobInfo *info;
792
793 if (block_job_is_internal(job)) {
794 error_setg(errp, "Cannot query QEMU internal jobs");
795 return NULL;
796 }
797 info = g_new0(BlockJobInfo, 1);
798 info->type = g_strdup(job_type_str(&job->job));
799 info->device = g_strdup(job->job.id);
800 info->len = job->len;
801 info->busy = atomic_read(&job->busy);
802 info->paused = job->pause_count > 0;
803 info->offset = job->offset;
804 info->speed = job->speed;
805 info->io_status = job->iostatus;
806 info->ready = job->ready;
807 info->status = job->job.status;
808 info->auto_finalize = job->auto_finalize;
809 info->auto_dismiss = job->auto_dismiss;
810 info->has_error = job->ret != 0;
811 info->error = job->ret ? g_strdup(strerror(-job->ret)) : NULL;
812 return info;
813 }
814
815 static void block_job_iostatus_set_err(BlockJob *job, int error)
816 {
817 if (job->iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
818 job->iostatus = error == ENOSPC ? BLOCK_DEVICE_IO_STATUS_NOSPACE :
819 BLOCK_DEVICE_IO_STATUS_FAILED;
820 }
821 }
822
823 static void block_job_event_cancelled(BlockJob *job)
824 {
825 if (block_job_is_internal(job)) {
826 return;
827 }
828
829 qapi_event_send_block_job_cancelled(job_type(&job->job),
830 job->job.id,
831 job->len,
832 job->offset,
833 job->speed,
834 &error_abort);
835 }
836
837 static void block_job_event_completed(BlockJob *job, const char *msg)
838 {
839 if (block_job_is_internal(job)) {
840 return;
841 }
842
843 qapi_event_send_block_job_completed(job_type(&job->job),
844 job->job.id,
845 job->len,
846 job->offset,
847 job->speed,
848 !!msg,
849 msg,
850 &error_abort);
851 }
852
853 static int block_job_event_pending(BlockJob *job)
854 {
855 job_state_transition(&job->job, JOB_STATUS_PENDING);
856 if (!job->auto_finalize && !block_job_is_internal(job)) {
857 qapi_event_send_block_job_pending(job_type(&job->job),
858 job->job.id,
859 &error_abort);
860 }
861 return 0;
862 }
863
864 /*
865 * API for block job drivers and the block layer. These functions are
866 * declared in blockjob_int.h.
867 */
868
869 void *block_job_create(const char *job_id, const BlockJobDriver *driver,
870 BlockJobTxn *txn, BlockDriverState *bs, uint64_t perm,
871 uint64_t shared_perm, int64_t speed, int flags,
872 BlockCompletionFunc *cb, void *opaque, Error **errp)
873 {
874 BlockBackend *blk;
875 BlockJob *job;
876 int ret;
877
878 if (bs->job) {
879 error_setg(errp, QERR_DEVICE_IN_USE, bdrv_get_device_name(bs));
880 return NULL;
881 }
882
883 if (job_id == NULL && !(flags & BLOCK_JOB_INTERNAL)) {
884 job_id = bdrv_get_device_name(bs);
885 if (!*job_id) {
886 error_setg(errp, "An explicit job ID is required for this node");
887 return NULL;
888 }
889 }
890
891 if (job_id) {
892 if (flags & BLOCK_JOB_INTERNAL) {
893 error_setg(errp, "Cannot specify job ID for internal block job");
894 return NULL;
895 }
896 }
897
898 blk = blk_new(perm, shared_perm);
899 ret = blk_insert_bs(blk, bs, errp);
900 if (ret < 0) {
901 blk_unref(blk);
902 return NULL;
903 }
904
905 job = job_create(job_id, &driver->job_driver, errp);
906 if (job == NULL) {
907 blk_unref(blk);
908 return NULL;
909 }
910
911 assert(is_block_job(&job->job));
912
913 job->driver = driver;
914 job->blk = blk;
915 job->cb = cb;
916 job->opaque = opaque;
917 job->busy = false;
918 job->paused = true;
919 job->pause_count = 1;
920 job->refcnt = 1;
921 job->auto_finalize = !(flags & BLOCK_JOB_MANUAL_FINALIZE);
922 job->auto_dismiss = !(flags & BLOCK_JOB_MANUAL_DISMISS);
923 aio_timer_init(qemu_get_aio_context(), &job->sleep_timer,
924 QEMU_CLOCK_REALTIME, SCALE_NS,
925 block_job_sleep_timer_cb, job);
926
927 error_setg(&job->blocker, "block device is in use by block job: %s",
928 job_type_str(&job->job));
929 block_job_add_bdrv(job, "main node", bs, 0, BLK_PERM_ALL, &error_abort);
930 bs->job = job;
931
932 bdrv_op_unblock(bs, BLOCK_OP_TYPE_DATAPLANE, job->blocker);
933
934 blk_add_aio_context_notifier(blk, block_job_attached_aio_context,
935 block_job_detach_aio_context, job);
936
937 /* Only set speed when necessary to avoid NotSupported error */
938 if (speed != 0) {
939 Error *local_err = NULL;
940
941 block_job_set_speed(job, speed, &local_err);
942 if (local_err) {
943 block_job_early_fail(job);
944 error_propagate(errp, local_err);
945 return NULL;
946 }
947 }
948
949 /* Single jobs are modeled as single-job transactions for the sake of
950 * consolidating the job management logic */
951 if (!txn) {
952 txn = block_job_txn_new();
953 block_job_txn_add_job(txn, job);
954 block_job_txn_unref(txn);
955 } else {
956 block_job_txn_add_job(txn, job);
957 }
958
959 return job;
960 }
961
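/*
 * Illustrative sketch (not part of this file): how a driver typically
 * creates and starts its job.  ExampleBlockJob (with an embedded BlockJob
 * named common), example_job_driver and the permission masks are made up;
 * the call sequence mirrors what the real drivers do.
 *
 *     ExampleBlockJob *s;
 *
 *     s = block_job_create(job_id, &example_job_driver, txn, bs,
 *                          BLK_PERM_CONSISTENT_READ, BLK_PERM_ALL,
 *                          speed, BLOCK_JOB_DEFAULT, cb, opaque, errp);
 *     if (!s) {
 *         return;
 *     }
 *     (driver-specific setup goes here, e.g. block_job_add_bdrv() for a
 *      target node)
 *     block_job_start(&s->common);
 */
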
962 void block_job_early_fail(BlockJob *job)
963 {
964 assert(job->job.status == JOB_STATUS_CREATED);
965 block_job_decommission(job);
966 }
967
968 void block_job_completed(BlockJob *job, int ret)
969 {
970 assert(job && job->txn && !job->completed);
971 assert(blk_bs(job->blk)->job == job);
972 job->completed = true;
973 job->ret = ret;
974 block_job_update_rc(job);
975 trace_block_job_completed(job, ret, job->ret);
976 if (job->ret) {
977 block_job_completed_txn_abort(job);
978 } else {
979 block_job_completed_txn_success(job);
980 }
981 }
982
983 static bool block_job_should_pause(BlockJob *job)
984 {
985 return job->pause_count > 0;
986 }
987
988 /* Yield, and schedule a timer to reenter the coroutine after @ns nanoseconds.
989 * Reentering the job coroutine with block_job_enter() before the timer has
990 * expired is allowed and cancels the timer.
991 *
992 * If @ns is (uint64_t) -1, no timer is scheduled and block_job_enter() must be
993 * called explicitly. */
994 static void block_job_do_yield(BlockJob *job, uint64_t ns)
995 {
996 block_job_lock();
997 if (ns != -1) {
998 timer_mod(&job->sleep_timer, ns);
999 }
1000 job->busy = false;
1001 block_job_unlock();
1002 qemu_coroutine_yield();
1003
1004 /* Set by block_job_enter before re-entering the coroutine. */
1005 assert(job->busy);
1006 }
1007
1008 void coroutine_fn block_job_pause_point(BlockJob *job)
1009 {
1010 assert(job && block_job_started(job));
1011
1012 if (!block_job_should_pause(job)) {
1013 return;
1014 }
1015 if (block_job_is_cancelled(job)) {
1016 return;
1017 }
1018
1019 if (job->driver->pause) {
1020 job->driver->pause(job);
1021 }
1022
1023 if (block_job_should_pause(job) && !block_job_is_cancelled(job)) {
1024 JobStatus status = job->job.status;
1025 job_state_transition(&job->job, status == JOB_STATUS_READY
1026 ? JOB_STATUS_STANDBY
1027 : JOB_STATUS_PAUSED);
1028 job->paused = true;
1029 block_job_do_yield(job, -1);
1030 job->paused = false;
1031 job_state_transition(&job->job, status);
1032 }
1033
1034 if (job->driver->resume) {
1035 job->driver->resume(job);
1036 }
1037 }
1038
1039 /*
1040 * Conditionally enter the block job's coroutine: the job is entered only if it
1041 * is not busy and fn() (if non-NULL) returns true while block_job_mutex is held.
1042 */
1043 static void block_job_enter_cond(BlockJob *job, bool(*fn)(BlockJob *job))
1044 {
1045 if (!block_job_started(job)) {
1046 return;
1047 }
1048 if (job->deferred_to_main_loop) {
1049 return;
1050 }
1051
1052 block_job_lock();
1053 if (job->busy) {
1054 block_job_unlock();
1055 return;
1056 }
1057
1058 if (fn && !fn(job)) {
1059 block_job_unlock();
1060 return;
1061 }
1062
1063 assert(!job->deferred_to_main_loop);
1064 timer_del(&job->sleep_timer);
1065 job->busy = true;
1066 block_job_unlock();
1067 aio_co_wake(job->co);
1068 }
1069
1070 void block_job_enter(BlockJob *job)
1071 {
1072 block_job_enter_cond(job, NULL);
1073 }
1074
1075 bool block_job_is_cancelled(BlockJob *job)
1076 {
1077 return job->cancelled;
1078 }
1079
1080 void block_job_sleep_ns(BlockJob *job, int64_t ns)
1081 {
1082 assert(job->busy);
1083
1084 /* Check cancellation *before* setting busy = false, too! */
1085 if (block_job_is_cancelled(job)) {
1086 return;
1087 }
1088
1089 if (!block_job_should_pause(job)) {
1090 block_job_do_yield(job, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + ns);
1091 }
1092
1093 block_job_pause_point(job);
1094 }
1095
1096 void block_job_yield(BlockJob *job)
1097 {
1098 assert(job->busy);
1099
1100 /* Check cancellation *before* setting busy = false, too! */
1101 if (block_job_is_cancelled(job)) {
1102 return;
1103 }
1104
1105 if (!block_job_should_pause(job)) {
1106 block_job_do_yield(job, -1);
1107 }
1108
1109 block_job_pause_point(job);
1110 }
1111
1112 void block_job_iostatus_reset(BlockJob *job)
1113 {
1114 if (job->iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
1115 return;
1116 }
1117 assert(job->user_paused && job->pause_count > 0);
1118 job->iostatus = BLOCK_DEVICE_IO_STATUS_OK;
1119 }
1120
1121 void block_job_event_ready(BlockJob *job)
1122 {
1123 job_state_transition(&job->job, JOB_STATUS_READY);
1124 job->ready = true;
1125
1126 if (block_job_is_internal(job)) {
1127 return;
1128 }
1129
1130 qapi_event_send_block_job_ready(job_type(&job->job),
1131 job->job.id,
1132 job->len,
1133 job->offset,
1134 job->speed, &error_abort);
1135 }
1136
1137 BlockErrorAction block_job_error_action(BlockJob *job, BlockdevOnError on_err,
1138 int is_read, int error)
1139 {
1140 BlockErrorAction action;
1141
1142 switch (on_err) {
1143 case BLOCKDEV_ON_ERROR_ENOSPC:
1144 case BLOCKDEV_ON_ERROR_AUTO:
1145 action = (error == ENOSPC) ?
1146 BLOCK_ERROR_ACTION_STOP : BLOCK_ERROR_ACTION_REPORT;
1147 break;
1148 case BLOCKDEV_ON_ERROR_STOP:
1149 action = BLOCK_ERROR_ACTION_STOP;
1150 break;
1151 case BLOCKDEV_ON_ERROR_REPORT:
1152 action = BLOCK_ERROR_ACTION_REPORT;
1153 break;
1154 case BLOCKDEV_ON_ERROR_IGNORE:
1155 action = BLOCK_ERROR_ACTION_IGNORE;
1156 break;
1157 default:
1158 abort();
1159 }
1160 if (!block_job_is_internal(job)) {
1161 qapi_event_send_block_job_error(job->job.id,
1162 is_read ? IO_OPERATION_TYPE_READ :
1163 IO_OPERATION_TYPE_WRITE,
1164 action, &error_abort);
1165 }
1166 if (action == BLOCK_ERROR_ACTION_STOP) {
1167 block_job_pause(job);
1168 /* Make the pause user visible so that it can be resumed from QMP. */
1169 job->user_paused = true;
1170 block_job_iostatus_set_err(job, error);
1171 }
1172 return action;
1173 }
1174
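/*
 * Illustrative sketch (not part of this file): how a driver reacts to an I/O
 * error.  on_error is the BlockdevOnError policy the job was configured
 * with; ret is a negative errno from the failed request.
 *
 *     BlockErrorAction action = block_job_error_action(job, on_error,
 *                                                      is_read, -ret);
 *     if (action == BLOCK_ERROR_ACTION_REPORT) {
 *         return ret;                (give up and fail the job)
 *     }
 *     (for BLOCK_ERROR_ACTION_STOP the job is now user-paused and the
 *      request is retried after block_job_user_resume(); for
 *      BLOCK_ERROR_ACTION_IGNORE it simply carries on)
 */
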
1175 typedef struct {
1176 BlockJob *job;
1177 AioContext *aio_context;
1178 BlockJobDeferToMainLoopFn *fn;
1179 void *opaque;
1180 } BlockJobDeferToMainLoopData;
1181
1182 static void block_job_defer_to_main_loop_bh(void *opaque)
1183 {
1184 BlockJobDeferToMainLoopData *data = opaque;
1185 AioContext *aio_context;
1186
1187 /* Prevent race with block_job_defer_to_main_loop() */
1188 aio_context_acquire(data->aio_context);
1189
1190 /* Fetch BDS AioContext again, in case it has changed */
1191 aio_context = blk_get_aio_context(data->job->blk);
1192 if (aio_context != data->aio_context) {
1193 aio_context_acquire(aio_context);
1194 }
1195
1196 data->fn(data->job, data->opaque);
1197
1198 if (aio_context != data->aio_context) {
1199 aio_context_release(aio_context);
1200 }
1201
1202 aio_context_release(data->aio_context);
1203
1204 g_free(data);
1205 }
1206
1207 void block_job_defer_to_main_loop(BlockJob *job,
1208 BlockJobDeferToMainLoopFn *fn,
1209 void *opaque)
1210 {
1211 BlockJobDeferToMainLoopData *data = g_malloc(sizeof(*data));
1212 data->job = job;
1213 data->aio_context = blk_get_aio_context(job->blk);
1214 data->fn = fn;
1215 data->opaque = opaque;
1216 job->deferred_to_main_loop = true;
1217
1218 aio_bh_schedule_oneshot(qemu_get_aio_context(),
1219 block_job_defer_to_main_loop_bh, data);
1220 }
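/*
 * Illustrative sketch (not part of this file): the usual way a job finishes.
 * The .start coroutine defers into the main loop and only there calls
 * block_job_completed(), so that completion callbacks and graph changes run
 * outside the job's own AioContext.  example_job_exit() and ExampleJobData
 * are hypothetical.
 *
 *     static void example_job_exit(BlockJob *job, void *opaque)
 *     {
 *         ExampleJobData *data = opaque;
 *
 *         (undo driver-specific setup here)
 *         block_job_completed(job, data->ret);
 *         g_free(data);
 *     }
 *
 *     and, at the end of the .start coroutine:
 *
 *         data->ret = ret;
 *         block_job_defer_to_main_loop(job, example_job_exit, data);
 */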