diff --git a/drivers/block/nbd.c b/drivers/block/nbd.c
index d8a23561b4cb4b720065ccf1bd97714a07395c73..9b482baa869ebd480027f1166b07ae0491d7551f 100644
--- a/drivers/block/nbd.c
+++ b/drivers/block/nbd.c
 #include <asm/types.h>
 
 #include <linux/nbd.h>
+#include <linux/nbd-netlink.h>
+#include <net/genetlink.h>
 
 static DEFINE_IDR(nbd_index_idr);
 static DEFINE_MUTEX(nbd_index_mutex);
+static int nbd_total_devices = 0;
 
 struct nbd_sock {
        struct socket *sock;
        struct mutex tx_lock;
        struct request *pending;
        int sent;
+       bool dead;
+       int fallback_index;
+       int cookie;
+};
+
+struct recv_thread_args {
+       struct work_struct work;
+       struct nbd_device *nbd;
+       int index;
+};
+
+struct link_dead_args {
+       struct work_struct work;
+       int index;
 };
 
 #define NBD_TIMEDOUT                   0
 #define NBD_DISCONNECT_REQUESTED       1
 #define NBD_DISCONNECTED               2
-#define NBD_RUNNING                    3
+#define NBD_HAS_PID_FILE               3
+#define NBD_HAS_CONFIG_REF             4
+#define NBD_BOUND                      5
+#define NBD_DESTROY_ON_DISCONNECT      6
 
-struct nbd_device {
+struct nbd_config {
        u32 flags;
        unsigned long runtime_flags;
-       struct nbd_sock **socks;
-       int magic;
+       u64 dead_conn_timeout;
 
-       struct blk_mq_tag_set tag_set;
-
-       struct mutex config_lock;
-       struct gendisk *disk;
+       struct nbd_sock **socks;
        int num_connections;
+       atomic_t live_connections;
+       wait_queue_head_t conn_wait;
+
        atomic_t recv_threads;
        wait_queue_head_t recv_wq;
        loff_t blksize;
        loff_t bytesize;
-
-       struct task_struct *task_recv;
-       struct task_struct *task_setup;
-
 #if IS_ENABLED(CONFIG_DEBUG_FS)
        struct dentry *dbg_dir;
 #endif
 };
 
+struct nbd_device {
+       struct blk_mq_tag_set tag_set;
+
+       int index;
+       refcount_t config_refs;
+       refcount_t refs;
+       struct nbd_config *config;
+       struct mutex config_lock;
+       struct gendisk *disk;
+
+       struct list_head list;
+       struct task_struct *task_recv;
+       struct task_struct *task_setup;
+};
+
 struct nbd_cmd {
        struct nbd_device *nbd;
+       int index;
+       int cookie;
        struct completion send_complete;
+       int status;
 };
 
 #if IS_ENABLED(CONFIG_DEBUG_FS)
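
The hunk above replaces the old monolithic struct nbd_device with a device/config split pinned by two reference counts: nbd->refs keeps the device object alive (nbd_put() tears down the gendisk at zero), while nbd->config_refs keeps the attached nbd_config alive (nbd_config_put() releases the sockets and state at zero, then drops the reference the config held on the device). A minimal userspace model of this two-level scheme, with hypothetical names and locking omitted:

    #include <stdatomic.h>
    #include <stdlib.h>

    struct config { int dummy; /* sockets, sizes, ... */ };

    struct device {
            atomic_int refs;        /* pins the device object   */
            atomic_int config_refs; /* pins the attached config */
            struct config *config;
    };

    static void device_put(struct device *d)
    {
            if (atomic_fetch_sub(&d->refs, 1) == 1)
                    free(d); /* last reference: destroy the device */
    }

    static void config_put(struct device *d)
    {
            if (atomic_fetch_sub(&d->config_refs, 1) == 1) {
                    free(d->config); /* tear down sockets, state, ... */
                    d->config = NULL;
                    device_put(d);   /* the config held a device ref */
            }
    }
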
@@ -100,18 +133,16 @@ static int part_shift;
 
 static int nbd_dev_dbg_init(struct nbd_device *nbd);
 static void nbd_dev_dbg_close(struct nbd_device *nbd);
-
+static void nbd_config_put(struct nbd_device *nbd);
+static void nbd_connect_reply(struct genl_info *info, int index);
+static int nbd_genl_status(struct sk_buff *skb, struct genl_info *info);
+static void nbd_dead_link_work(struct work_struct *work);
 
 static inline struct device *nbd_to_dev(struct nbd_device *nbd)
 {
        return disk_to_dev(nbd->disk);
 }
 
-static bool nbd_is_connected(struct nbd_device *nbd)
-{
-       return !!nbd->task_recv;
-}
-
 static const char *nbdcmd_to_ascii(int cmd)
 {
        switch (cmd) {
@@ -124,44 +155,104 @@ static const char *nbdcmd_to_ascii(int cmd)
        return "invalid";
 }
 
-static int nbd_size_clear(struct nbd_device *nbd, struct block_device *bdev)
+static ssize_t pid_show(struct device *dev,
+                       struct device_attribute *attr, char *buf)
 {
-       if (bdev->bd_openers <= 1)
-               bd_set_size(bdev, 0);
-       set_capacity(nbd->disk, 0);
-       kobject_uevent(&nbd_to_dev(nbd)->kobj, KOBJ_CHANGE);
+       struct gendisk *disk = dev_to_disk(dev);
+       struct nbd_device *nbd = (struct nbd_device *)disk->private_data;
 
-       return 0;
+       return sprintf(buf, "%d\n", task_pid_nr(nbd->task_recv));
+}
+
+static struct device_attribute pid_attr = {
+       .attr = { .name = "pid", .mode = S_IRUGO},
+       .show = pid_show,
+};
+
+static void nbd_dev_remove(struct nbd_device *nbd)
+{
+       struct gendisk *disk = nbd->disk;
+       if (disk) {
+               del_gendisk(disk);
+               blk_cleanup_queue(disk->queue);
+               blk_mq_free_tag_set(&nbd->tag_set);
+               disk->private_data = NULL;
+               put_disk(disk);
+       }
+       kfree(nbd);
+}
+
+static void nbd_put(struct nbd_device *nbd)
+{
+       if (refcount_dec_and_mutex_lock(&nbd->refs,
+                                       &nbd_index_mutex)) {
+               idr_remove(&nbd_index_idr, nbd->index);
+               mutex_unlock(&nbd_index_mutex);
+               nbd_dev_remove(nbd);
+       }
+}
+
+static int nbd_disconnected(struct nbd_config *config)
+{
+       return test_bit(NBD_DISCONNECTED, &config->runtime_flags) ||
+               test_bit(NBD_DISCONNECT_REQUESTED, &config->runtime_flags);
+}
+
+static void nbd_mark_nsock_dead(struct nbd_device *nbd, struct nbd_sock *nsock,
+                               int notify)
+{
+       if (!nsock->dead && notify && !nbd_disconnected(nbd->config)) {
+               struct link_dead_args *args;
+               args = kmalloc(sizeof(struct link_dead_args), GFP_NOIO);
+               if (args) {
+                       INIT_WORK(&args->work, nbd_dead_link_work);
+                       args->index = nbd->index;
+                       queue_work(system_wq, &args->work);
+               }
+       }
+       if (!nsock->dead) {
+               kernel_sock_shutdown(nsock->sock, SHUT_RDWR);
+               atomic_dec(&nbd->config->live_connections);
+       }
+       nsock->dead = true;
+       nsock->pending = NULL;
+       nsock->sent = 0;
+}
+
+static void nbd_size_clear(struct nbd_device *nbd)
+{
+       if (nbd->config->bytesize) {
+               set_capacity(nbd->disk, 0);
+               kobject_uevent(&nbd_to_dev(nbd)->kobj, KOBJ_CHANGE);
+       }
 }
 
-static void nbd_size_update(struct nbd_device *nbd, struct block_device *bdev)
+static void nbd_size_update(struct nbd_device *nbd)
 {
-       blk_queue_logical_block_size(nbd->disk->queue, nbd->blksize);
-       blk_queue_physical_block_size(nbd->disk->queue, nbd->blksize);
-       bd_set_size(bdev, nbd->bytesize);
-       set_capacity(nbd->disk, nbd->bytesize >> 9);
+       struct nbd_config *config = nbd->config;
+       blk_queue_logical_block_size(nbd->disk->queue, config->blksize);
+       blk_queue_physical_block_size(nbd->disk->queue, config->blksize);
+       set_capacity(nbd->disk, config->bytesize >> 9);
        kobject_uevent(&nbd_to_dev(nbd)->kobj, KOBJ_CHANGE);
 }
 
-static void nbd_size_set(struct nbd_device *nbd, struct block_device *bdev,
-                       loff_t blocksize, loff_t nr_blocks)
+static void nbd_size_set(struct nbd_device *nbd, loff_t blocksize,
+                        loff_t nr_blocks)
 {
-       nbd->blksize = blocksize;
-       nbd->bytesize = blocksize * nr_blocks;
-       if (nbd_is_connected(nbd))
-               nbd_size_update(nbd, bdev);
+       struct nbd_config *config = nbd->config;
+       config->blksize = blocksize;
+       config->bytesize = blocksize * nr_blocks;
+       nbd_size_update(nbd);
 }
 
-static void nbd_end_request(struct nbd_cmd *cmd)
+static void nbd_complete_rq(struct request *req)
 {
-       struct nbd_device *nbd = cmd->nbd;
-       struct request *req = blk_mq_rq_from_pdu(cmd);
-       int error = req->errors ? -EIO : 0;
+       struct nbd_cmd *cmd = blk_mq_rq_to_pdu(req);
 
-       dev_dbg(nbd_to_dev(nbd), "request %p: %s\n", cmd,
-               error ? "failed" : "done");
+       dev_dbg(nbd_to_dev(cmd->nbd), "request %p: %s\n", cmd,
+               cmd->status ? "failed" : "done");
 
-       blk_mq_complete_request(req, error);
+       blk_mq_end_request(req, cmd->status);
 }
 
 /*
@@ -169,17 +260,18 @@ static void nbd_end_request(struct nbd_cmd *cmd)
  */
 static void sock_shutdown(struct nbd_device *nbd)
 {
+       struct nbd_config *config = nbd->config;
        int i;
 
-       if (nbd->num_connections == 0)
+       if (config->num_connections == 0)
                return;
-       if (test_and_set_bit(NBD_DISCONNECTED, &nbd->runtime_flags))
+       if (test_and_set_bit(NBD_DISCONNECTED, &config->runtime_flags))
                return;
 
-       for (i = 0; i < nbd->num_connections; i++) {
-               struct nbd_sock *nsock = nbd->socks[i];
+       for (i = 0; i < config->num_connections; i++) {
+               struct nbd_sock *nsock = config->socks[i];
                mutex_lock(&nsock->tx_lock);
-               kernel_sock_shutdown(nsock->sock, SHUT_RDWR);
+               nbd_mark_nsock_dead(nbd, nsock, 0);
                mutex_unlock(&nsock->tx_lock);
        }
        dev_warn(disk_to_dev(nbd->disk), "shutting down sockets\n");
@@ -190,14 +282,58 @@ static enum blk_eh_timer_return nbd_xmit_timeout(struct request *req,
 {
        struct nbd_cmd *cmd = blk_mq_rq_to_pdu(req);
        struct nbd_device *nbd = cmd->nbd;
+       struct nbd_config *config;
 
-       dev_err(nbd_to_dev(nbd), "Connection timed out, shutting down connection\n");
-       set_bit(NBD_TIMEDOUT, &nbd->runtime_flags);
-       req->errors = -EIO;
+       if (!refcount_inc_not_zero(&nbd->config_refs)) {
+               cmd->status = -EIO;
+               return BLK_EH_HANDLED;
+       }
 
-       mutex_lock(&nbd->config_lock);
+       /* If we are waiting on our dead timer then we could get timeout
+        * callbacks for our request.  For this we just want to reset the timer
+        * and let the queue side take care of everything.
+        */
+       if (!completion_done(&cmd->send_complete)) {
+               nbd_config_put(nbd);
+               return BLK_EH_RESET_TIMER;
+       }
+       config = nbd->config;
+
+       if (config->num_connections > 1) {
+               dev_err_ratelimited(nbd_to_dev(nbd),
+                                   "Connection timed out, retrying\n");
+               /*
+                * Hooray we have more connections, requeue this IO, the submit
+                * path will put it on a real connection.
+                */
+               if (config->socks && config->num_connections > 1) {
+                       if (cmd->index < config->num_connections) {
+                               struct nbd_sock *nsock =
+                                       config->socks[cmd->index];
+                               mutex_lock(&nsock->tx_lock);
+                               /* We can have multiple outstanding requests, so
+                                * we don't want to mark the nsock dead if we've
+                                * already reconnected with a new socket, so
+                                * only mark it dead if it's the same socket we
+                                * were sent out on.
+                                */
+                               if (cmd->cookie == nsock->cookie)
+                                       nbd_mark_nsock_dead(nbd, nsock, 1);
+                               mutex_unlock(&nsock->tx_lock);
+                       }
+                       blk_mq_requeue_request(req, true);
+                       nbd_config_put(nbd);
+                       return BLK_EH_NOT_HANDLED;
+               }
+       } else {
+               dev_err_ratelimited(nbd_to_dev(nbd),
+                                   "Connection timed out\n");
+       }
+       set_bit(NBD_TIMEDOUT, &config->runtime_flags);
+       cmd->status = -EIO;
        sock_shutdown(nbd);
-       mutex_unlock(&nbd->config_lock);
+       nbd_config_put(nbd);
+
        return BLK_EH_HANDLED;
 }
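
Note the generation check above: every nbd_sock carries a cookie that is bumped whenever a replacement socket is swapped in, and every command records the cookie of the socket it was sent on. A timeout that fires for a request sent on an earlier generation therefore cannot mark the new socket dead. The pattern in isolation (an illustrative model, not kernel code):

    #include <stdbool.h>

    struct conn { int cookie; bool dead; }; /* generation counter per socket */
    struct req  { int cookie; };            /* generation the request used   */

    /* Only a timeout on the current generation may kill the connection;
     * a reconnect bumps conn->cookie, turning stale timeouts into no-ops. */
    static bool timeout_should_kill(const struct req *r, const struct conn *c)
    {
            return !c->dead && r->cookie == c->cookie;
    }
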
 
@@ -207,7 +343,8 @@ static enum blk_eh_timer_return nbd_xmit_timeout(struct request *req,
 static int sock_xmit(struct nbd_device *nbd, int index, int send,
                     struct iov_iter *iter, int msg_flags, int *sent)
 {
-       struct socket *sock = nbd->socks[index]->sock;
+       struct nbd_config *config = nbd->config;
+       struct socket *sock = config->socks[index]->sock;
        int result;
        struct msghdr msg;
        unsigned long pflags = current->flags;
@@ -244,7 +381,7 @@ static int sock_xmit(struct nbd_device *nbd, int index, int send,
                        *sent += result;
        } while (msg_data_left(&msg));
 
-       tsk_restore_flags(current, pflags, PF_MEMALLOC);
+       current_restore_flags(pflags, PF_MEMALLOC);
 
        return result;
 }
@@ -253,7 +390,8 @@ static int sock_xmit(struct nbd_device *nbd, int index, int send,
 static int nbd_send_cmd(struct nbd_device *nbd, struct nbd_cmd *cmd, int index)
 {
        struct request *req = blk_mq_rq_from_pdu(cmd);
-       struct nbd_sock *nsock = nbd->socks[index];
+       struct nbd_config *config = nbd->config;
+       struct nbd_sock *nsock = config->socks[index];
        int result;
        struct nbd_request request = {.magic = htonl(NBD_REQUEST_MAGIC)};
        struct kvec iov = {.iov_base = &request, .iov_len = sizeof(request)};
@@ -284,7 +422,7 @@ static int nbd_send_cmd(struct nbd_device *nbd, struct nbd_cmd *cmd, int index)
        }
 
        if (rq_data_dir(req) == WRITE &&
-           (nbd->flags & NBD_FLAG_READ_ONLY)) {
+           (config->flags & NBD_FLAG_READ_ONLY)) {
                dev_err_ratelimited(disk_to_dev(nbd->disk),
                                    "Write on read-only\n");
                return -EIO;
@@ -301,6 +439,8 @@ static int nbd_send_cmd(struct nbd_device *nbd, struct nbd_cmd *cmd, int index)
                }
                iov_iter_advance(&from, sent);
        }
+       cmd->index = index;
+       cmd->cookie = nsock->cookie;
        request.type = htonl(type);
        if (type != NBD_CMD_FLUSH) {
                request.from = cpu_to_be64((u64)blk_rq_pos(req) << 9);
@@ -328,7 +468,7 @@ static int nbd_send_cmd(struct nbd_device *nbd, struct nbd_cmd *cmd, int index)
                }
                dev_err_ratelimited(disk_to_dev(nbd->disk),
                        "Send control failed (result %d)\n", result);
-               return -EIO;
+               return -EAGAIN;
        }
 send_pages:
        if (type != NBD_CMD_WRITE)
@@ -370,7 +510,7 @@ send_pages:
                                dev_err(disk_to_dev(nbd->disk),
                                        "Send data failed (result %d)\n",
                                        result);
-                               return -EIO;
+                               return -EAGAIN;
                        }
                        /*
                         * The completion might already have come in,
@@ -392,6 +532,7 @@ out:
 /* NULL returned = something went wrong, inform userspace */
 static struct nbd_cmd *nbd_read_stat(struct nbd_device *nbd, int index)
 {
+       struct nbd_config *config = nbd->config;
        int result;
        struct nbd_reply reply;
        struct nbd_cmd *cmd;
@@ -405,8 +546,7 @@ static struct nbd_cmd *nbd_read_stat(struct nbd_device *nbd, int index)
        iov_iter_kvec(&to, READ | ITER_KVEC, &iov, 1, sizeof(reply));
        result = sock_xmit(nbd, index, 0, &to, MSG_WAITALL, NULL);
        if (result <= 0) {
-               if (!test_bit(NBD_DISCONNECTED, &nbd->runtime_flags) &&
-                   !test_bit(NBD_DISCONNECT_REQUESTED, &nbd->runtime_flags))
+               if (!nbd_disconnected(config))
                        dev_err(disk_to_dev(nbd->disk),
                                "Receive control failed (result %d)\n", result);
                return ERR_PTR(result);
@@ -433,7 +573,7 @@ static struct nbd_cmd *nbd_read_stat(struct nbd_device *nbd, int index)
        if (ntohl(reply.error)) {
                dev_err(disk_to_dev(nbd->disk), "Other side returned error (%d)\n",
                        ntohl(reply.error));
-               req->errors = -EIO;
+               cmd->status = -EIO;
                return cmd;
        }
 
@@ -449,8 +589,19 @@ static struct nbd_cmd *nbd_read_stat(struct nbd_device *nbd, int index)
                        if (result <= 0) {
                                dev_err(disk_to_dev(nbd->disk), "Receive data failed (result %d)\n",
                                        result);
-                               req->errors = -EIO;
-                               return cmd;
+                               /*
+                                * If we've disconnected or we only have 1
+                                * connection then we need to make sure we
+                                * complete this request, otherwise error out
+                                * and let the timeout stuff handle resubmitting
+                                * this request onto another connection.
+                                */
+                               if (nbd_disconnected(config) ||
+                                   config->num_connections <= 1) {
+                                       cmd->status = -EIO;
+                                       return cmd;
+                               }
+                               return ERR_PTR(-EIO);
                        }
                        dev_dbg(nbd_to_dev(nbd), "request %p: got %d bytes data\n",
                                cmd, bvec.bv_len);
@@ -462,54 +613,34 @@ static struct nbd_cmd *nbd_read_stat(struct nbd_device *nbd, int index)
        return cmd;
 }
 
-static ssize_t pid_show(struct device *dev,
-                       struct device_attribute *attr, char *buf)
-{
-       struct gendisk *disk = dev_to_disk(dev);
-       struct nbd_device *nbd = (struct nbd_device *)disk->private_data;
-
-       return sprintf(buf, "%d\n", task_pid_nr(nbd->task_recv));
-}
-
-static struct device_attribute pid_attr = {
-       .attr = { .name = "pid", .mode = S_IRUGO},
-       .show = pid_show,
-};
-
-struct recv_thread_args {
-       struct work_struct work;
-       struct nbd_device *nbd;
-       int index;
-};
-
 static void recv_work(struct work_struct *work)
 {
        struct recv_thread_args *args = container_of(work,
                                                     struct recv_thread_args,
                                                     work);
        struct nbd_device *nbd = args->nbd;
+       struct nbd_config *config = nbd->config;
        struct nbd_cmd *cmd;
        int ret = 0;
 
-       BUG_ON(nbd->magic != NBD_MAGIC);
        while (1) {
                cmd = nbd_read_stat(nbd, args->index);
                if (IS_ERR(cmd)) {
+                       struct nbd_sock *nsock = config->socks[args->index];
+
+                       mutex_lock(&nsock->tx_lock);
+                       nbd_mark_nsock_dead(nbd, nsock, 1);
+                       mutex_unlock(&nsock->tx_lock);
                        ret = PTR_ERR(cmd);
                        break;
                }
 
-               nbd_end_request(cmd);
+               blk_mq_complete_request(blk_mq_rq_from_pdu(cmd));
        }
-
-       /*
-        * We got an error, shut everybody down if this wasn't the result of a
-        * disconnect request.
-        */
-       if (ret && !test_bit(NBD_DISCONNECT_REQUESTED, &nbd->runtime_flags))
-               sock_shutdown(nbd);
-       atomic_dec(&nbd->recv_threads);
-       wake_up(&nbd->recv_wq);
+       atomic_dec(&config->recv_threads);
+       wake_up(&config->recv_wq);
+       nbd_config_put(nbd);
+       kfree(args);
 }
 
 static void nbd_clear_req(struct request *req, void *data, bool reserved)
@@ -519,47 +650,119 @@ static void nbd_clear_req(struct request *req, void *data, bool reserved)
        if (!blk_mq_request_started(req))
                return;
        cmd = blk_mq_rq_to_pdu(req);
-       req->errors = -EIO;
-       nbd_end_request(cmd);
+       cmd->status = -EIO;
+       blk_mq_complete_request(req);
 }
 
 static void nbd_clear_que(struct nbd_device *nbd)
 {
-       BUG_ON(nbd->magic != NBD_MAGIC);
-
+       blk_mq_stop_hw_queues(nbd->disk->queue);
        blk_mq_tagset_busy_iter(&nbd->tag_set, nbd_clear_req, NULL);
+       blk_mq_start_hw_queues(nbd->disk->queue);
        dev_dbg(disk_to_dev(nbd->disk), "queue cleared\n");
 }
 
+static int find_fallback(struct nbd_device *nbd, int index)
+{
+       struct nbd_config *config = nbd->config;
+       int new_index = -1;
+       struct nbd_sock *nsock = config->socks[index];
+       int fallback = nsock->fallback_index;
+
+       if (test_bit(NBD_DISCONNECTED, &config->runtime_flags))
+               return new_index;
+
+       if (config->num_connections <= 1) {
+               dev_err_ratelimited(disk_to_dev(nbd->disk),
+                                   "Attempted send on invalid socket\n");
+               return new_index;
+       }
+
+       if (fallback >= 0 && fallback < config->num_connections &&
+           !config->socks[fallback]->dead)
+               return fallback;
+
+       if (nsock->fallback_index < 0 ||
+           nsock->fallback_index >= config->num_connections ||
+           config->socks[nsock->fallback_index]->dead) {
+               int i;
+               for (i = 0; i < config->num_connections; i++) {
+                       if (i == index)
+                               continue;
+                       if (!config->socks[i]->dead) {
+                               new_index = i;
+                               break;
+                       }
+               }
+               nsock->fallback_index = new_index;
+               if (new_index < 0) {
+                       dev_err_ratelimited(disk_to_dev(nbd->disk),
+                                           "Dead connection, failed to find a fallback\n");
+                       return new_index;
+               }
+       }
+       new_index = nsock->fallback_index;
+       return new_index;
+}
+
+static int wait_for_reconnect(struct nbd_device *nbd)
+{
+       struct nbd_config *config = nbd->config;
+       if (!config->dead_conn_timeout)
+               return 0;
+       if (test_bit(NBD_DISCONNECTED, &config->runtime_flags))
+               return 0;
+       wait_event_interruptible_timeout(config->conn_wait,
+                                        atomic_read(&config->live_connections),
+                                        config->dead_conn_timeout);
+       return atomic_read(&config->live_connections);
+}
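
wait_for_reconnect() above blocks the submit path for at most dead_conn_timeout jiffies; whether it is woken by a reconnect, a timeout, or a signal, only the final re-read of live_connections decides between retrying the request and failing it with -EIO. A userspace model of that shape (hypothetical names, pthreads standing in for the kernel wait queue):

    #include <pthread.h>
    #include <stdbool.h>
    #include <time.h>

    struct pool {
            pthread_mutex_t lock;
            pthread_cond_t conn_wait; /* signalled when a socket comes back */
            int live_connections;
    };

    /* Wait until the deadline for at least one live connection; whatever
     * ends the wait, the result depends only on the final count. */
    static bool wait_for_reconnect(struct pool *p,
                                   const struct timespec *deadline)
    {
            bool ok;

            pthread_mutex_lock(&p->lock);
            while (p->live_connections == 0) {
                    if (pthread_cond_timedwait(&p->conn_wait, &p->lock,
                                               deadline))
                            break; /* timed out: fall through and re-check */
            }
            ok = p->live_connections > 0;
            pthread_mutex_unlock(&p->lock);
            return ok;
    }
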
 
 static int nbd_handle_cmd(struct nbd_cmd *cmd, int index)
 {
        struct request *req = blk_mq_rq_from_pdu(cmd);
        struct nbd_device *nbd = cmd->nbd;
+       struct nbd_config *config;
        struct nbd_sock *nsock;
        int ret;
 
-       if (index >= nbd->num_connections) {
+       if (!refcount_inc_not_zero(&nbd->config_refs)) {
                dev_err_ratelimited(disk_to_dev(nbd->disk),
-                                   "Attempted send on invalid socket\n");
+                                   "Socks array is empty\n");
                return -EINVAL;
        }
+       config = nbd->config;
 
-       if (test_bit(NBD_DISCONNECTED, &nbd->runtime_flags)) {
+       if (index >= config->num_connections) {
                dev_err_ratelimited(disk_to_dev(nbd->disk),
-                                   "Attempted send on closed socket\n");
+                                   "Attempted send on invalid socket\n");
+               nbd_config_put(nbd);
                return -EINVAL;
        }
-
-       req->errors = 0;
-
-       nsock = nbd->socks[index];
+       cmd->status = 0;
+again:
+       nsock = config->socks[index];
        mutex_lock(&nsock->tx_lock);
-       if (unlikely(!nsock->sock)) {
+       if (nsock->dead) {
+               int old_index = index;
+               index = find_fallback(nbd, index);
                mutex_unlock(&nsock->tx_lock);
-               dev_err_ratelimited(disk_to_dev(nbd->disk),
-                                   "Attempted send on closed socket\n");
-               return -EINVAL;
+               if (index < 0) {
+                       if (wait_for_reconnect(nbd)) {
+                               index = old_index;
+                               goto again;
+                       }
+                       /* All the sockets should already be down at this point,
+                        * we just want to make sure that DISCONNECTED is set so
+                        * any requests that come in that were queued waiting
+                        * for the reconnect timer don't trigger the timer again
+                        * and instead just error out.
+                        */
+                       sock_shutdown(nbd);
+                       nbd_config_put(nbd);
+                       return -EIO;
+               }
+               goto again;
        }
 
        /* Handle the case that we have a pending request that was partially
@@ -572,9 +775,21 @@ static int nbd_handle_cmd(struct nbd_cmd *cmd, int index)
                ret = 0;
                goto out;
        }
+       /*
+        * Some failures are related to the link going down, so anything that
+        * returns EAGAIN can be retried on a different socket.
+        */
        ret = nbd_send_cmd(nbd, cmd, index);
+       if (ret == -EAGAIN) {
+               dev_err_ratelimited(disk_to_dev(nbd->disk),
+                                   "Request send failed trying another connection\n");
+               nbd_mark_nsock_dead(nbd, nsock, 1);
+               mutex_unlock(&nsock->tx_lock);
+               goto again;
+       }
 out:
        mutex_unlock(&nsock->tx_lock);
+       nbd_config_put(nbd);
        return ret;
 }
 
@@ -611,9 +826,10 @@ static int nbd_queue_rq(struct blk_mq_hw_ctx *hctx,
        return ret;
 }
 
-static int nbd_add_socket(struct nbd_device *nbd, struct block_device *bdev,
-                         unsigned long arg)
+static int nbd_add_socket(struct nbd_device *nbd, unsigned long arg,
+                         bool netlink)
 {
+       struct nbd_config *config = nbd->config;
        struct socket *sock;
        struct nbd_sock **socks;
        struct nbd_sock *nsock;
@@ -623,43 +839,107 @@ static int nbd_add_socket(struct nbd_device *nbd, struct block_device *bdev,
        if (!sock)
                return err;
 
-       if (!nbd->task_setup)
+       if (!netlink && !nbd->task_setup &&
+           !test_bit(NBD_BOUND, &config->runtime_flags))
                nbd->task_setup = current;
-       if (nbd->task_setup != current) {
+
+       if (!netlink &&
+           (nbd->task_setup != current ||
+            test_bit(NBD_BOUND, &config->runtime_flags))) {
                dev_err(disk_to_dev(nbd->disk),
                        "Device being setup by another task");
-               return -EINVAL;
+               sockfd_put(sock);
+               return -EBUSY;
        }
 
-       socks = krealloc(nbd->socks, (nbd->num_connections + 1) *
+       socks = krealloc(config->socks, (config->num_connections + 1) *
                         sizeof(struct nbd_sock *), GFP_KERNEL);
-       if (!socks)
+       if (!socks) {
+               sockfd_put(sock);
                return -ENOMEM;
+       }
        nsock = kzalloc(sizeof(struct nbd_sock), GFP_KERNEL);
-       if (!nsock)
+       if (!nsock) {
+               sockfd_put(sock);
                return -ENOMEM;
+       }
 
-       nbd->socks = socks;
+       config->socks = socks;
 
+       nsock->fallback_index = -1;
+       nsock->dead = false;
        mutex_init(&nsock->tx_lock);
        nsock->sock = sock;
        nsock->pending = NULL;
        nsock->sent = 0;
-       socks[nbd->num_connections++] = nsock;
+       nsock->cookie = 0;
+       socks[config->num_connections++] = nsock;
+       atomic_inc(&config->live_connections);
 
-       if (max_part)
-               bdev->bd_invalidated = 1;
        return 0;
 }
 
+static int nbd_reconnect_socket(struct nbd_device *nbd, unsigned long arg)
+{
+       struct nbd_config *config = nbd->config;
+       struct socket *sock, *old;
+       struct recv_thread_args *args;
+       int i;
+       int err;
+
+       sock = sockfd_lookup(arg, &err);
+       if (!sock)
+               return err;
+
+       args = kzalloc(sizeof(*args), GFP_KERNEL);
+       if (!args) {
+               sockfd_put(sock);
+               return -ENOMEM;
+       }
+
+       for (i = 0; i < config->num_connections; i++) {
+               struct nbd_sock *nsock = config->socks[i];
+
+               if (!nsock->dead)
+                       continue;
+
+               mutex_lock(&nsock->tx_lock);
+               if (!nsock->dead) {
+                       mutex_unlock(&nsock->tx_lock);
+                       continue;
+               }
+               sk_set_memalloc(sock->sk);
+               atomic_inc(&config->recv_threads);
+               refcount_inc(&nbd->config_refs);
+               old = nsock->sock;
+               nsock->fallback_index = -1;
+               nsock->sock = sock;
+               nsock->dead = false;
+               INIT_WORK(&args->work, recv_work);
+               args->index = i;
+               args->nbd = nbd;
+               nsock->cookie++;
+               mutex_unlock(&nsock->tx_lock);
+               sockfd_put(old);
+
+               /* We take the tx_mutex in an error path in the recv_work, so we
+                * need to queue_work outside of the tx_mutex.
+                */
+               queue_work(recv_workqueue, &args->work);
+
+               atomic_inc(&config->live_connections);
+               wake_up(&config->conn_wait);
+               return 0;
+       }
+       sockfd_put(sock);
+       kfree(args);
+       return -ENOSPC;
+}
+
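
nbd_reconnect_socket() swaps the socket and bumps the generation cookie under tx_lock, but queues the receive worker only after dropping the lock, because recv_work() itself takes tx_lock on its error path (via nbd_mark_nsock_dead()). The shape of that pattern, modelled in userspace with hypothetical names:

    #include <pthread.h>
    #include <unistd.h>

    struct slot {
            pthread_mutex_t lock;
            int fd;
            int cookie;
    };

    static void reconnect(struct slot *s, int new_fd,
                          void (*start_worker)(struct slot *))
    {
            int old;

            pthread_mutex_lock(&s->lock);
            old = s->fd;
            s->fd = new_fd;
            s->cookie++;      /* stale timeouts now compare unequal */
            pthread_mutex_unlock(&s->lock);

            close(old);
            start_worker(s);  /* outside the lock, as in the patch,
                               * since the worker may take it too */
    }
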
 /* Reset all properties of an NBD device */
 static void nbd_reset(struct nbd_device *nbd)
 {
-       nbd->runtime_flags = 0;
-       nbd->blksize = 1024;
-       nbd->bytesize = 0;
-       set_capacity(nbd->disk, 0);
-       nbd->flags = 0;
+       nbd->config = NULL;
        nbd->tag_set.timeout = 0;
        queue_flag_clear_unlocked(QUEUE_FLAG_DISCARD, nbd->disk->queue);
 }
@@ -668,21 +948,23 @@ static void nbd_bdev_reset(struct block_device *bdev)
 {
        if (bdev->bd_openers > 1)
                return;
-       set_device_ro(bdev, false);
-       bdev->bd_inode->i_size = 0;
+       bd_set_size(bdev, 0);
        if (max_part > 0) {
                blkdev_reread_part(bdev);
                bdev->bd_invalidated = 1;
        }
 }
 
-static void nbd_parse_flags(struct nbd_device *nbd, struct block_device *bdev)
+static void nbd_parse_flags(struct nbd_device *nbd)
 {
-       if (nbd->flags & NBD_FLAG_READ_ONLY)
-               set_device_ro(bdev, true);
-       if (nbd->flags & NBD_FLAG_SEND_TRIM)
+       struct nbd_config *config = nbd->config;
+       if (config->flags & NBD_FLAG_READ_ONLY)
+               set_disk_ro(nbd->disk, true);
+       else
+               set_disk_ro(nbd->disk, false);
+       if (config->flags & NBD_FLAG_SEND_TRIM)
                queue_flag_set_unlocked(QUEUE_FLAG_DISCARD, nbd->disk->queue);
-       if (nbd->flags & NBD_FLAG_SEND_FLUSH)
+       if (config->flags & NBD_FLAG_SEND_FLUSH)
                blk_queue_write_cache(nbd->disk->queue, true, false);
        else
                blk_queue_write_cache(nbd->disk->queue, false, false);
@@ -690,6 +972,7 @@ static void nbd_parse_flags(struct nbd_device *nbd, struct block_device *bdev)
 
 static void send_disconnects(struct nbd_device *nbd)
 {
+       struct nbd_config *config = nbd->config;
        struct nbd_request request = {
                .magic = htonl(NBD_REQUEST_MAGIC),
                .type = htonl(NBD_CMD_DISC),
@@ -698,7 +981,7 @@ static void send_disconnects(struct nbd_device *nbd)
        struct iov_iter from;
        int i, ret;
 
-       for (i = 0; i < nbd->num_connections; i++) {
+       for (i = 0; i < config->num_connections; i++) {
                iov_iter_kvec(&from, WRITE | ITER_KVEC, &iov, 1, sizeof(request));
                ret = sock_xmit(nbd, i, 1, &from, 0, NULL);
                if (ret <= 0)
@@ -707,145 +990,162 @@ static void send_disconnects(struct nbd_device *nbd)
        }
 }
 
-static int nbd_disconnect(struct nbd_device *nbd, struct block_device *bdev)
+static int nbd_disconnect(struct nbd_device *nbd)
 {
-       dev_info(disk_to_dev(nbd->disk), "NBD_DISCONNECT\n");
-       if (!nbd->socks)
-               return -EINVAL;
-
-       mutex_unlock(&nbd->config_lock);
-       fsync_bdev(bdev);
-       mutex_lock(&nbd->config_lock);
-
-       /* Check again after getting mutex back.  */
-       if (!nbd->socks)
-               return -EINVAL;
+       struct nbd_config *config = nbd->config;
 
+       dev_info(disk_to_dev(nbd->disk), "NBD_DISCONNECT\n");
        if (!test_and_set_bit(NBD_DISCONNECT_REQUESTED,
-                             &nbd->runtime_flags))
+                             &config->runtime_flags))
                send_disconnects(nbd);
        return 0;
 }
 
-static int nbd_clear_sock(struct nbd_device *nbd, struct block_device *bdev)
+static void nbd_clear_sock(struct nbd_device *nbd)
 {
        sock_shutdown(nbd);
        nbd_clear_que(nbd);
+       nbd->task_setup = NULL;
+}
 
-       __invalidate_device(bdev, true);
-       nbd_bdev_reset(bdev);
-       /*
-        * We want to give the run thread a chance to wait for everybody
-        * to clean up and then do it's own cleanup.
-        */
-       if (!test_bit(NBD_RUNNING, &nbd->runtime_flags) &&
-           nbd->num_connections) {
-               int i;
-
-               for (i = 0; i < nbd->num_connections; i++) {
-                       sockfd_put(nbd->socks[i]->sock);
-                       kfree(nbd->socks[i]);
+static void nbd_config_put(struct nbd_device *nbd)
+{
+       if (refcount_dec_and_mutex_lock(&nbd->config_refs,
+                                       &nbd->config_lock)) {
+               struct nbd_config *config = nbd->config;
+               nbd_dev_dbg_close(nbd);
+               nbd_size_clear(nbd);
+               if (test_and_clear_bit(NBD_HAS_PID_FILE,
+                                      &config->runtime_flags))
+                       device_remove_file(disk_to_dev(nbd->disk), &pid_attr);
+               nbd->task_recv = NULL;
+               nbd_clear_sock(nbd);
+               if (config->num_connections) {
+                       int i;
+                       for (i = 0; i < config->num_connections; i++) {
+                               sockfd_put(config->socks[i]->sock);
+                               kfree(config->socks[i]);
+                       }
+                       kfree(config->socks);
                }
-               kfree(nbd->socks);
-               nbd->socks = NULL;
-               nbd->num_connections = 0;
-       }
-       nbd->task_setup = NULL;
+               nbd_reset(nbd);
 
-       return 0;
+               mutex_unlock(&nbd->config_lock);
+               nbd_put(nbd);
+               module_put(THIS_MODULE);
+       }
 }
 
-static int nbd_start_device(struct nbd_device *nbd, struct block_device *bdev)
+static int nbd_start_device(struct nbd_device *nbd)
 {
-       struct recv_thread_args *args;
-       int num_connections = nbd->num_connections;
+       struct nbd_config *config = nbd->config;
+       int num_connections = config->num_connections;
        int error = 0, i;
 
        if (nbd->task_recv)
                return -EBUSY;
-       if (!nbd->socks)
+       if (!config->socks)
                return -EINVAL;
        if (num_connections > 1 &&
-           !(nbd->flags & NBD_FLAG_CAN_MULTI_CONN)) {
+           !(config->flags & NBD_FLAG_CAN_MULTI_CONN)) {
                dev_err(disk_to_dev(nbd->disk), "server does not support multiple connections per device.\n");
-               error = -EINVAL;
-               goto out_err;
+               return -EINVAL;
        }
 
-       set_bit(NBD_RUNNING, &nbd->runtime_flags);
-       blk_mq_update_nr_hw_queues(&nbd->tag_set, nbd->num_connections);
-       args = kcalloc(num_connections, sizeof(*args), GFP_KERNEL);
-       if (!args) {
-               error = -ENOMEM;
-               goto out_err;
-       }
+       blk_mq_update_nr_hw_queues(&nbd->tag_set, config->num_connections);
        nbd->task_recv = current;
-       mutex_unlock(&nbd->config_lock);
 
-       nbd_parse_flags(nbd, bdev);
+       nbd_parse_flags(nbd);
 
        error = device_create_file(disk_to_dev(nbd->disk), &pid_attr);
        if (error) {
                dev_err(disk_to_dev(nbd->disk), "device_create_file failed!\n");
-               goto out_recv;
+               return error;
        }
-
-       nbd_size_update(nbd, bdev);
+       set_bit(NBD_HAS_PID_FILE, &config->runtime_flags);
 
        nbd_dev_dbg_init(nbd);
        for (i = 0; i < num_connections; i++) {
-               sk_set_memalloc(nbd->socks[i]->sock->sk);
-               atomic_inc(&nbd->recv_threads);
-               INIT_WORK(&args[i].work, recv_work);
-               args[i].nbd = nbd;
-               args[i].index = i;
-               queue_work(recv_workqueue, &args[i].work);
-       }
-       wait_event_interruptible(nbd->recv_wq,
-                                atomic_read(&nbd->recv_threads) == 0);
-       for (i = 0; i < num_connections; i++)
-               flush_work(&args[i].work);
-       nbd_dev_dbg_close(nbd);
-       nbd_size_clear(nbd, bdev);
-       device_remove_file(disk_to_dev(nbd->disk), &pid_attr);
-out_recv:
-       mutex_lock(&nbd->config_lock);
-       nbd->task_recv = NULL;
-out_err:
-       clear_bit(NBD_RUNNING, &nbd->runtime_flags);
-       nbd_clear_sock(nbd, bdev);
+               struct recv_thread_args *args;
 
+               args = kzalloc(sizeof(*args), GFP_KERNEL);
+               if (!args) {
+                       sock_shutdown(nbd);
+                       return -ENOMEM;
+               }
+               sk_set_memalloc(config->socks[i]->sock->sk);
+               atomic_inc(&config->recv_threads);
+               refcount_inc(&nbd->config_refs);
+               INIT_WORK(&args->work, recv_work);
+               args->nbd = nbd;
+               args->index = i;
+               queue_work(recv_workqueue, &args->work);
+       }
+       return error;
+}
+
+static int nbd_start_device_ioctl(struct nbd_device *nbd, struct block_device *bdev)
+{
+       struct nbd_config *config = nbd->config;
+       int ret;
+
+       ret = nbd_start_device(nbd);
+       if (ret)
+               return ret;
+
+       bd_set_size(bdev, config->bytesize);
+       if (max_part)
+               bdev->bd_invalidated = 1;
+       mutex_unlock(&nbd->config_lock);
+       ret = wait_event_interruptible(config->recv_wq,
+                                        atomic_read(&config->recv_threads) == 0);
+       if (ret)
+               sock_shutdown(nbd);
+       mutex_lock(&nbd->config_lock);
+       bd_set_size(bdev, 0);
        /* user requested, ignore socket errors */
-       if (test_bit(NBD_DISCONNECT_REQUESTED, &nbd->runtime_flags))
-               error = 0;
-       if (test_bit(NBD_TIMEDOUT, &nbd->runtime_flags))
-               error = -ETIMEDOUT;
+       if (test_bit(NBD_DISCONNECT_REQUESTED, &config->runtime_flags))
+               ret = 0;
+       if (test_bit(NBD_TIMEDOUT, &config->runtime_flags))
+               ret = -ETIMEDOUT;
+       return ret;
+}
 
-       nbd_reset(nbd);
-       return error;
+static void nbd_clear_sock_ioctl(struct nbd_device *nbd,
+                                struct block_device *bdev)
+{
+       sock_shutdown(nbd);
+       kill_bdev(bdev);
+       nbd_bdev_reset(bdev);
+       if (test_and_clear_bit(NBD_HAS_CONFIG_REF,
+                              &nbd->config->runtime_flags))
+               nbd_config_put(nbd);
 }
 
 /* Must be called with config_lock held */
 static int __nbd_ioctl(struct block_device *bdev, struct nbd_device *nbd,
                       unsigned int cmd, unsigned long arg)
 {
+       struct nbd_config *config = nbd->config;
+
        switch (cmd) {
        case NBD_DISCONNECT:
-               return nbd_disconnect(nbd, bdev);
+               return nbd_disconnect(nbd);
        case NBD_CLEAR_SOCK:
-               return nbd_clear_sock(nbd, bdev);
+               nbd_clear_sock_ioctl(nbd, bdev);
+               return 0;
        case NBD_SET_SOCK:
-               return nbd_add_socket(nbd, bdev, arg);
+               return nbd_add_socket(nbd, arg, false);
        case NBD_SET_BLKSIZE:
-               nbd_size_set(nbd, bdev, arg,
-                            div_s64(nbd->bytesize, arg));
+               nbd_size_set(nbd, arg,
+                            div_s64(config->bytesize, arg));
                return 0;
        case NBD_SET_SIZE:
-               nbd_size_set(nbd, bdev, nbd->blksize,
-                            div_s64(arg, nbd->blksize));
+               nbd_size_set(nbd, config->blksize,
+                            div_s64(arg, config->blksize));
                return 0;
        case NBD_SET_SIZE_BLOCKS:
-               nbd_size_set(nbd, bdev, nbd->blksize, arg);
+               nbd_size_set(nbd, config->blksize, arg);
                return 0;
        case NBD_SET_TIMEOUT:
                if (arg) {
@@ -855,10 +1155,10 @@ static int __nbd_ioctl(struct block_device *bdev, struct nbd_device *nbd,
                return 0;
 
        case NBD_SET_FLAGS:
-               nbd->flags = arg;
+               config->flags = arg;
                return 0;
        case NBD_DO_IT:
-               return nbd_start_device(nbd, bdev);
+               return nbd_start_device_ioctl(nbd, bdev);
        case NBD_CLEAR_QUE:
                /*
                 * This is for compatibility only.  The queue is always cleared
@@ -879,23 +1179,92 @@ static int nbd_ioctl(struct block_device *bdev, fmode_t mode,
                     unsigned int cmd, unsigned long arg)
 {
        struct nbd_device *nbd = bdev->bd_disk->private_data;
-       int error;
+       struct nbd_config *config = nbd->config;
+       int error = -EINVAL;
 
        if (!capable(CAP_SYS_ADMIN))
                return -EPERM;
 
-       BUG_ON(nbd->magic != NBD_MAGIC);
-
        mutex_lock(&nbd->config_lock);
-       error = __nbd_ioctl(bdev, nbd, cmd, arg);
-       mutex_unlock(&nbd->config_lock);
 
+       /* Don't allow ioctl operations on a nbd device that was created with
+        * netlink, unless it's DISCONNECT or CLEAR_SOCK, which are fine.
+        */
+       if (!test_bit(NBD_BOUND, &config->runtime_flags) ||
+           (cmd == NBD_DISCONNECT || cmd == NBD_CLEAR_SOCK))
+               error = __nbd_ioctl(bdev, nbd, cmd, arg);
+       else
+               dev_err(nbd_to_dev(nbd), "Cannot use ioctl interface on a netlink controlled device.\n");
+       mutex_unlock(&nbd->config_lock);
        return error;
 }
 
+static struct nbd_config *nbd_alloc_config(void)
+{
+       struct nbd_config *config;
+
+       config = kzalloc(sizeof(struct nbd_config), GFP_NOFS);
+       if (!config)
+               return NULL;
+       atomic_set(&config->recv_threads, 0);
+       init_waitqueue_head(&config->recv_wq);
+       init_waitqueue_head(&config->conn_wait);
+       config->blksize = 1024;
+       atomic_set(&config->live_connections, 0);
+       try_module_get(THIS_MODULE);
+       return config;
+}
+
+static int nbd_open(struct block_device *bdev, fmode_t mode)
+{
+       struct nbd_device *nbd;
+       int ret = 0;
+
+       mutex_lock(&nbd_index_mutex);
+       nbd = bdev->bd_disk->private_data;
+       if (!nbd) {
+               ret = -ENXIO;
+               goto out;
+       }
+       if (!refcount_inc_not_zero(&nbd->refs)) {
+               ret = -ENXIO;
+               goto out;
+       }
+       if (!refcount_inc_not_zero(&nbd->config_refs)) {
+               struct nbd_config *config;
+
+               mutex_lock(&nbd->config_lock);
+               if (refcount_inc_not_zero(&nbd->config_refs)) {
+                       mutex_unlock(&nbd->config_lock);
+                       goto out;
+               }
+               config = nbd->config = nbd_alloc_config();
+               if (!config) {
+                       ret = -ENOMEM;
+                       mutex_unlock(&nbd->config_lock);
+                       goto out;
+               }
+               refcount_set(&nbd->config_refs, 1);
+               refcount_inc(&nbd->refs);
+               mutex_unlock(&nbd->config_lock);
+       }
+out:
+       mutex_unlock(&nbd_index_mutex);
+       return ret;
+}
+
+static void nbd_release(struct gendisk *disk, fmode_t mode)
+{
+       struct nbd_device *nbd = disk->private_data;
+       nbd_config_put(nbd);
+       nbd_put(nbd);
+}
+
 static const struct block_device_operations nbd_fops =
 {
        .owner =        THIS_MODULE,
+       .open =         nbd_open,
+       .release =      nbd_release,
        .ioctl =        nbd_ioctl,
        .compat_ioctl = nbd_ioctl,
 };
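
For context, the legacy flow served by __nbd_ioctl() is: open the device node, hand the kernel a connected socket, set the sizes, then issue NBD_DO_IT, which now blocks in nbd_start_device_ioctl() until the receive workers exit. A minimal sketch of that sequence (error handling elided; sock_fd is assumed to be a socket that has already completed the NBD handshake with a server):

    #include <fcntl.h>
    #include <linux/nbd.h>
    #include <sys/ioctl.h>
    #include <unistd.h>

    static int nbd_ioctl_setup(const char *dev, int sock_fd,
                               unsigned long bytes)
    {
            int nbd = open(dev, O_RDWR);

            if (nbd < 0)
                    return -1;
            ioctl(nbd, NBD_SET_BLKSIZE, 4096UL);
            ioctl(nbd, NBD_SET_SIZE, bytes);
            ioctl(nbd, NBD_SET_SOCK, (unsigned long)sock_fd);
            /* Blocks until disconnect or error; the receive threads
             * now run in a kernel workqueue rather than this task. */
            return ioctl(nbd, NBD_DO_IT);
    }
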
@@ -927,7 +1296,7 @@ static const struct file_operations nbd_dbg_tasks_ops = {
 static int nbd_dbg_flags_show(struct seq_file *s, void *unused)
 {
        struct nbd_device *nbd = s->private;
-       u32 flags = nbd->flags;
+       u32 flags = nbd->config->flags;
 
        seq_printf(s, "Hex: 0x%08x\n\n", flags);
 
@@ -960,6 +1329,7 @@ static const struct file_operations nbd_dbg_flags_ops = {
 static int nbd_dev_dbg_init(struct nbd_device *nbd)
 {
        struct dentry *dir;
+       struct nbd_config *config = nbd->config;
 
        if (!nbd_dbg_dir)
                return -EIO;
@@ -970,12 +1340,12 @@ static int nbd_dev_dbg_init(struct nbd_device *nbd)
                        nbd_name(nbd));
                return -EIO;
        }
-       nbd->dbg_dir = dir;
+       config->dbg_dir = dir;
 
        debugfs_create_file("tasks", 0444, dir, nbd, &nbd_dbg_tasks_ops);
-       debugfs_create_u64("size_bytes", 0444, dir, &nbd->bytesize);
+       debugfs_create_u64("size_bytes", 0444, dir, &config->bytesize);
        debugfs_create_u32("timeout", 0444, dir, &nbd->tag_set.timeout);
-       debugfs_create_u64("blocksize", 0444, dir, &nbd->blksize);
+       debugfs_create_u64("blocksize", 0444, dir, &config->blksize);
        debugfs_create_file("flags", 0444, dir, nbd, &nbd_dbg_flags_ops);
 
        return 0;
@@ -983,7 +1353,7 @@ static int nbd_dev_dbg_init(struct nbd_device *nbd)
 
 static void nbd_dev_dbg_close(struct nbd_device *nbd)
 {
-       debugfs_remove_recursive(nbd->dbg_dir);
+       debugfs_remove_recursive(nbd->config->dbg_dir);
 }
 
 static int nbd_dbg_init(void)
@@ -1035,25 +1405,13 @@ static int nbd_init_request(void *data, struct request *rq,
        return 0;
 }
 
-static struct blk_mq_ops nbd_mq_ops = {
+static const struct blk_mq_ops nbd_mq_ops = {
        .queue_rq       = nbd_queue_rq,
+       .complete       = nbd_complete_rq,
        .init_request   = nbd_init_request,
        .timeout        = nbd_xmit_timeout,
 };
 
-static void nbd_dev_remove(struct nbd_device *nbd)
-{
-       struct gendisk *disk = nbd->disk;
-       nbd->magic = 0;
-       if (disk) {
-               del_gendisk(disk);
-               blk_cleanup_queue(disk->queue);
-               blk_mq_free_tag_set(&nbd->tag_set);
-               put_disk(disk);
-       }
-       kfree(nbd);
-}
-
 static int nbd_dev_add(int index)
 {
        struct nbd_device *nbd;
@@ -1082,6 +1440,7 @@ static int nbd_dev_add(int index)
        if (err < 0)
                goto out_free_disk;
 
+       nbd->index = index;
        nbd->disk = disk;
        nbd->tag_set.ops = &nbd_mq_ops;
        nbd->tag_set.nr_hw_queues = 1;
@@ -1110,20 +1469,23 @@ static int nbd_dev_add(int index)
        queue_flag_clear_unlocked(QUEUE_FLAG_ADD_RANDOM, disk->queue);
        disk->queue->limits.discard_granularity = 512;
        blk_queue_max_discard_sectors(disk->queue, UINT_MAX);
-       disk->queue->limits.discard_zeroes_data = 0;
+       blk_queue_max_segment_size(disk->queue, UINT_MAX);
+       blk_queue_max_segments(disk->queue, USHRT_MAX);
        blk_queue_max_hw_sectors(disk->queue, 65536);
        disk->queue->limits.max_sectors = 256;
 
-       nbd->magic = NBD_MAGIC;
        mutex_init(&nbd->config_lock);
+       refcount_set(&nbd->config_refs, 0);
+       refcount_set(&nbd->refs, 1);
+       INIT_LIST_HEAD(&nbd->list);
        disk->major = NBD_MAJOR;
        disk->first_minor = index << part_shift;
        disk->fops = &nbd_fops;
        disk->private_data = nbd;
        sprintf(disk->disk_name, "nbd%d", index);
-       init_waitqueue_head(&nbd->recv_wq);
        nbd_reset(nbd);
        add_disk(disk);
+       nbd_total_devices++;
        return index;
 
 out_free_tags:
@@ -1138,10 +1500,535 @@ out:
        return err;
 }
 
-/*
- * And here should be modules and kernel interface 
- *  (Just smiley confuses emacs :-)
+static int find_free_cb(int id, void *ptr, void *data)
+{
+       struct nbd_device *nbd = ptr;
+       struct nbd_device **found = data;
+
+       if (!refcount_read(&nbd->config_refs)) {
+               *found = nbd;
+               return 1;
+       }
+       return 0;
+}
+
+/* Netlink interface. */
+static struct nla_policy nbd_attr_policy[NBD_ATTR_MAX + 1] = {
+       [NBD_ATTR_INDEX]                =       { .type = NLA_U32 },
+       [NBD_ATTR_SIZE_BYTES]           =       { .type = NLA_U64 },
+       [NBD_ATTR_BLOCK_SIZE_BYTES]     =       { .type = NLA_U64 },
+       [NBD_ATTR_TIMEOUT]              =       { .type = NLA_U64 },
+       [NBD_ATTR_SERVER_FLAGS]         =       { .type = NLA_U64 },
+       [NBD_ATTR_CLIENT_FLAGS]         =       { .type = NLA_U64 },
+       [NBD_ATTR_SOCKETS]              =       { .type = NLA_NESTED},
+       [NBD_ATTR_DEAD_CONN_TIMEOUT]    =       { .type = NLA_U64 },
+       [NBD_ATTR_DEVICE_LIST]          =       { .type = NLA_NESTED},
+};
+
+static struct nla_policy nbd_sock_policy[NBD_SOCK_MAX + 1] = {
+       [NBD_SOCK_FD]                   =       { .type = NLA_U32 },
+};
+
+/* We don't use this right now since we don't parse the incoming list, but we
+ * still want it here so userspace knows what to expect.
  */
+static struct nla_policy __attribute__((unused))
+nbd_device_policy[NBD_DEVICE_ATTR_MAX + 1] = {
+       [NBD_DEVICE_INDEX]              =       { .type = NLA_U32 },
+       [NBD_DEVICE_CONNECTED]          =       { .type = NLA_U8 },
+};
+
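
The policies above define the wire format of the new interface. A hedged sketch of issuing NBD_CMD_CONNECT from userspace with libnl-3 (command, attribute, and family names come from nbd-netlink.h; error handling elided; sock_fd is a socket that has completed the NBD handshake):

    #include <stdint.h>
    #include <netlink/genl/ctrl.h>
    #include <netlink/genl/genl.h>
    #include <linux/nbd-netlink.h>

    static int nbd_netlink_connect(int index, int sock_fd, uint64_t bytes)
    {
            struct nl_sock *nl = nl_socket_alloc();
            struct nl_msg *msg;
            struct nlattr *socks, *item;
            int family;

            genl_connect(nl);
            family = genl_ctrl_resolve(nl, NBD_GENL_FAMILY_NAME);

            msg = nlmsg_alloc();
            genlmsg_put(msg, NL_AUTO_PORT, NL_AUTO_SEQ, family, 0, 0,
                        NBD_CMD_CONNECT, NBD_GENL_VERSION);
            nla_put_u32(msg, NBD_ATTR_INDEX, index); /* omit to auto-pick */
            nla_put_u64(msg, NBD_ATTR_SIZE_BYTES, bytes);

            /* Sockets are nested: SOCKETS -> SOCK_ITEM -> SOCK_FD,
             * matching the nla_for_each_nested() loop in
             * nbd_genl_connect(). */
            socks = nla_nest_start(msg, NBD_ATTR_SOCKETS);
            item = nla_nest_start(msg, NBD_SOCK_ITEM);
            nla_put_u32(msg, NBD_SOCK_FD, sock_fd);
            nla_nest_end(msg, item);
            nla_nest_end(msg, socks);

            return nl_send_sync(nl, msg);
    }
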
+static int nbd_genl_connect(struct sk_buff *skb, struct genl_info *info)
+{
+       struct nbd_device *nbd = NULL;
+       struct nbd_config *config;
+       int index = -1;
+       int ret;
+       bool put_dev = false;
+
+       if (!netlink_capable(skb, CAP_SYS_ADMIN))
+               return -EPERM;
+
+       if (info->attrs[NBD_ATTR_INDEX])
+               index = nla_get_u32(info->attrs[NBD_ATTR_INDEX]);
+       if (!info->attrs[NBD_ATTR_SOCKETS]) {
+               printk(KERN_ERR "nbd: must specify at least one socket\n");
+               return -EINVAL;
+       }
+       if (!info->attrs[NBD_ATTR_SIZE_BYTES]) {
+               printk(KERN_ERR "nbd: must specify a size in bytes for the device\n");
+               return -EINVAL;
+       }
+again:
+       mutex_lock(&nbd_index_mutex);
+       if (index == -1) {
+               ret = idr_for_each(&nbd_index_idr, &find_free_cb, &nbd);
+               if (ret == 0) {
+                       int new_index;
+                       new_index = nbd_dev_add(-1);
+                       if (new_index < 0) {
+                               mutex_unlock(&nbd_index_mutex);
+                               printk(KERN_ERR "nbd: failed to add new device\n");
+                               return new_index;
+                       }
+                       nbd = idr_find(&nbd_index_idr, new_index);
+               }
+       } else {
+               nbd = idr_find(&nbd_index_idr, index);
+       }
+       if (!nbd) {
+               printk(KERN_ERR "nbd: couldn't find device at index %d\n",
+                      index);
+               mutex_unlock(&nbd_index_mutex);
+               return -EINVAL;
+       }
+       if (!refcount_inc_not_zero(&nbd->refs)) {
+               mutex_unlock(&nbd_index_mutex);
+               if (index == -1)
+                       goto again;
+               printk(KERN_ERR "nbd: device at index %d is going down\n",
+                      index);
+               return -EINVAL;
+       }
+       mutex_unlock(&nbd_index_mutex);
+
+       mutex_lock(&nbd->config_lock);
+       if (refcount_read(&nbd->config_refs)) {
+               mutex_unlock(&nbd->config_lock);
+               nbd_put(nbd);
+               if (index == -1)
+                       goto again;
+               printk(KERN_ERR "nbd: nbd%d already in use\n", index);
+               return -EBUSY;
+       }
+       if (WARN_ON(nbd->config)) {
+               mutex_unlock(&nbd->config_lock);
+               nbd_put(nbd);
+               return -EINVAL;
+       }
+       config = nbd->config = nbd_alloc_config();
+       if (!nbd->config) {
+               mutex_unlock(&nbd->config_lock);
+               nbd_put(nbd);
+               printk(KERN_ERR "nbd: couldn't allocate config\n");
+               return -ENOMEM;
+       }
+       refcount_set(&nbd->config_refs, 1);
+       set_bit(NBD_BOUND, &config->runtime_flags);
+
+       if (info->attrs[NBD_ATTR_SIZE_BYTES]) {
+               u64 bytes = nla_get_u64(info->attrs[NBD_ATTR_SIZE_BYTES]);
+               nbd_size_set(nbd, config->blksize,
+                            div64_u64(bytes, config->blksize));
+       }
+       if (info->attrs[NBD_ATTR_BLOCK_SIZE_BYTES]) {
+               u64 bsize =
+                       nla_get_u64(info->attrs[NBD_ATTR_BLOCK_SIZE_BYTES]);
+               nbd_size_set(nbd, bsize, div64_u64(config->bytesize, bsize));
+       }
+       if (info->attrs[NBD_ATTR_TIMEOUT]) {
+               u64 timeout = nla_get_u64(info->attrs[NBD_ATTR_TIMEOUT]);
+               nbd->tag_set.timeout = timeout * HZ;
+               blk_queue_rq_timeout(nbd->disk->queue, timeout * HZ);
+       }
+       if (info->attrs[NBD_ATTR_DEAD_CONN_TIMEOUT]) {
+               config->dead_conn_timeout =
+                       nla_get_u64(info->attrs[NBD_ATTR_DEAD_CONN_TIMEOUT]);
+               config->dead_conn_timeout *= HZ;
+       }
+       if (info->attrs[NBD_ATTR_SERVER_FLAGS])
+               config->flags =
+                       nla_get_u64(info->attrs[NBD_ATTR_SERVER_FLAGS]);
+       if (info->attrs[NBD_ATTR_CLIENT_FLAGS]) {
+               u64 flags = nla_get_u64(info->attrs[NBD_ATTR_CLIENT_FLAGS]);
+               if (flags & NBD_CFLAG_DESTROY_ON_DISCONNECT) {
+                       set_bit(NBD_DESTROY_ON_DISCONNECT,
+                               &config->runtime_flags);
+                       put_dev = true;
+               }
+       }
+
+       if (info->attrs[NBD_ATTR_SOCKETS]) {
+               struct nlattr *attr;
+               int rem, fd;
+
+               nla_for_each_nested(attr, info->attrs[NBD_ATTR_SOCKETS],
+                                   rem) {
+                       struct nlattr *socks[NBD_SOCK_MAX+1];
+
+                       if (nla_type(attr) != NBD_SOCK_ITEM) {
+                               printk(KERN_ERR "nbd: socks must be embedded in a SOCK_ITEM attr\n");
+                               ret = -EINVAL;
+                               goto out;
+                       }
+                       ret = nla_parse_nested(socks, NBD_SOCK_MAX, attr,
+                                              nbd_sock_policy, info->extack);
+                       if (ret != 0) {
+                               printk(KERN_ERR "nbd: error processing sock list\n");
+                               ret = -EINVAL;
+                               goto out;
+                       }
+                       if (!socks[NBD_SOCK_FD])
+                               continue;
+                       fd = (int)nla_get_u32(socks[NBD_SOCK_FD]);
+                       ret = nbd_add_socket(nbd, fd, true);
+                       if (ret)
+                               goto out;
+               }
+       }
+       ret = nbd_start_device(nbd);
+out:
+       mutex_unlock(&nbd->config_lock);
+       if (!ret) {
+               set_bit(NBD_HAS_CONFIG_REF, &config->runtime_flags);
+               refcount_inc(&nbd->config_refs);
+               nbd_connect_reply(info, nbd->index);
+       }
+       nbd_config_put(nbd);
+       if (put_dev)
+               nbd_put(nbd);
+       return ret;
+}
+
+static int nbd_genl_disconnect(struct sk_buff *skb, struct genl_info *info)
+{
+       struct nbd_device *nbd;
+       int index;
+
+       if (!netlink_capable(skb, CAP_SYS_ADMIN))
+               return -EPERM;
+
+       if (!info->attrs[NBD_ATTR_INDEX]) {
+               printk(KERN_ERR "nbd: must specify an index to disconnect\n");
+               return -EINVAL;
+       }
+       index = nla_get_u32(info->attrs[NBD_ATTR_INDEX]);
+       mutex_lock(&nbd_index_mutex);
+       nbd = idr_find(&nbd_index_idr, index);
+       if (!nbd) {
+               mutex_unlock(&nbd_index_mutex);
+               printk(KERN_ERR "nbd: couldn't find device at index %d\n",
+                      index);
+               return -EINVAL;
+       }
+       if (!refcount_inc_not_zero(&nbd->refs)) {
+               mutex_unlock(&nbd_index_mutex);
+               printk(KERN_ERR "nbd: device at index %d is going down\n",
+                      index);
+               return -EINVAL;
+       }
+       mutex_unlock(&nbd_index_mutex);
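+       /* No config ref means the device was never set up (or is already
+        * torn down), so there is nothing to disconnect.
+        */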
+       if (!refcount_inc_not_zero(&nbd->config_refs)) {
+               nbd_put(nbd);
+               return 0;
+       }
+       mutex_lock(&nbd->config_lock);
+       nbd_disconnect(nbd);
+       mutex_unlock(&nbd->config_lock);
+       if (test_and_clear_bit(NBD_HAS_CONFIG_REF,
+                              &nbd->config->runtime_flags))
+               nbd_config_put(nbd);
+       nbd_config_put(nbd);
+       nbd_put(nbd);
+       return 0;
+}
+
+static int nbd_genl_reconfigure(struct sk_buff *skb, struct genl_info *info)
+{
+       struct nbd_device *nbd = NULL;
+       struct nbd_config *config;
+       int index;
+       int ret = -EINVAL;
+       bool put_dev = false;
+
+       if (!netlink_capable(skb, CAP_SYS_ADMIN))
+               return -EPERM;
+
+       if (!info->attrs[NBD_ATTR_INDEX]) {
+               printk(KERN_ERR "nbd: must specify a device to reconfigure\n");
+               return -EINVAL;
+       }
+       index = nla_get_u32(info->attrs[NBD_ATTR_INDEX]);
+       mutex_lock(&nbd_index_mutex);
+       nbd = idr_find(&nbd_index_idr, index);
+       if (!nbd) {
+               mutex_unlock(&nbd_index_mutex);
+               printk(KERN_ERR "nbd: couldn't find a device at index %d\n",
+                      index);
+               return -EINVAL;
+       }
+       if (!refcount_inc_not_zero(&nbd->refs)) {
+               mutex_unlock(&nbd_index_mutex);
+               printk(KERN_ERR "nbd: device at index %d is going down\n",
+                      index);
+               return -EINVAL;
+       }
+       mutex_unlock(&nbd_index_mutex);
+
+       if (!refcount_inc_not_zero(&nbd->config_refs)) {
+               dev_err(nbd_to_dev(nbd),
+                       "not configured, cannot reconfigure\n");
+               nbd_put(nbd);
+               return -EINVAL;
+       }
+
+       mutex_lock(&nbd->config_lock);
+       config = nbd->config;
+       if (!test_bit(NBD_BOUND, &config->runtime_flags) ||
+           !nbd->task_recv) {
+               dev_err(nbd_to_dev(nbd),
+                       "not configured, cannot reconfigure\n");
+               goto out;
+       }
+
+       if (info->attrs[NBD_ATTR_TIMEOUT]) {
+               u64 timeout = nla_get_u64(info->attrs[NBD_ATTR_TIMEOUT]);
+               nbd->tag_set.timeout = timeout * HZ;
+               blk_queue_rq_timeout(nbd->disk->queue, timeout * HZ);
+       }
+       if (info->attrs[NBD_ATTR_DEAD_CONN_TIMEOUT]) {
+               config->dead_conn_timeout =
+                       nla_get_u64(info->attrs[NBD_ATTR_DEAD_CONN_TIMEOUT]);
+               config->dead_conn_timeout *= HZ;
+       }
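+       /* Toggling DESTROY_ON_DISCONNECT moves a device reference around:
+        * setting it gives one up (put_dev), clearing it takes one back.
+        */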
+       if (info->attrs[NBD_ATTR_CLIENT_FLAGS]) {
+               u64 flags = nla_get_u64(info->attrs[NBD_ATTR_CLIENT_FLAGS]);
+               if (flags & NBD_CFLAG_DESTROY_ON_DISCONNECT) {
+                       if (!test_and_set_bit(NBD_DESTROY_ON_DISCONNECT,
+                                             &config->runtime_flags))
+                               put_dev = true;
+               } else {
+                       if (test_and_clear_bit(NBD_DESTROY_ON_DISCONNECT,
+                                              &config->runtime_flags))
+                               refcount_inc(&nbd->refs);
+               }
+       }
+
+       if (info->attrs[NBD_ATTR_SOCKETS]) {
+               struct nlattr *attr;
+               int rem, fd;
+
+               nla_for_each_nested(attr, info->attrs[NBD_ATTR_SOCKETS],
+                                   rem) {
+                       struct nlattr *socks[NBD_SOCK_MAX + 1];
+
+                       if (nla_type(attr) != NBD_SOCK_ITEM) {
+                               printk(KERN_ERR "nbd: socks must be embedded in a SOCK_ITEM attr\n");
+                               ret = -EINVAL;
+                               goto out;
+                       }
+                       ret = nla_parse_nested(socks, NBD_SOCK_MAX, attr,
+                                              nbd_sock_policy, info->extack);
+                       if (ret != 0) {
+                               printk(KERN_ERR "nbd: error processing sock list\n");
+                               ret = -EINVAL;
+                               goto out;
+                       }
+                       if (!socks[NBD_SOCK_FD])
+                               continue;
+                       fd = (int)nla_get_u32(socks[NBD_SOCK_FD]);
+                       ret = nbd_reconnect_socket(nbd, fd);
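+                       /* -ENOSPC from nbd_reconnect_socket() means no
+                        * dead link needed replacing; don't treat that
+                        * as a failure.
+                        */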
+                       if (ret) {
+                               if (ret == -ENOSPC)
+                                       ret = 0;
+                               goto out;
+                       }
+                       dev_info(nbd_to_dev(nbd), "reconnected socket\n");
+               }
+       }
+out:
+       mutex_unlock(&nbd->config_lock);
+       nbd_config_put(nbd);
+       nbd_put(nbd);
+       if (put_dev)
+               nbd_put(nbd);
+       return ret;
+}
+
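+/* Generic netlink plumbing: one doit handler per NBD_CMD_*, all sharing
+ * nbd_attr_policy for attribute validation.
+ */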
+static const struct genl_ops nbd_connect_genl_ops[] = {
+       {
+               .cmd    = NBD_CMD_CONNECT,
+               .policy = nbd_attr_policy,
+               .doit   = nbd_genl_connect,
+       },
+       {
+               .cmd    = NBD_CMD_DISCONNECT,
+               .policy = nbd_attr_policy,
+               .doit   = nbd_genl_disconnect,
+       },
+       {
+               .cmd    = NBD_CMD_RECONFIGURE,
+               .policy = nbd_attr_policy,
+               .doit   = nbd_genl_reconfigure,
+       },
+       {
+               .cmd    = NBD_CMD_STATUS,
+               .policy = nbd_attr_policy,
+               .doit   = nbd_genl_status,
+       },
+};
+
+static const struct genl_multicast_group nbd_mcast_grps[] = {
+       { .name = NBD_GENL_MCAST_GROUP_NAME, },
+};
+
+static struct genl_family nbd_genl_family __ro_after_init = {
+       .hdrsize        = 0,
+       .name           = NBD_GENL_FAMILY_NAME,
+       .version        = NBD_GENL_VERSION,
+       .module         = THIS_MODULE,
+       .ops            = nbd_connect_genl_ops,
+       .n_ops          = ARRAY_SIZE(nbd_connect_genl_ops),
+       .maxattr        = NBD_ATTR_MAX,
+       .mcgrps         = nbd_mcast_grps,
+       .n_mcgrps       = ARRAY_SIZE(nbd_mcast_grps),
+};
+
+static int populate_nbd_status(struct nbd_device *nbd, struct sk_buff *reply)
+{
+       struct nlattr *dev_opt;
+       u8 connected = 0;
+       int ret;
+
+       /* This is a little racy, but for status that's ok.  We don't
+        * take a ref here because in the index == -1 case we would
+        * have to drop it under nbd_index_mutex, which could deadlock
+        * if the device is configured to remove itself once it is
+        * disconnected.
+        */
+       if (refcount_read(&nbd->config_refs))
+               connected = 1;
+       dev_opt = nla_nest_start(reply, NBD_DEVICE_ITEM);
+       if (!dev_opt)
+               return -EMSGSIZE;
+       ret = nla_put_u32(reply, NBD_DEVICE_INDEX, nbd->index);
+       if (ret)
+               return -EMSGSIZE;
+       ret = nla_put_u8(reply, NBD_DEVICE_CONNECTED, connected);
+       if (ret)
+               return -EMSGSIZE;
+       nla_nest_end(reply, dev_opt);
+       return 0;
+}
+
+static int status_cb(int id, void *ptr, void *data)
+{
+       struct nbd_device *nbd = ptr;
+
+       return populate_nbd_status(nbd, data);
+}
+
+static int nbd_genl_status(struct sk_buff *skb, struct genl_info *info)
+{
+       struct nlattr *dev_list;
+       struct sk_buff *reply;
+       void *reply_head;
+       size_t msg_size;
+       int index = -1;
+       int ret = -ENOMEM;
+
+       if (info->attrs[NBD_ATTR_INDEX])
+               index = nla_get_u32(info->attrs[NBD_ATTR_INDEX]);
+
+       mutex_lock(&nbd_index_mutex);
+
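+       /* Reserve space for one nested NBD_DEVICE_ITEM (a u32 index plus
+        * a u8 connected flag) per device we may report.
+        */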
+       msg_size = nla_total_size(nla_attr_size(sizeof(u32)) +
+                                 nla_attr_size(sizeof(u8)));
+       msg_size *= (index == -1) ? nbd_total_devices : 1;
+
+       reply = genlmsg_new(msg_size, GFP_KERNEL);
+       if (!reply)
+               goto out;
+       reply_head = genlmsg_put_reply(reply, info, &nbd_genl_family, 0,
+                                      NBD_CMD_STATUS);
+       if (!reply_head) {
+               nlmsg_free(reply);
+               goto out;
+       }
+
+       dev_list = nla_nest_start(reply, NBD_ATTR_DEVICE_LIST);
+       if (!dev_list) {
+               nlmsg_free(reply);
+               ret = -EMSGSIZE;
+               goto out;
+       }
+       if (index == -1) {
+               ret = idr_for_each(&nbd_index_idr, &status_cb, reply);
+               if (ret) {
+                       nlmsg_free(reply);
+                       goto out;
+               }
+       } else {
+               struct nbd_device *nbd;
+               nbd = idr_find(&nbd_index_idr, index);
+               if (nbd) {
+                       ret = populate_nbd_status(nbd, reply);
+                       if (ret) {
+                               nlmsg_free(reply);
+                               goto out;
+                       }
+               }
+       }
+       nla_nest_end(reply, dev_list);
+       genlmsg_end(reply, reply_head);
+       genlmsg_reply(reply, info);
+       ret = 0;
+out:
+       mutex_unlock(&nbd_index_mutex);
+       return ret;
+}
+
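+/* Unicast the index of the newly configured device back to the requester
+ * so it knows which /dev/nbd<num> it was given.
+ */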
+static void nbd_connect_reply(struct genl_info *info, int index)
+{
+       struct sk_buff *skb;
+       void *msg_head;
+       int ret;
+
+       skb = genlmsg_new(nla_total_size(sizeof(u32)), GFP_KERNEL);
+       if (!skb)
+               return;
+       msg_head = genlmsg_put_reply(skb, info, &nbd_genl_family, 0,
+                                    NBD_CMD_CONNECT);
+       if (!msg_head) {
+               nlmsg_free(skb);
+               return;
+       }
+       ret = nla_put_u32(skb, NBD_ATTR_INDEX, index);
+       if (ret) {
+               nlmsg_free(skb);
+               return;
+       }
+       genlmsg_end(skb, msg_head);
+       genlmsg_reply(skb, info);
+}
+
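+/* Multicast NBD_CMD_LINK_DEAD with the device index so listeners on the
+ * nbd multicast group can react, e.g. by reconfiguring with a fresh
+ * socket.
+ */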
+static void nbd_mcast_index(int index)
+{
+       struct sk_buff *skb;
+       void *msg_head;
+       int ret;
+
+       skb = genlmsg_new(nla_total_size(sizeof(u32)), GFP_KERNEL);
+       if (!skb)
+               return;
+       msg_head = genlmsg_put(skb, 0, 0, &nbd_genl_family, 0,
+                              NBD_CMD_LINK_DEAD);
+       if (!msg_head) {
+               nlmsg_free(skb);
+               return;
+       }
+       ret = nla_put_u32(skb, NBD_ATTR_INDEX, index);
+       if (ret) {
+               nlmsg_free(skb);
+               return;
+       }
+       genlmsg_end(skb, msg_head);
+       genlmsg_multicast(&nbd_genl_family, skb, 0, 0, GFP_KERNEL);
+}
+
+static void nbd_dead_link_work(struct work_struct *work)
+{
+       struct link_dead_args *args = container_of(work, struct link_dead_args,
+                                                  work);
+       nbd_mcast_index(args->index);
+       kfree(args);
+}
 
 static int __init nbd_init(void)
 {
@@ -1184,6 +2071,11 @@ static int __init nbd_init(void)
                return -EIO;
        }
 
+       if (genl_register_family(&nbd_genl_family)) {
+               unregister_blkdev(NBD_MAJOR, "nbd");
+               destroy_workqueue(recv_workqueue);
+               return -EINVAL;
+       }
        nbd_dbg_init();
 
        mutex_lock(&nbd_index_mutex);
@@ -1195,17 +2087,34 @@ static int __init nbd_init(void)
 
 static int nbd_exit_cb(int id, void *ptr, void *data)
 {
+       struct list_head *list = data;
        struct nbd_device *nbd = ptr;
-       nbd_dev_remove(nbd);
+
+       list_add_tail(&nbd->list, list);
        return 0;
 }
 
 static void __exit nbd_cleanup(void)
 {
+       struct nbd_device *nbd;
+       LIST_HEAD(del_list);
+
        nbd_dbg_close();
 
-       idr_for_each(&nbd_index_idr, &nbd_exit_cb, NULL);
+       mutex_lock(&nbd_index_mutex);
+       idr_for_each(&nbd_index_idr, &nbd_exit_cb, &del_list);
+       mutex_unlock(&nbd_index_mutex);
+
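+       /* Drop the references outside nbd_index_mutex: nbd_put() takes
+        * that mutex when it removes a device from the idr.
+        */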
+       while (!list_empty(&del_list)) {
+               nbd = list_first_entry(&del_list, struct nbd_device, list);
+               list_del_init(&nbd->list);
+               if (refcount_read(&nbd->refs) != 1)
+                       printk(KERN_ERR "nbd: possibly leaking a device\n");
+               nbd_put(nbd);
+       }
+
        idr_destroy(&nbd_index_idr);
+       genl_unregister_family(&nbd_genl_family);
        destroy_workqueue(recv_workqueue);
        unregister_blkdev(NBD_MAJOR, "nbd");
 }