]> git.ipfire.org Git - thirdparty/qemu.git/blobdiff - nbd.c
mac_dbdma: always clear FLUSH bit once DBDMA channel flush is complete
[thirdparty/qemu.git] / nbd.c
diff --git a/nbd.c b/nbd.c
index e7d1ceec436f3f853a36c9610b095c5b1acfdf59..06b501ba672b94ac7f9df07e1cd25eb41b13923b 100644 (file)
--- a/nbd.c
+++ b/nbd.c
@@ -17,7 +17,7 @@
  */
 
 #include "block/nbd.h"
-#include "block/block.h"
+#include "sysemu/block-backend.h"
 
 #include "block/coroutine.h"
 
 #define NBD_OPT_ABORT           (2)
 #define NBD_OPT_LIST            (3)
 
+/* NBD errors are based on errno numbers, so there is a 1:1 mapping,
+ * but only a limited set of errno values is specified in the protocol.
+ * Everything else is squashed to EINVAL.
+ */
+#define NBD_SUCCESS    0
+#define NBD_EPERM      1
+#define NBD_EIO        5
+#define NBD_ENOMEM     12
+#define NBD_EINVAL     22
+#define NBD_ENOSPC     28
+
+static int system_errno_to_nbd_errno(int err)
+{
+    switch (err) {
+    case 0:
+        return NBD_SUCCESS;
+    case EPERM:
+        return NBD_EPERM;
+    case EIO:
+        return NBD_EIO;
+    case ENOMEM:
+        return NBD_ENOMEM;
+#ifdef EDQUOT
+    case EDQUOT:
+#endif
+    case EFBIG:
+    case ENOSPC:
+        return NBD_ENOSPC;
+    case EINVAL:
+    default:
+        return NBD_EINVAL;
+    }
+}
+
+static int nbd_errno_to_system_errno(int err)
+{
+    switch (err) {
+    case NBD_SUCCESS:
+        return 0;
+    case NBD_EPERM:
+        return EPERM;
+    case NBD_EIO:
+        return EIO;
+    case NBD_ENOMEM:
+        return ENOMEM;
+    case NBD_ENOSPC:
+        return ENOSPC;
+    case NBD_EINVAL:
+    default:
+        return EINVAL;
+    }
+}
+
 /* Definitions for opaque data types */
 
 typedef struct NBDRequest NBDRequest;
@@ -100,13 +153,15 @@ struct NBDExport {
     int refcount;
     void (*close)(NBDExport *exp);
 
-    BlockDriverState *bs;
+    BlockBackend *blk;
     char *name;
     off_t dev_offset;
     off_t size;
     uint32_t nbdflags;
     QTAILQ_HEAD(, NBDClient) clients;
     QTAILQ_ENTRY(NBDExport) next;
+
+    AioContext *ctx;
 };
 
 static QTAILQ_HEAD(, NBDExport) exports = QTAILQ_HEAD_INITIALIZER(exports);
@@ -123,6 +178,8 @@ struct NBDClient {
     CoMutex send_lock;
     Coroutine *send_coroutine;
 
+    bool can_read;
+
     QTAILQ_ENTRY(NBDClient) next;
     int nb_requests;
     bool closing;
@@ -130,6 +187,10 @@ struct NBDClient {
 
 /* That's all folks */
 
+static void nbd_set_handlers(NBDClient *client);
+static void nbd_unset_handlers(NBDClient *client);
+static void nbd_update_can_read(NBDClient *client);
+
 ssize_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read)
 {
     size_t offset = 0;
@@ -156,7 +217,7 @@ ssize_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read)
             err = socket_error();
 
             /* recoverable error */
-            if (err == EINTR || (offset > 0 && err == EAGAIN)) {
+            if (err == EINTR || (offset > 0 && (err == EAGAIN || err == EWOULDBLOCK))) {
                 continue;
             }
 
@@ -185,6 +246,26 @@ static ssize_t read_sync(int fd, void *buffer, size_t size)
     return nbd_wr_sync(fd, buffer, size, true);
 }
 
+static ssize_t drop_sync(int fd, size_t size)
+{
+    ssize_t ret, dropped = size;
+    uint8_t *buffer = g_malloc(MIN(65536, size));
+
+    while (size > 0) {
+        ret = read_sync(fd, buffer, MIN(65536, size));
+        if (ret < 0) {
+            g_free(buffer);
+            return ret;
+        }
+
+        assert(ret <= size);
+        size -= ret;
+    }
+
+    g_free(buffer);
+    return dropped;
+}
+
 static ssize_t write_sync(int fd, void *buffer, size_t size)
 {
     int ret;
@@ -295,6 +376,9 @@ static int nbd_handle_list(NBDClient *client, uint32_t length)
 
     csock = client->sock;
     if (length) {
+        if (drop_sync(csock, length) != length) {
+            return -EIO;
+        }
         return nbd_send_rep(csock, NBD_REP_ERR_INVALID, NBD_OPT_LIST);
     }
 
@@ -342,30 +426,39 @@ fail:
 
 static int nbd_receive_options(NBDClient *client)
 {
+    int csock = client->sock;
+    uint32_t flags;
+
+    /* Client sends:
+        [ 0 ..   3]   client flags
+
+        [ 0 ..   7]   NBD_OPTS_MAGIC
+        [ 8 ..  11]   NBD option
+        [12 ..  15]   Data length
+        ...           Rest of request
+
+        [ 0 ..   7]   NBD_OPTS_MAGIC
+        [ 8 ..  11]   Second NBD option
+        [12 ..  15]   Data length
+        ...           Rest of request
+    */
+
+    if (read_sync(csock, &flags, sizeof(flags)) != sizeof(flags)) {
+        LOG("read failed");
+        return -EIO;
+    }
+    TRACE("Checking client flags");
+    be32_to_cpus(&flags);
+    if (flags != 0 && flags != NBD_FLAG_C_FIXED_NEWSTYLE) {
+        LOG("Bad client flags received");
+        return -EIO;
+    }
+
     while (1) {
-        int csock = client->sock;
+        int ret;
         uint32_t tmp, length;
         uint64_t magic;
 
-        /* Client sends:
-            [ 0 ..   3]   client flags
-            [ 4 ..  11]   NBD_OPTS_MAGIC
-            [12 ..  15]   NBD option
-            [16 ..  19]   length
-            ...           Rest of request
-        */
-
-        if (read_sync(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) {
-            LOG("read failed");
-            return -EINVAL;
-        }
-        TRACE("Checking client flags");
-        tmp = be32_to_cpu(tmp);
-        if (tmp != 0 && tmp != NBD_FLAG_C_FIXED_NEWSTYLE) {
-            LOG("Bad client flags received");
-            return -EINVAL;
-        }
-
         if (read_sync(csock, &magic, sizeof(magic)) != sizeof(magic)) {
             LOG("read failed");
             return -EINVAL;
@@ -390,8 +483,9 @@ static int nbd_receive_options(NBDClient *client)
         TRACE("Checking option");
         switch (be32_to_cpu(tmp)) {
         case NBD_OPT_LIST:
-            if (nbd_handle_list(client, length) < 0) {
-                return 1;
+            ret = nbd_handle_list(client, length);
+            if (ret < 0) {
+                return ret;
             }
             break;
 
@@ -486,7 +580,7 @@ fail:
 }
 
 int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
-                          off_t *size, size_t *blocksize)
+                          off_t *size, Error **errp)
 {
     char buf[256];
     uint64_t magic, s;
@@ -498,13 +592,13 @@ int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
     rc = -EINVAL;
 
     if (read_sync(csock, buf, 8) != 8) {
-        LOG("read failed");
+        error_setg(errp, "Failed to read data");
         goto fail;
     }
 
     buf[8] = '\0';
     if (strlen(buf) == 0) {
-        LOG("server connection closed");
+        error_setg(errp, "Server connection closed unexpectedly");
         goto fail;
     }
 
@@ -519,12 +613,12 @@ int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
           qemu_isprint(buf[7]) ? buf[7] : '.');
 
     if (memcmp(buf, "NBDMAGIC", 8) != 0) {
-        LOG("Invalid magic received");
+        error_setg(errp, "Invalid magic received");
         goto fail;
     }
 
     if (read_sync(csock, &magic, sizeof(magic)) != sizeof(magic)) {
-        LOG("read failed");
+        error_setg(errp, "Failed to read magic");
         goto fail;
     }
     magic = be64_to_cpu(magic);
@@ -537,73 +631,80 @@ int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
 
         TRACE("Checking magic (opts_magic)");
         if (magic != NBD_OPTS_MAGIC) {
-            LOG("Bad magic received");
+            if (magic == NBD_CLIENT_MAGIC) {
+                error_setg(errp, "Server does not support export names");
+            } else {
+                error_setg(errp, "Bad magic received");
+            }
             goto fail;
         }
         if (read_sync(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) {
-            LOG("flags read failed");
+            error_setg(errp, "Failed to read server flags");
             goto fail;
         }
         *flags = be16_to_cpu(tmp) << 16;
         /* reserved for future use */
         if (write_sync(csock, &reserved, sizeof(reserved)) !=
             sizeof(reserved)) {
-            LOG("write failed (reserved)");
+            error_setg(errp, "Failed to read reserved field");
             goto fail;
         }
         /* write the export name */
         magic = cpu_to_be64(magic);
         if (write_sync(csock, &magic, sizeof(magic)) != sizeof(magic)) {
-            LOG("write failed (magic)");
+            error_setg(errp, "Failed to send export name magic");
             goto fail;
         }
         opt = cpu_to_be32(NBD_OPT_EXPORT_NAME);
         if (write_sync(csock, &opt, sizeof(opt)) != sizeof(opt)) {
-            LOG("write failed (opt)");
+            error_setg(errp, "Failed to send export name option number");
             goto fail;
         }
         namesize = cpu_to_be32(strlen(name));
         if (write_sync(csock, &namesize, sizeof(namesize)) !=
             sizeof(namesize)) {
-            LOG("write failed (namesize)");
+            error_setg(errp, "Failed to send export name length");
             goto fail;
         }
         if (write_sync(csock, (char*)name, strlen(name)) != strlen(name)) {
-            LOG("write failed (name)");
+            error_setg(errp, "Failed to send export name");
             goto fail;
         }
     } else {
         TRACE("Checking magic (cli_magic)");
 
         if (magic != NBD_CLIENT_MAGIC) {
-            LOG("Bad magic received");
+            if (magic == NBD_OPTS_MAGIC) {
+                error_setg(errp, "Server requires an export name");
+            } else {
+                error_setg(errp, "Bad magic received");
+            }
             goto fail;
         }
     }
 
     if (read_sync(csock, &s, sizeof(s)) != sizeof(s)) {
-        LOG("read failed");
+        error_setg(errp, "Failed to read export length");
         goto fail;
     }
     *size = be64_to_cpu(s);
-    *blocksize = 1024;
     TRACE("Size is %" PRIu64, *size);
 
     if (!name) {
         if (read_sync(csock, flags, sizeof(*flags)) != sizeof(*flags)) {
-            LOG("read failed (flags)");
+            error_setg(errp, "Failed to read export flags");
             goto fail;
         }
         *flags = be32_to_cpup(flags);
     } else {
         if (read_sync(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) {
-            LOG("read failed (tmp)");
+            error_setg(errp, "Failed to read export flags");
             goto fail;
         }
-        *flags |= be32_to_cpu(tmp);
+        *flags |= be16_to_cpu(tmp);
     }
     if (read_sync(csock, &buf, 124) != 124) {
-        LOG("read failed (buf)");
+        error_setg(errp, "Failed to read reserved block");
         goto fail;
     }
     rc = 0;
@@ -613,7 +714,7 @@ fail:
 }
 
 #ifdef __linux__
-int nbd_init(int fd, int csock, uint32_t flags, off_t size, size_t blocksize)
+int nbd_init(int fd, int csock, uint32_t flags, off_t size)
 {
     TRACE("Setting NBD socket");
 
@@ -623,17 +724,17 @@ int nbd_init(int fd, int csock, uint32_t flags, off_t size, size_t blocksize)
         return -serrno;
     }
 
-    TRACE("Setting block size to %lu", (unsigned long)blocksize);
+    TRACE("Setting block size to %lu", (unsigned long)BDRV_SECTOR_SIZE);
 
-    if (ioctl(fd, NBD_SET_BLKSIZE, blocksize) < 0) {
+    if (ioctl(fd, NBD_SET_BLKSIZE, (size_t)BDRV_SECTOR_SIZE) < 0) {
         int serrno = errno;
         LOG("Failed setting NBD block size");
         return -serrno;
     }
 
-        TRACE("Setting size to %zd block(s)", (size_t)(size / blocksize));
+    TRACE("Setting size to %zd block(s)", (size_t)(size / BDRV_SECTOR_SIZE));
 
-    if (ioctl(fd, NBD_SET_SIZE_BLOCKS, size / blocksize) < 0) {
+    if (ioctl(fd, NBD_SET_SIZE_BLOCKS, (size_t)(size / BDRV_SECTOR_SIZE)) < 0) {
         int serrno = errno;
         LOG("Failed setting size (in blocks)");
         return -serrno;
@@ -698,7 +799,7 @@ int nbd_client(int fd)
     return ret;
 }
 #else
-int nbd_init(int fd, int csock, uint32_t flags, off_t size, size_t blocksize)
+int nbd_init(int fd, int csock, uint32_t flags, off_t size)
 {
     return -ENOTSUP;
 }
@@ -808,6 +909,8 @@ ssize_t nbd_receive_reply(int csock, struct nbd_reply *reply)
     reply->error  = be32_to_cpup((uint32_t*)(buf + 4));
     reply->handle = be64_to_cpup((uint64_t*)(buf + 8));
 
+    reply->error = nbd_errno_to_system_errno(reply->error);
+
     TRACE("Got reply: "
           "{ magic = 0x%x, .error = %d, handle = %" PRIu64" }",
           magic, reply->error, reply->handle);
@@ -824,6 +927,8 @@ static ssize_t nbd_send_reply(int csock, struct nbd_reply *reply)
     uint8_t buf[NBD_REPLY_SIZE];
     ssize_t ret;
 
+    reply->error = system_errno_to_nbd_errno(reply->error);
+
     /* Reply
        [ 0 ..  3]    magic   (NBD_REPLY_MAGIC)
        [ 4 ..  7]    error   (0 == no error)
@@ -858,11 +963,11 @@ void nbd_client_put(NBDClient *client)
 {
     if (--client->refcount == 0) {
         /* The last reference should be dropped by client->close,
-         * which is called by nbd_client_close.
+         * which is called by client_close.
          */
         assert(client->closing);
 
-        qemu_set_fd_handler2(client->sock, NULL, NULL, NULL, NULL);
+        nbd_unset_handlers(client);
         close(client->sock);
         client->sock = -1;
         if (client->exp) {
@@ -873,7 +978,7 @@ void nbd_client_put(NBDClient *client)
     }
 }
 
-void nbd_client_close(NBDClient *client)
+static void client_close(NBDClient *client)
 {
     if (client->closing) {
         return;
@@ -898,6 +1003,7 @@ static NBDRequest *nbd_request_get(NBDClient *client)
 
     assert(client->nb_requests <= MAX_NBD_REQUESTS - 1);
     client->nb_requests++;
+    nbd_update_can_read(client);
 
     req = g_slice_new0(NBDRequest);
     nbd_client_get(client);
@@ -914,26 +1020,72 @@ static void nbd_request_put(NBDRequest *req)
     }
     g_slice_free(NBDRequest, req);
 
-    if (client->nb_requests-- == MAX_NBD_REQUESTS) {
-        qemu_notify_event();
-    }
+    client->nb_requests--;
+    nbd_update_can_read(client);
     nbd_client_put(client);
 }
 
-NBDExport *nbd_export_new(BlockDriverState *bs, off_t dev_offset,
-                          off_t size, uint32_t nbdflags,
-                          void (*close)(NBDExport *))
+static void blk_aio_attached(AioContext *ctx, void *opaque)
+{
+    NBDExport *exp = opaque;
+    NBDClient *client;
+
+    TRACE("Export %s: Attaching clients to AIO context %p\n", exp->name, ctx);
+
+    exp->ctx = ctx;
+
+    QTAILQ_FOREACH(client, &exp->clients, next) {
+        nbd_set_handlers(client);
+    }
+}
+
+static void blk_aio_detach(void *opaque)
+{
+    NBDExport *exp = opaque;
+    NBDClient *client;
+
+    TRACE("Export %s: Detaching clients from AIO context %p\n", exp->name, exp->ctx);
+
+    QTAILQ_FOREACH(client, &exp->clients, next) {
+        nbd_unset_handlers(client);
+    }
+
+    exp->ctx = NULL;
+}
+
+NBDExport *nbd_export_new(BlockBackend *blk, off_t dev_offset, off_t size,
+                          uint32_t nbdflags, void (*close)(NBDExport *),
+                          Error **errp)
 {
     NBDExport *exp = g_malloc0(sizeof(NBDExport));
     exp->refcount = 1;
     QTAILQ_INIT(&exp->clients);
-    exp->bs = bs;
+    exp->blk = blk;
     exp->dev_offset = dev_offset;
     exp->nbdflags = nbdflags;
-    exp->size = size == -1 ? bdrv_getlength(bs) : size;
+    exp->size = size < 0 ? blk_getlength(blk) : size;
+    if (exp->size < 0) {
+        error_setg_errno(errp, -exp->size,
+                         "Failed to determine the NBD export's length");
+        goto fail;
+    }
+    exp->size -= exp->size % BDRV_SECTOR_SIZE;
+
     exp->close = close;
-    bdrv_ref(bs);
+    exp->ctx = blk_get_aio_context(blk);
+    blk_ref(blk);
+    blk_add_aio_context_notifier(blk, blk_aio_attached, blk_aio_detach, exp);
+    /*
+     * NBD exports are used for non-shared storage migration.  Make sure
+     * that BDRV_O_INCOMING is cleared and the image is ready for write
+     * access since the export could be available before migration handover.
+     */
+    blk_invalidate_cache(blk, NULL);
     return exp;
+
+fail:
+    g_free(exp);
+    return NULL;
 }
 
 NBDExport *nbd_export_find(const char *name)
@@ -975,13 +1127,15 @@ void nbd_export_close(NBDExport *exp)
 
     nbd_export_get(exp);
     QTAILQ_FOREACH_SAFE(client, &exp->clients, next, next) {
-        nbd_client_close(client);
+        client_close(client);
     }
     nbd_export_set_name(exp, NULL);
     nbd_export_put(exp);
-    if (exp->bs) {
-        bdrv_unref(exp->bs);
-        exp->bs = NULL;
+    if (exp->blk) {
+        blk_remove_aio_context_notifier(exp->blk, blk_aio_attached,
+                                        blk_aio_detach, exp);
+        blk_unref(exp->blk);
+        exp->blk = NULL;
     }
 }
 
@@ -1009,9 +1163,9 @@ void nbd_export_put(NBDExport *exp)
     }
 }
 
-BlockDriverState *nbd_export_get_blockdev(NBDExport *exp)
+BlockBackend *nbd_export_get_blockdev(NBDExport *exp)
 {
-    return exp->bs;
+    return exp->blk;
 }
 
 void nbd_export_close_all(void)
@@ -1023,10 +1177,6 @@ void nbd_export_close_all(void)
     }
 }
 
-static int nbd_can_read(void *opaque);
-static void nbd_read(void *opaque);
-static void nbd_restart_write(void *opaque);
-
 static ssize_t nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply,
                                  int len)
 {
@@ -1035,9 +1185,8 @@ static ssize_t nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply,
     ssize_t rc, ret;
 
     qemu_co_mutex_lock(&client->send_lock);
-    qemu_set_fd_handler2(csock, nbd_can_read, nbd_read,
-                         nbd_restart_write, client);
     client->send_coroutine = qemu_coroutine_self();
+    nbd_set_handlers(client);
 
     if (!len) {
         rc = nbd_send_reply(csock, reply);
@@ -1054,7 +1203,7 @@ static ssize_t nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply,
     }
 
     client->send_coroutine = NULL;
-    qemu_set_fd_handler2(csock, nbd_can_read, nbd_read, NULL, client);
+    nbd_set_handlers(client);
     qemu_co_mutex_unlock(&client->send_lock);
     return rc;
 }
@@ -1067,6 +1216,8 @@ static ssize_t nbd_co_receive_request(NBDRequest *req, struct nbd_request *reque
     ssize_t rc;
 
     client->recv_coroutine = qemu_coroutine_self();
+    nbd_update_can_read(client);
+
     rc = nbd_receive_request(csock, request);
     if (rc < 0) {
         if (rc != -EAGAIN) {
@@ -1093,7 +1244,7 @@ static ssize_t nbd_co_receive_request(NBDRequest *req, struct nbd_request *reque
 
     command = request->type & NBD_CMD_MASK_COMMAND;
     if (command == NBD_CMD_READ || command == NBD_CMD_WRITE) {
-        req->data = qemu_blockalign(client->exp->bs, request->len);
+        req->data = blk_blockalign(client->exp->blk, request->len);
     }
     if (command == NBD_CMD_WRITE) {
         TRACE("Reading %u byte(s)", request->len);
@@ -1108,6 +1259,8 @@ static ssize_t nbd_co_receive_request(NBDRequest *req, struct nbd_request *reque
 
 out:
     client->recv_coroutine = NULL;
+    nbd_update_can_read(client);
+
     return rc;
 }
 
@@ -1157,7 +1310,7 @@ static void nbd_trip(void *opaque)
         TRACE("Request type is READ");
 
         if (request.type & NBD_CMD_FLAG_FUA) {
-            ret = bdrv_co_flush(exp->bs);
+            ret = blk_co_flush(exp->blk);
             if (ret < 0) {
                 LOG("flush failed");
                 reply.error = -ret;
@@ -1165,8 +1318,9 @@ static void nbd_trip(void *opaque)
             }
         }
 
-        ret = bdrv_read(exp->bs, (request.from + exp->dev_offset) / 512,
-                        req->data, request.len / 512);
+        ret = blk_read(exp->blk,
+                       (request.from + exp->dev_offset) / BDRV_SECTOR_SIZE,
+                       req->data, request.len / BDRV_SECTOR_SIZE);
         if (ret < 0) {
             LOG("reading from file failed");
             reply.error = -ret;
@@ -1188,8 +1342,9 @@ static void nbd_trip(void *opaque)
 
         TRACE("Writing to device");
 
-        ret = bdrv_write(exp->bs, (request.from + exp->dev_offset) / 512,
-                         req->data, request.len / 512);
+        ret = blk_write(exp->blk,
+                        (request.from + exp->dev_offset) / BDRV_SECTOR_SIZE,
+                        req->data, request.len / BDRV_SECTOR_SIZE);
         if (ret < 0) {
             LOG("writing to file failed");
             reply.error = -ret;
@@ -1197,7 +1352,7 @@ static void nbd_trip(void *opaque)
         }
 
         if (request.type & NBD_CMD_FLAG_FUA) {
-            ret = bdrv_co_flush(exp->bs);
+            ret = blk_co_flush(exp->blk);
             if (ret < 0) {
                 LOG("flush failed");
                 reply.error = -ret;
@@ -1216,7 +1371,7 @@ static void nbd_trip(void *opaque)
     case NBD_CMD_FLUSH:
         TRACE("Request type is FLUSH");
 
-        ret = bdrv_co_flush(exp->bs);
+        ret = blk_co_flush(exp->blk);
         if (ret < 0) {
             LOG("flush failed");
             reply.error = -ret;
@@ -1227,8 +1382,9 @@ static void nbd_trip(void *opaque)
         break;
     case NBD_CMD_TRIM:
         TRACE("Request type is TRIM");
-        ret = bdrv_co_discard(exp->bs, (request.from + exp->dev_offset) / 512,
-                              request.len / 512);
+        ret = blk_co_discard(exp->blk, (request.from + exp->dev_offset)
+                                       / BDRV_SECTOR_SIZE,
+                             request.len / BDRV_SECTOR_SIZE);
         if (ret < 0) {
             LOG("discard failed");
             reply.error = -ret;
@@ -1240,7 +1396,7 @@ static void nbd_trip(void *opaque)
     default:
         LOG("invalid request type (%u) received", request.type);
     invalid_request:
-        reply.error = -EINVAL;
+        reply.error = EINVAL;
     error_reply:
         if (nbd_co_send_reply(req, &reply, 0) < 0) {
             goto out;
@@ -1256,14 +1412,7 @@ done:
 
 out:
     nbd_request_put(req);
-    nbd_client_close(client);
-}
-
-static int nbd_can_read(void *opaque)
-{
-    NBDClient *client = opaque;
-
-    return client->recv_coroutine || client->nb_requests < MAX_NBD_REQUESTS;
+    client_close(client);
 }
 
 static void nbd_read(void *opaque)
@@ -1284,6 +1433,37 @@ static void nbd_restart_write(void *opaque)
     qemu_coroutine_enter(client->send_coroutine, NULL);
 }
 
+static void nbd_set_handlers(NBDClient *client)
+{
+    if (client->exp && client->exp->ctx) {
+        aio_set_fd_handler(client->exp->ctx, client->sock,
+                           client->can_read ? nbd_read : NULL,
+                           client->send_coroutine ? nbd_restart_write : NULL,
+                           client);
+    }
+}
+
+static void nbd_unset_handlers(NBDClient *client)
+{
+    if (client->exp && client->exp->ctx) {
+        aio_set_fd_handler(client->exp->ctx, client->sock, NULL, NULL, NULL);
+    }
+}
+
+static void nbd_update_can_read(NBDClient *client)
+{
+    bool can_read = client->recv_coroutine ||
+                    client->nb_requests < MAX_NBD_REQUESTS;
+
+    if (can_read != client->can_read) {
+        client->can_read = can_read;
+        nbd_set_handlers(client);
+
+        /* There is no need to invoke aio_notify(), since aio_set_fd_handler()
+         * in nbd_set_handlers() will have taken care of that */
+    }
+}
+
 NBDClient *nbd_client_new(NBDExport *exp, int csock,
                           void (*close)(NBDClient *))
 {
@@ -1292,13 +1472,14 @@ NBDClient *nbd_client_new(NBDExport *exp, int csock,
     client->refcount = 1;
     client->exp = exp;
     client->sock = csock;
+    client->can_read = true;
     if (nbd_send_negotiate(client)) {
         g_free(client);
         return NULL;
     }
     client->close = close;
     qemu_co_mutex_init(&client->send_lock);
-    qemu_set_fd_handler2(csock, nbd_can_read, nbd_read, NULL, client);
+    nbd_set_handlers(client);
 
     if (exp) {
         QTAILQ_INSERT_TAIL(&exp->clients, client, next);