From: Shachar Sharon Date: Tue, 1 Oct 2024 09:09:40 +0000 (+0300) Subject: vfs_ceph_new: use libcephfs nonblocking API for async-io ops X-Git-Tag: tevent-0.17.0~609 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=4ae9224138449fe7b8dd1e8ce8141aedd014efc4;p=thirdparty%2Fsamba.git vfs_ceph_new: use libcephfs nonblocking API for async-io ops Use libcephfs non-blocking API (ceph_ll_nonblocking_readv_writev[1]) in combination with smb VFS async hooks ({pread,pwrite,fsync}_send/_recv). Fills libcephfs' struct ceph_ll_io_info with single iovec and submit/complete the operation asynchronously on libcephfs side, with corresponding tevent schedule-immediate upon completion on smbd side. Control nonblocking/normal I/O mode via config parameter. The common parts of async I/O (with/without HAVE_CEPH_ASYNCIO) are united. Specifically, use same struct vfs_ceph_aio_state and common code via helper function for all async I/O hooks. When HAVE_CEPH_ASYNCIO is True _and_ config option 'asyncio = yes' use libcephfs asynchronous I/O API. Otherwise, fake async operation using normal blocking APIs. BUG: https://bugzilla.samba.org/show_bug.cgi?id=15810 [1] https://github.com/ceph/ceph/commit/b4e39f3eccd6734f1ed13c700c136e3aef1777f8 Signed-off-by: Shachar Sharon Reviewed-by: Ralph Boehme Reviewed-by: Volker Lendecke Reviewed-by: Guenther Deschner Autobuild-User(master): Günther Deschner Autobuild-Date(master): Tue Mar 4 16:53:21 UTC 2025 on atb-devel-224 --- diff --git a/source3/modules/vfs_ceph_new.c b/source3/modules/vfs_ceph_new.c index 8d1f6f7fbec..bc3f6da2205 100644 --- a/source3/modules/vfs_ceph_new.c +++ b/source3/modules/vfs_ceph_new.c @@ -102,6 +102,9 @@ static const struct enum_list enum_vfs_cephfs_proxy_vals[] = { #define CEPH_FN(_name) typeof(_name) *_name ## _fn struct vfs_ceph_config { +#if HAVE_CEPH_ASYNCIO + struct tevent_threaded_context *tctx; +#endif const char *conf_file; const char *user_id; const char *fsname; @@ -157,6 +160,9 @@ struct vfs_ceph_config { CEPH_FN(ceph_version); CEPH_FN(ceph_rewinddir); CEPH_FN(ceph_readdir_r); +#if HAVE_CEPH_ASYNCIO + CEPH_FN(ceph_ll_nonblocking_readv_writev); +#endif }; /* @@ -443,6 +449,9 @@ static bool vfs_cephfs_load_lib(struct vfs_ceph_config *config) CHECK_CEPH_FN(libhandle, ceph_version); CHECK_CEPH_FN(libhandle, ceph_rewinddir); CHECK_CEPH_FN(libhandle, ceph_readdir_r); +#if HAVE_CEPH_ASYNCIO + CHECK_CEPH_FN(libhandle, ceph_ll_nonblocking_readv_writev); +#endif config->libhandle = libhandle; @@ -1795,6 +1804,30 @@ static int vfs_ceph_ll_fremovexattr(const struct vfs_handle_struct *handle, cfh->uperm); } +#if HAVE_CEPH_ASYNCIO +static int64_t vfs_ceph_ll_nonblocking_readv_writev( + const struct vfs_handle_struct *handle, + const struct vfs_ceph_fh *cfh, + struct ceph_ll_io_info *io_info) +{ + struct vfs_ceph_config *config = NULL; + + SMB_VFS_HANDLE_GET_DATA(handle, + config, + struct vfs_ceph_config, + return -EINVAL); + + DBG_DEBUG("[CEPH] ceph_ll_nonblocking_readv_writev: ino=%" PRIu64 + " fd=%d off=%jd\n", + cfh->iref.ino, + cfh->fd, + io_info->off); + + return config->ceph_ll_nonblocking_readv_writev_fn(config->mount, + io_info); +} +#endif + /* Ceph Inode-refernce get/put wrappers */ static int vfs_ceph_iget(const struct vfs_handle_struct *handle, uint64_t ino, @@ -2272,17 +2305,28 @@ out: } struct vfs_ceph_aio_state { + struct vfs_ceph_config *config; + struct vfs_ceph_fh *cfh; +#if HAVE_CEPH_ASYNCIO + struct tevent_req *req; + bool orphaned; + struct tevent_immediate *im; + void *data; + size_t len; + off_t off; + bool 
write; + bool fsync; + + struct ceph_ll_io_info io_info; + struct iovec iov; +#endif struct timespec start_time; struct timespec finish_time; + ssize_t result; struct vfs_aio_state vfs_aio_state; SMBPROFILE_BYTES_ASYNC_STATE(profile_bytes); }; -struct vfs_ceph_pread_state { - ssize_t bytes_read; - struct vfs_ceph_aio_state ceph_aio_state; -}; - static void vfs_ceph_aio_start(struct vfs_ceph_aio_state *state) { SMBPROFILE_BYTES_ASYNC_SET_BUSY(state->profile_bytes); @@ -2298,22 +2342,218 @@ static void vfs_ceph_aio_finish(struct vfs_ceph_aio_state *state, if (result < 0) { state->vfs_aio_state.error = (int)result; } + + state->result = result; SMBPROFILE_BYTES_ASYNC_SET_IDLE(state->profile_bytes); } -/* - * Fake up an async ceph read by calling the synchronous API. - */ +#if HAVE_CEPH_ASYNCIO + +static void vfs_ceph_aio_done(struct tevent_context *ev, + struct tevent_immediate *im, + void *private_data); + +static int vfs_ceph_require_tctx(struct vfs_ceph_aio_state *state, + struct tevent_context *ev) +{ + struct vfs_ceph_config *config = state->config; + + if (config->tctx != NULL) { + return 0; + } + + config->tctx = tevent_threaded_context_create(config, ev); + if (config->tctx == NULL) { + return -ENOMEM; + } + + return 0; +} + +static void vfs_ceph_aio_complete(struct ceph_ll_io_info *io_info) +{ + struct vfs_ceph_aio_state *state = io_info->priv; + + if (state->orphaned) { + return; + } + + DBG_DEBUG("[CEPH] aio_complete: ino=%" PRIu64 + " fd=%d off=%jd len=%ju result=%jd\n", + state->cfh->iref.ino, + state->cfh->fd, + state->off, + state->len, + state->io_info.result); + + tevent_threaded_schedule_immediate(state->config->tctx, + state->im, + vfs_ceph_aio_done, + state->req); +} + +static void vfs_ceph_aio_cleanup(struct tevent_req *req, + enum tevent_req_state req_state) +{ + struct vfs_ceph_aio_state *state = tevent_req_data( + req, struct vfs_ceph_aio_state); + + if (req_state == TEVENT_REQ_IN_PROGRESS) { + /* + * The job thread is still running, we need to protect the + * memory used by the job completion function. 
+ */ + (void)talloc_reparent(req, NULL, state); + state->orphaned = true; + } +} + +static void vfs_ceph_aio_submit(struct vfs_handle_struct *handle, + struct tevent_req *req, + struct tevent_context *ev) +{ + struct vfs_ceph_aio_state *state = tevent_req_data( + req, struct vfs_ceph_aio_state); + int64_t res; + + DBG_DEBUG("[CEPH] aio_send: ino=%" PRIu64 "fd=%d off=%jd len=%ju\n", + state->cfh->iref.ino, + state->cfh->fd, + state->off, + state->len); + + state->io_info.callback = vfs_ceph_aio_complete; + state->iov.iov_base = state->data; + state->iov.iov_len = state->len; + state->io_info.priv = state; + state->io_info.fh = state->cfh->fh; + state->io_info.iov = &state->iov; + state->io_info.iovcnt = 1; + state->io_info.off = state->off; + state->io_info.write = state->write; + state->io_info.fsync = state->fsync; + state->io_info.result = 0; + + vfs_ceph_aio_start(state); + + res = vfs_ceph_ll_nonblocking_readv_writev(handle, + state->cfh, + &state->io_info); + if (res < 0) { + state->result = (int)res; + tevent_req_error(req, -((int)res)); + tevent_req_post(req, ev); + return; + } + + tevent_req_set_cleanup_fn(req, vfs_ceph_aio_cleanup); + return; +} + +static void vfs_ceph_aio_done(struct tevent_context *ev, + struct tevent_immediate *im, + void *private_data) +{ + struct tevent_req *req = private_data; + struct vfs_ceph_aio_state *state = tevent_req_data( + req, struct vfs_ceph_aio_state); + + DBG_DEBUG("[CEPH] aio_done: ino=%" PRIu64 + " fd=%d off=%jd len=%ju result=%jd\n", + state->cfh->iref.ino, + state->cfh->fd, + state->off, + state->len, + state->io_info.result); + + vfs_ceph_aio_finish(state, state->io_info.result); + if (state->result < 0) { + tevent_req_error(req, -((int)state->result)); + return; + } + + tevent_req_done(req); +} + +static ssize_t vfs_ceph_aio_recv(struct tevent_req *req, + struct vfs_aio_state *vfs_aio_state) +{ + struct vfs_ceph_aio_state *state = tevent_req_data( + req, struct vfs_ceph_aio_state); + ssize_t res = -1; + + DBG_DEBUG("[CEPH] aio_recv: ino=%" PRIu64 + " fd=%d off=%jd len=%ju result=%ld\n", + state->cfh->iref.ino, + state->cfh->fd, + state->off, + state->len, + state->result); + + if (tevent_req_is_unix_error(req, &vfs_aio_state->error)) { + goto out; + } + + *vfs_aio_state = state->vfs_aio_state; + res = state->result; +out: + tevent_req_received(req); + return res; +} + +#endif /* HAVE_CEPH_ASYNCIO */ + +static void vfs_ceph_aio_prepare(struct vfs_handle_struct *handle, + struct tevent_req *req, + struct tevent_context *ev, + struct files_struct *fsp) +{ + struct vfs_ceph_config *config = NULL; + struct vfs_ceph_aio_state *state = NULL; + int ret = -1; + + SMB_VFS_HANDLE_GET_DATA(handle, + config, + struct vfs_ceph_config, + (void)0); + if (config == NULL) { + tevent_req_error(req, EINVAL); + return; + } + + state = tevent_req_data(req, struct vfs_ceph_aio_state); + state->config = config; + +#if HAVE_CEPH_ASYNCIO + ret = vfs_ceph_require_tctx(state, ev); + if (ret != 0) { + tevent_req_error(req, -ret); + return; + } + + state->im = tevent_create_immediate(state); + if (state->im == NULL) { + tevent_req_error(req, ENOMEM); + return; + } +#endif + + ret = vfs_ceph_fetch_io_fh(handle, fsp, &state->cfh); + if (ret != 0) { + tevent_req_error(req, -ret); + } +} + static struct tevent_req *vfs_ceph_pread_send(struct vfs_handle_struct *handle, TALLOC_CTX *mem_ctx, struct tevent_context *ev, struct files_struct *fsp, void *data, - size_t n, off_t offset) + size_t n, + off_t offset) { - struct vfs_ceph_fh *cfh = NULL; struct tevent_req *req = NULL; - 
struct vfs_ceph_pread_state *state = NULL; + struct vfs_ceph_aio_state *state = NULL; int ret = -1; DBG_DEBUG("[CEPH] pread_send(%p, %p, %p, %zu, %zd)\n", @@ -2322,33 +2562,40 @@ static struct tevent_req *vfs_ceph_pread_send(struct vfs_handle_struct *handle, data, n, offset); - req = tevent_req_create(mem_ctx, &state, struct vfs_ceph_pread_state); + + req = tevent_req_create(mem_ctx, &state, struct vfs_ceph_aio_state); if (req == NULL) { return NULL; } - ret = vfs_ceph_fetch_io_fh(handle, fsp, &cfh); - if (ret != 0) { - tevent_req_error(req, -ret); + vfs_ceph_aio_prepare(handle, req, ev, fsp); + if (!tevent_req_is_in_progress(req)) { return tevent_req_post(req, ev); } SMBPROFILE_BYTES_ASYNC_START(syscall_asys_pread, profile_p, - state->ceph_aio_state.profile_bytes, + state->profile_bytes, n); - SMBPROFILE_BYTES_ASYNC_SET_IDLE(state->ceph_aio_state.profile_bytes); + SMBPROFILE_BYTES_ASYNC_SET_IDLE(state->profile_bytes); - vfs_ceph_aio_start(&state->ceph_aio_state); - ret = vfs_ceph_ll_read(handle, cfh, offset, n, data); - vfs_ceph_aio_finish(&state->ceph_aio_state, ret); +#if HAVE_CEPH_ASYNCIO + state->req = req; + state->data = data; + state->len = n; + state->off = offset; + vfs_ceph_aio_submit(handle, req, ev); + return req; +#endif + vfs_ceph_aio_start(state); + ret = vfs_ceph_ll_read(handle, state->cfh, offset, n, data); + vfs_ceph_aio_finish(state, ret); if (ret < 0) { /* ceph returns -errno on error. */ tevent_req_error(req, -ret); return tevent_req_post(req, ev); } - state->bytes_read = ret; tevent_req_done(req); /* Return and schedule the completion of the call. */ return tevent_req_post(req, ev); @@ -2357,18 +2604,22 @@ static struct tevent_req *vfs_ceph_pread_send(struct vfs_handle_struct *handle, static ssize_t vfs_ceph_pread_recv(struct tevent_req *req, struct vfs_aio_state *vfs_aio_state) { - struct vfs_ceph_pread_state *state = - tevent_req_data(req, struct vfs_ceph_pread_state); + struct vfs_ceph_aio_state *state = tevent_req_data( + req, struct vfs_ceph_aio_state); - DBG_DEBUG("[CEPH] pread_recv: bytes_read=%zd\n", state->bytes_read); + DBG_DEBUG("[CEPH] pread_recv: bytes_read=%zd\n", state->result); - SMBPROFILE_BYTES_ASYNC_END(state->ceph_aio_state.profile_bytes); + SMBPROFILE_BYTES_ASYNC_END(state->profile_bytes); +#if HAVE_CEPH_ASYNCIO + return vfs_ceph_aio_recv(req, vfs_aio_state); +#endif if (tevent_req_is_unix_error(req, &vfs_aio_state->error)) { return -1; } - *vfs_aio_state = state->ceph_aio_state.vfs_aio_state; - return state->bytes_read; + + *vfs_aio_state = state->vfs_aio_state; + return state->result; } static ssize_t vfs_ceph_pwrite(struct vfs_handle_struct *handle, @@ -2399,24 +2650,16 @@ out: return lstatus_code(result); } -struct vfs_ceph_pwrite_state { - ssize_t bytes_written; - struct vfs_ceph_aio_state ceph_aio_state; -}; - -/* - * Fake up an async ceph write by calling the synchronous API. 
- */ static struct tevent_req *vfs_ceph_pwrite_send(struct vfs_handle_struct *handle, TALLOC_CTX *mem_ctx, struct tevent_context *ev, struct files_struct *fsp, const void *data, - size_t n, off_t offset) + size_t n, + off_t offset) { - struct vfs_ceph_fh *cfh = NULL; struct tevent_req *req = NULL; - struct vfs_ceph_pwrite_state *state = NULL; + struct vfs_ceph_aio_state *state = NULL; int ret = -1; DBG_DEBUG("[CEPH] pwrite_send(%p, %p, %p, %zu, %zd)\n", @@ -2425,33 +2668,42 @@ static struct tevent_req *vfs_ceph_pwrite_send(struct vfs_handle_struct *handle, data, n, offset); - req = tevent_req_create(mem_ctx, &state, struct vfs_ceph_pwrite_state); + + req = tevent_req_create(mem_ctx, &state, struct vfs_ceph_aio_state); if (req == NULL) { return NULL; } - ret = vfs_ceph_fetch_io_fh(handle, fsp, &cfh); - if (ret != 0) { - tevent_req_error(req, -ret); + vfs_ceph_aio_prepare(handle, req, ev, fsp); + if (!tevent_req_is_in_progress(req)) { return tevent_req_post(req, ev); } SMBPROFILE_BYTES_ASYNC_START(syscall_asys_pwrite, profile_p, - state->ceph_aio_state.profile_bytes, + state->profile_bytes, n); - SMBPROFILE_BYTES_ASYNC_SET_IDLE(state->ceph_aio_state.profile_bytes); + SMBPROFILE_BYTES_ASYNC_SET_IDLE(state->profile_bytes); + +#if HAVE_CEPH_ASYNCIO + state->req = req; + state->data = discard_const(data); + state->len = n; + state->off = offset; + state->write = true; + vfs_ceph_aio_submit(handle, req, ev); + return req; +#endif - vfs_ceph_aio_start(&state->ceph_aio_state); - ret = vfs_ceph_ll_write(handle, cfh, offset, n, data); - vfs_ceph_aio_finish(&state->ceph_aio_state, ret); + vfs_ceph_aio_start(state); + ret = vfs_ceph_ll_write(handle, state->cfh, offset, n, data); + vfs_ceph_aio_finish(state, ret); if (ret < 0) { /* ceph returns -errno on error. */ tevent_req_error(req, -ret); return tevent_req_post(req, ev); } - state->bytes_written = ret; tevent_req_done(req); /* Return and schedule the completion of the call. */ return tevent_req_post(req, ev); @@ -2460,19 +2712,23 @@ static struct tevent_req *vfs_ceph_pwrite_send(struct vfs_handle_struct *handle, static ssize_t vfs_ceph_pwrite_recv(struct tevent_req *req, struct vfs_aio_state *vfs_aio_state) { - struct vfs_ceph_pwrite_state *state = - tevent_req_data(req, struct vfs_ceph_pwrite_state); + struct vfs_ceph_aio_state *state = tevent_req_data( + req, struct vfs_ceph_aio_state); + + DBG_DEBUG("[CEPH] pwrite_recv: bytes_written=%zd\n", state->result); - DBG_DEBUG("[CEPH] pwrite_recv: bytes_written=%zd\n", - state->bytes_written); + SMBPROFILE_BYTES_ASYNC_END(state->profile_bytes); - SMBPROFILE_BYTES_ASYNC_END(state->ceph_aio_state.profile_bytes); +#if HAVE_CEPH_ASYNCIO + return vfs_ceph_aio_recv(req, vfs_aio_state); +#endif if (tevent_req_is_unix_error(req, &vfs_aio_state->error)) { return -1; } - *vfs_aio_state = state->ceph_aio_state.vfs_aio_state; - return state->bytes_written; + + *vfs_aio_state = state->vfs_aio_state; + return state->result; } static off_t vfs_ceph_lseek(struct vfs_handle_struct *handle, @@ -2592,47 +2848,46 @@ out: return status_code(result); } -/* - * Fake up an async ceph fsync by calling the synchronous API. 
- */ - -struct vfs_ceph_fsync_state { - struct vfs_ceph_aio_state ceph_aio_state; -}; - static struct tevent_req *vfs_ceph_fsync_send(struct vfs_handle_struct *handle, - TALLOC_CTX *mem_ctx, - struct tevent_context *ev, - files_struct *fsp) + TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + files_struct *fsp) { - struct vfs_ceph_fh *cfh = NULL; struct tevent_req *req = NULL; - struct vfs_ceph_fsync_state *state = NULL; + struct vfs_ceph_aio_state *state = NULL; int ret = -1; DBG_DEBUG("[CEPH] fsync_send(%p, %p)\n", handle, fsp); - req = tevent_req_create(mem_ctx, &state, struct vfs_ceph_fsync_state); + req = tevent_req_create(mem_ctx, &state, struct vfs_ceph_aio_state); if (req == NULL) { return NULL; } - ret = vfs_ceph_fetch_io_fh(handle, fsp, &cfh); - if (ret != 0) { - tevent_req_error(req, -ret); + vfs_ceph_aio_prepare(handle, req, ev, fsp); + if (!tevent_req_is_in_progress(req)) { return tevent_req_post(req, ev); } SMBPROFILE_BYTES_ASYNC_START(syscall_asys_fsync, profile_p, - state->ceph_aio_state.profile_bytes, + state->profile_bytes, 0); - SMBPROFILE_BYTES_ASYNC_SET_IDLE(state->ceph_aio_state.profile_bytes); + SMBPROFILE_BYTES_ASYNC_SET_IDLE(state->profile_bytes); + +#if HAVE_CEPH_ASYNCIO + state->req = req; + state->data = NULL; + state->len = 0; + state->off = 0; + state->fsync = true; + vfs_ceph_aio_submit(handle, req, ev); + return req; +#endif - /* Make sync call. */ - vfs_ceph_aio_start(&state->ceph_aio_state); - ret = vfs_ceph_ll_fsync(handle, cfh, false); - vfs_ceph_aio_finish(&state->ceph_aio_state, ret); + vfs_ceph_aio_start(state); + ret = vfs_ceph_ll_fsync(handle, state->cfh, false); + vfs_ceph_aio_finish(state, ret); if (ret != 0) { /* ceph_fsync returns -errno on error. */ tevent_req_error(req, -ret); @@ -2646,21 +2901,26 @@ static struct tevent_req *vfs_ceph_fsync_send(struct vfs_handle_struct *handle, } static int vfs_ceph_fsync_recv(struct tevent_req *req, - struct vfs_aio_state *vfs_aio_state) + struct vfs_aio_state *vfs_aio_state) { - struct vfs_ceph_fsync_state *state = tevent_req_data( - req, struct vfs_ceph_fsync_state); + struct vfs_ceph_aio_state *state = tevent_req_data( + req, struct vfs_ceph_aio_state); DBG_DEBUG("[CEPH] fsync_recv: error=%d duration=%" PRIu64 "\n", - state->ceph_aio_state.vfs_aio_state.error, - state->ceph_aio_state.vfs_aio_state.duration); + state->vfs_aio_state.error, + state->vfs_aio_state.duration); + + SMBPROFILE_BYTES_ASYNC_END(state->profile_bytes); - SMBPROFILE_BYTES_ASYNC_END(state->ceph_aio_state.profile_bytes); +#if HAVE_CEPH_ASYNCIO + return vfs_ceph_aio_recv(req, vfs_aio_state); +#endif if (tevent_req_is_unix_error(req, &vfs_aio_state->error)) { return -1; } - *vfs_aio_state = state->ceph_aio_state.vfs_aio_state; + + *vfs_aio_state = state->vfs_aio_state; return 0; } diff --git a/source3/wscript b/source3/wscript index 0a2b9234e49..f3c819a77d4 100644 --- a/source3/wscript +++ b/source3/wscript @@ -1637,6 +1637,10 @@ int main(void) { headers='cephfs/libcephfs.h')): conf.DEFINE('HAVE_CEPH', '1') + if (conf.CHECK_FUNCS_IN('ceph_ll_nonblocking_readv_writev', 'cephfs', + headers='cephfs/libcephfs.h')): + conf.DEFINE('HAVE_CEPH_ASYNCIO', '1') + if conf.CONFIG_SET('HAVE_CEPH'): Logs.info("building ceph vfs modules") else:
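
The asynchronous completion path above (vfs_ceph_aio_submit()/vfs_ceph_aio_complete()/vfs_ceph_aio_done()) hinges on one detail: the ceph_ll_io_info callback is invoked from a libcephfs-internal thread rather than from smbd's event loop, which is why the callback only hands the result back to the main loop via tevent_threaded_schedule_immediate(), using the tevent_threaded_context that vfs_ceph_require_tctx() creates lazily and caches in struct vfs_ceph_config. The stand-alone sketch below shows that hand-off pattern in isolation; it is not Samba code, and the demo_* names, the pthread worker standing in for the libcephfs callback thread, and the build command are assumptions made purely for illustration:

/*
 * Stand-alone sketch (not Samba code) of the completion pattern used
 * above: work finishes on a non-tevent thread and the result is handed
 * back to the main event loop via tevent_threaded_schedule_immediate().
 * The demo_* names and the pthread worker are illustrative assumptions;
 * build e.g. with: cc demo.c -ltevent -ltalloc -lpthread
 */
#include <tevent.h>
#include <talloc.h>
#include <pthread.h>
#include <sys/types.h>
#include <errno.h>
#include <stdio.h>

struct demo_io_state {
	struct tevent_threaded_context *tctx;
	struct tevent_immediate *im;
	struct tevent_req *req;
	pthread_t tid;
	ssize_t result;
};

/* Runs in the main event loop; analogous to vfs_ceph_aio_done(). */
static void demo_io_done(struct tevent_context *ev,
			 struct tevent_immediate *im,
			 void *private_data)
{
	struct tevent_req *req = private_data;

	tevent_req_done(req);
}

/* Stands in for the libcephfs completion callback thread. */
static void *demo_io_worker(void *arg)
{
	struct demo_io_state *state = arg;

	state->result = 42;	/* pretend some I/O completed */

	tevent_threaded_schedule_immediate(state->tctx,
					   state->im,
					   demo_io_done,
					   state->req);
	return NULL;
}

static struct tevent_req *demo_io_send(TALLOC_CTX *mem_ctx,
				       struct tevent_context *ev)
{
	struct tevent_req *req = NULL;
	struct demo_io_state *state = NULL;
	int ret;

	req = tevent_req_create(mem_ctx, &state, struct demo_io_state);
	if (req == NULL) {
		return NULL;
	}
	state->req = req;
	state->tctx = tevent_threaded_context_create(state, ev);
	state->im = tevent_create_immediate(state);
	if (state->tctx == NULL || state->im == NULL) {
		tevent_req_error(req, ENOMEM);
		return tevent_req_post(req, ev);
	}
	ret = pthread_create(&state->tid, NULL, demo_io_worker, state);
	if (ret != 0) {
		tevent_req_error(req, ret);
		return tevent_req_post(req, ev);
	}
	return req;
}

int main(void)
{
	TALLOC_CTX *mem_ctx = talloc_new(NULL);
	struct tevent_context *ev = tevent_context_init(mem_ctx);
	struct tevent_req *req = NULL;
	ssize_t res = -1;
	int err = 0;

	req = (ev != NULL) ? demo_io_send(mem_ctx, ev) : NULL;
	if (req != NULL && tevent_req_poll(req, ev) &&
	    !tevent_req_is_unix_error(req, &err)) {
		struct demo_io_state *state =
			tevent_req_data(req, struct demo_io_state);

		pthread_join(state->tid, NULL);
		res = state->result;
	}
	printf("result=%zd\n", res);
	talloc_free(mem_ctx);
	return 0;
}

The same cross-thread concern drives vfs_ceph_aio_cleanup(): if the tevent request is destroyed while the libcephfs operation is still in flight, the state (including the iovec and ceph_ll_io_info that libcephfs still references) is reparented away from the request and marked orphaned, so a late completion callback finds valid memory and returns without touching the freed request. The wscript hunk ensures this code is only compiled in when the installed libcephfs actually exports ceph_ll_nonblocking_readv_writev; otherwise HAVE_CEPH_ASYNCIO stays undefined and the module keeps emulating async I/O with the blocking calls.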