#define CEPH_FN(_name) typeof(_name) *_name ## _fn
struct vfs_ceph_config {
+#if HAVE_CEPH_ASYNCIO
+	/*
+	 * Threaded tevent context used to schedule aio completions from
+	 * libcephfs worker threads back onto the main event loop.
+	 * Created lazily on first async request (vfs_ceph_require_tctx).
+	 */
+	struct tevent_threaded_context *tctx;
+#endif
const char *conf_file;
const char *user_id;
const char *fsname;
CEPH_FN(ceph_version);
CEPH_FN(ceph_rewinddir);
CEPH_FN(ceph_readdir_r);
+#if HAVE_CEPH_ASYNCIO
+	/* Nonblocking readv/writev entry point resolved from libcephfs. */
+	CEPH_FN(ceph_ll_nonblocking_readv_writev);
+#endif
};
/*
CHECK_CEPH_FN(libhandle, ceph_version);
CHECK_CEPH_FN(libhandle, ceph_rewinddir);
CHECK_CEPH_FN(libhandle, ceph_readdir_r);
+#if HAVE_CEPH_ASYNCIO
+ CHECK_CEPH_FN(libhandle, ceph_ll_nonblocking_readv_writev);
+#endif
config->libhandle = libhandle;
cfh->uperm);
}
+#if HAVE_CEPH_ASYNCIO
+/*
+ * Thin dispatch wrapper around the dynamically-resolved
+ * ceph_ll_nonblocking_readv_writev().  Returns the libcephfs result
+ * (negative errno on failure) or -EINVAL when no module config is
+ * attached to the handle.
+ */
+static int64_t vfs_ceph_ll_nonblocking_readv_writev(
+	const struct vfs_handle_struct *handle,
+	const struct vfs_ceph_fh *cfh,
+	struct ceph_ll_io_info *io_info)
+{
+	struct vfs_ceph_config *config = NULL;
+
+	SMB_VFS_HANDLE_GET_DATA(handle,
+				config,
+				struct vfs_ceph_config,
+				return -EINVAL);
+
+	/* %jd takes intmax_t: cast the offset explicitly. */
+	DBG_DEBUG("[CEPH] ceph_ll_nonblocking_readv_writev: ino=%" PRIu64
+		  " fd=%d off=%jd\n",
+		  cfh->iref.ino,
+		  cfh->fd,
+		  (intmax_t)io_info->off);
+
+	return config->ceph_ll_nonblocking_readv_writev_fn(config->mount,
+							   io_info);
+}
+#endif
+
/* Ceph Inode-refernce get/put wrappers */
static int vfs_ceph_iget(const struct vfs_handle_struct *handle,
uint64_t ino,
}
struct vfs_ceph_aio_state {
+	/* Module config; owns the threaded tevent context (tctx). */
+	struct vfs_ceph_config *config;
+	/* I/O file-handle the request operates on. */
+	struct vfs_ceph_fh *cfh;
+#if HAVE_CEPH_ASYNCIO
+	/* Back-pointer to the owning request, passed to the immediate. */
+	struct tevent_req *req;
+	/* Set when the request was freed while aio was still in flight. */
+	bool orphaned;
+	struct tevent_immediate *im;
+	/* Request parameters: buffer, length, file offset. */
+	void *data;
+	size_t len;
+	off_t off;
+	/* Operation selectors for the shared submit path. */
+	bool write;
+	bool fsync;
+
+	/* libcephfs submission descriptor and its single-element iovec. */
+	struct ceph_ll_io_info io_info;
+	struct iovec iov;
+#endif
struct timespec start_time;
struct timespec finish_time;
+	/* Bytes transferred (or 0 for fsync), or negative errno. */
+	ssize_t result;
struct vfs_aio_state vfs_aio_state;
SMBPROFILE_BYTES_ASYNC_STATE(profile_bytes);
};
-struct vfs_ceph_pread_state {
- ssize_t bytes_read;
- struct vfs_ceph_aio_state ceph_aio_state;
-};
-
static void vfs_ceph_aio_start(struct vfs_ceph_aio_state *state)
{
SMBPROFILE_BYTES_ASYNC_SET_BUSY(state->profile_bytes);
if (result < 0) {
state->vfs_aio_state.error = (int)result;
}
+
+ state->result = result;
SMBPROFILE_BYTES_ASYNC_SET_IDLE(state->profile_bytes);
}
-/*
- * Fake up an async ceph read by calling the synchronous API.
- */
+#if HAVE_CEPH_ASYNCIO
+
+static void vfs_ceph_aio_done(struct tevent_context *ev,
+ struct tevent_immediate *im,
+ void *private_data);
+
+/*
+ * Lazily create the tevent_threaded_context used to bounce aio
+ * completions from libcephfs callback threads onto the main event
+ * loop.  Idempotent: an already-created context is reused.
+ * Returns 0 on success or -ENOMEM if creation failed.
+ */
+static int vfs_ceph_require_tctx(struct vfs_ceph_aio_state *state,
+				 struct tevent_context *ev)
+{
+	struct vfs_ceph_config *config = state->config;
+
+	if (config->tctx == NULL) {
+		config->tctx = tevent_threaded_context_create(config, ev);
+	}
+
+	return (config->tctx != NULL) ? 0 : -ENOMEM;
+}
+
+/*
+ * libcephfs completion callback — runs on a ceph worker thread, NOT
+ * on the main event loop.  Do no real work here: if the request is
+ * still alive, schedule vfs_ceph_aio_done() via the threaded context.
+ * If the request was orphaned (freed while in flight), drop it.
+ */
+static void vfs_ceph_aio_complete(struct ceph_ll_io_info *io_info)
+{
+	struct vfs_ceph_aio_state *state = io_info->priv;
+
+	if (state->orphaned) {
+		return;
+	}
+
+	/* %jd/%ju take intmax_t/uintmax_t — cast off_t/size_t/int64_t. */
+	DBG_DEBUG("[CEPH] aio_complete: ino=%" PRIu64
+		  " fd=%d off=%jd len=%ju result=%jd\n",
+		  state->cfh->iref.ino,
+		  state->cfh->fd,
+		  (intmax_t)state->off,
+		  (uintmax_t)state->len,
+		  (intmax_t)state->io_info.result);
+
+	tevent_threaded_schedule_immediate(state->config->tctx,
+					   state->im,
+					   vfs_ceph_aio_done,
+					   state->req);
+}
+
+/*
+ * tevent_req cleanup hook.  If the low-level aio is still in flight
+ * the ceph callback thread may yet dereference our state, so detach
+ * the state from the dying request (reparent onto the NULL talloc
+ * context) and flag it orphaned so vfs_ceph_aio_complete() becomes a
+ * no-op.  The state memory is deliberately left alive.
+ */
+static void vfs_ceph_aio_cleanup(struct tevent_req *req,
+ enum tevent_req_state req_state)
+{
+ struct vfs_ceph_aio_state *state = tevent_req_data(
+ req, struct vfs_ceph_aio_state);
+
+ if (req_state == TEVENT_REQ_IN_PROGRESS) {
+ /*
+ * The job thread is still running, we need to protect the
+ * memory used by the job completion function.
+ */
+ (void)talloc_reparent(req, NULL, state);
+ state->orphaned = true;
+ }
+}
+
+/*
+ * Fill the ceph_ll_io_info from the request state and submit the
+ * nonblocking read/write/fsync to libcephfs.  On submit failure the
+ * request is finished (and posted) synchronously; on success the
+ * completion arrives later via vfs_ceph_aio_complete() on a ceph
+ * worker thread, so install the cleanup hook to protect the state.
+ */
+static void vfs_ceph_aio_submit(struct vfs_handle_struct *handle,
+				struct tevent_req *req,
+				struct tevent_context *ev)
+{
+	struct vfs_ceph_aio_state *state = tevent_req_data(
+		req, struct vfs_ceph_aio_state);
+	int64_t res;
+
+	/* Fixed: missing space between ino and fd; cast %jd/%ju args. */
+	DBG_DEBUG("[CEPH] aio_send: ino=%" PRIu64 " fd=%d off=%jd len=%ju\n",
+		  state->cfh->iref.ino,
+		  state->cfh->fd,
+		  (intmax_t)state->off,
+		  (uintmax_t)state->len);
+
+	state->io_info.callback = vfs_ceph_aio_complete;
+	state->iov.iov_base = state->data;
+	state->iov.iov_len = state->len;
+	state->io_info.priv = state;
+	state->io_info.fh = state->cfh->fh;
+	state->io_info.iov = &state->iov;
+	state->io_info.iovcnt = 1;
+	state->io_info.off = state->off;
+	state->io_info.write = state->write;
+	state->io_info.fsync = state->fsync;
+	state->io_info.result = 0;
+
+	vfs_ceph_aio_start(state);
+
+	res = vfs_ceph_ll_nonblocking_readv_writev(handle,
+						   state->cfh,
+						   &state->io_info);
+	if (res < 0) {
+		/* Submit failed: record -errno and complete immediately. */
+		state->result = (int)res;
+		tevent_req_error(req, -((int)res));
+		tevent_req_post(req, ev);
+		return;
+	}
+
+	/* In flight: protect state if the request gets freed early. */
+	tevent_req_set_cleanup_fn(req, vfs_ceph_aio_cleanup);
+	return;
+}
+
+/*
+ * Immediate handler scheduled from vfs_ceph_aio_complete(): runs on
+ * the main event loop and finishes the tevent request with the result
+ * reported by libcephfs (negative errno on failure).
+ */
+static void vfs_ceph_aio_done(struct tevent_context *ev,
+			      struct tevent_immediate *im,
+			      void *private_data)
+{
+	struct tevent_req *req = private_data;
+	struct vfs_ceph_aio_state *state = tevent_req_data(
+		req, struct vfs_ceph_aio_state);
+
+	/* %jd/%ju take intmax_t/uintmax_t — cast off_t/size_t/int64_t. */
+	DBG_DEBUG("[CEPH] aio_done: ino=%" PRIu64
+		  " fd=%d off=%jd len=%ju result=%jd\n",
+		  state->cfh->iref.ino,
+		  state->cfh->fd,
+		  (intmax_t)state->off,
+		  (uintmax_t)state->len,
+		  (intmax_t)state->io_info.result);
+
+	vfs_ceph_aio_finish(state, state->io_info.result);
+	if (state->result < 0) {
+		tevent_req_error(req, -((int)state->result));
+		return;
+	}
+
+	tevent_req_done(req);
+}
+
+/*
+ * Common recv for the async path: report a unix error if the request
+ * failed, otherwise copy out the vfs_aio_state and return the byte
+ * count (0 for fsync).  Always marks the request as received.
+ */
+static ssize_t vfs_ceph_aio_recv(struct tevent_req *req,
+				 struct vfs_aio_state *vfs_aio_state)
+{
+	struct vfs_ceph_aio_state *state = tevent_req_data(
+		req, struct vfs_ceph_aio_state);
+	ssize_t res = -1;
+
+	/* Fixed: %zd matches ssize_t (%ld is wrong on 32-bit targets). */
+	DBG_DEBUG("[CEPH] aio_recv: ino=%" PRIu64
+		  " fd=%d off=%jd len=%ju result=%zd\n",
+		  state->cfh->iref.ino,
+		  state->cfh->fd,
+		  (intmax_t)state->off,
+		  (uintmax_t)state->len,
+		  state->result);
+
+	if (tevent_req_is_unix_error(req, &vfs_aio_state->error)) {
+		goto out;
+	}
+
+	*vfs_aio_state = state->vfs_aio_state;
+	res = state->result;
+out:
+	tevent_req_received(req);
+	return res;
+}
+
+#endif /* HAVE_CEPH_ASYNCIO */
+
+/*
+ * Common setup for pread/pwrite/fsync send: fetch the module config,
+ * stash it in the request state, create the threaded context and the
+ * immediate (async builds only) and resolve the I/O file-handle.  On
+ * any failure the request is marked failed via tevent_req_error();
+ * callers check tevent_req_is_in_progress() afterwards.
+ */
+static void vfs_ceph_aio_prepare(struct vfs_handle_struct *handle,
+ struct tevent_req *req,
+ struct tevent_context *ev,
+ struct files_struct *fsp)
+{
+ struct vfs_ceph_config *config = NULL;
+ struct vfs_ceph_aio_state *state = NULL;
+ int ret = -1;
+
+ SMB_VFS_HANDLE_GET_DATA(handle,
+ config,
+ struct vfs_ceph_config,
+ (void)0);
+ if (config == NULL) {
+ tevent_req_error(req, EINVAL);
+ return;
+ }
+
+ state = tevent_req_data(req, struct vfs_ceph_aio_state);
+ state->config = config;
+
+#if HAVE_CEPH_ASYNCIO
+ ret = vfs_ceph_require_tctx(state, ev);
+ if (ret != 0) {
+ tevent_req_error(req, -ret);
+ return;
+ }
+
+ state->im = tevent_create_immediate(state);
+ if (state->im == NULL) {
+ tevent_req_error(req, ENOMEM);
+ return;
+ }
+#endif
+
+ /* vfs_ceph_fetch_io_fh returns 0 or -errno. */
+ ret = vfs_ceph_fetch_io_fh(handle, fsp, &state->cfh);
+ if (ret != 0) {
+ tevent_req_error(req, -ret);
+ }
+}
+
static struct tevent_req *vfs_ceph_pread_send(struct vfs_handle_struct *handle,
TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
struct files_struct *fsp,
void *data,
- size_t n, off_t offset)
+ size_t n,
+ off_t offset)
{
- struct vfs_ceph_fh *cfh = NULL;
struct tevent_req *req = NULL;
- struct vfs_ceph_pread_state *state = NULL;
+ struct vfs_ceph_aio_state *state = NULL;
int ret = -1;
DBG_DEBUG("[CEPH] pread_send(%p, %p, %p, %zu, %zd)\n",
data,
n,
offset);
- req = tevent_req_create(mem_ctx, &state, struct vfs_ceph_pread_state);
+
+ req = tevent_req_create(mem_ctx, &state, struct vfs_ceph_aio_state);
if (req == NULL) {
return NULL;
}
- ret = vfs_ceph_fetch_io_fh(handle, fsp, &cfh);
- if (ret != 0) {
- tevent_req_error(req, -ret);
+ vfs_ceph_aio_prepare(handle, req, ev, fsp);
+ if (!tevent_req_is_in_progress(req)) {
return tevent_req_post(req, ev);
}
SMBPROFILE_BYTES_ASYNC_START(syscall_asys_pread,
profile_p,
- state->ceph_aio_state.profile_bytes,
+ state->profile_bytes,
n);
- SMBPROFILE_BYTES_ASYNC_SET_IDLE(state->ceph_aio_state.profile_bytes);
+ SMBPROFILE_BYTES_ASYNC_SET_IDLE(state->profile_bytes);
- vfs_ceph_aio_start(&state->ceph_aio_state);
- ret = vfs_ceph_ll_read(handle, cfh, offset, n, data);
- vfs_ceph_aio_finish(&state->ceph_aio_state, ret);
+/* Native async path: submit via libcephfs nonblocking I/O. */
+#if HAVE_CEPH_ASYNCIO
+ state->req = req;
+ state->data = data;
+ state->len = n;
+ state->off = offset;
+ vfs_ceph_aio_submit(handle, req, ev);
+ return req;
+#endif
+ /* Fallback: fake async by calling the synchronous API. */
+ vfs_ceph_aio_start(state);
+ ret = vfs_ceph_ll_read(handle, state->cfh, offset, n, data);
+ vfs_ceph_aio_finish(state, ret);
if (ret < 0) {
/* ceph returns -errno on error. */
tevent_req_error(req, -ret);
return tevent_req_post(req, ev);
}
- state->bytes_read = ret;
tevent_req_done(req);
/* Return and schedule the completion of the call. */
return tevent_req_post(req, ev);
static ssize_t vfs_ceph_pread_recv(struct tevent_req *req,
struct vfs_aio_state *vfs_aio_state)
{
- struct vfs_ceph_pread_state *state =
- tevent_req_data(req, struct vfs_ceph_pread_state);
+ struct vfs_ceph_aio_state *state = tevent_req_data(
+ req, struct vfs_ceph_aio_state);
- DBG_DEBUG("[CEPH] pread_recv: bytes_read=%zd\n", state->bytes_read);
+ DBG_DEBUG("[CEPH] pread_recv: bytes_read=%zd\n", state->result);
- SMBPROFILE_BYTES_ASYNC_END(state->ceph_aio_state.profile_bytes);
+ SMBPROFILE_BYTES_ASYNC_END(state->profile_bytes);
+/* Async builds delegate to the shared aio recv helper. */
+#if HAVE_CEPH_ASYNCIO
+ return vfs_ceph_aio_recv(req, vfs_aio_state);
+#endif
if (tevent_req_is_unix_error(req, &vfs_aio_state->error)) {
return -1;
}
- *vfs_aio_state = state->ceph_aio_state.vfs_aio_state;
- return state->bytes_read;
+
+ *vfs_aio_state = state->vfs_aio_state;
+ return state->result;
}
static ssize_t vfs_ceph_pwrite(struct vfs_handle_struct *handle,
return lstatus_code(result);
}
-struct vfs_ceph_pwrite_state {
- ssize_t bytes_written;
- struct vfs_ceph_aio_state ceph_aio_state;
-};
-
-/*
- * Fake up an async ceph write by calling the synchronous API.
- */
static struct tevent_req *vfs_ceph_pwrite_send(struct vfs_handle_struct *handle,
TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
struct files_struct *fsp,
const void *data,
- size_t n, off_t offset)
+ size_t n,
+ off_t offset)
{
- struct vfs_ceph_fh *cfh = NULL;
struct tevent_req *req = NULL;
- struct vfs_ceph_pwrite_state *state = NULL;
+ struct vfs_ceph_aio_state *state = NULL;
int ret = -1;
DBG_DEBUG("[CEPH] pwrite_send(%p, %p, %p, %zu, %zd)\n",
data,
n,
offset);
- req = tevent_req_create(mem_ctx, &state, struct vfs_ceph_pwrite_state);
+
+ req = tevent_req_create(mem_ctx, &state, struct vfs_ceph_aio_state);
if (req == NULL) {
return NULL;
}
- ret = vfs_ceph_fetch_io_fh(handle, fsp, &cfh);
- if (ret != 0) {
- tevent_req_error(req, -ret);
+ vfs_ceph_aio_prepare(handle, req, ev, fsp);
+ if (!tevent_req_is_in_progress(req)) {
return tevent_req_post(req, ev);
}
SMBPROFILE_BYTES_ASYNC_START(syscall_asys_pwrite,
profile_p,
- state->ceph_aio_state.profile_bytes,
+ state->profile_bytes,
n);
- SMBPROFILE_BYTES_ASYNC_SET_IDLE(state->ceph_aio_state.profile_bytes);
+ SMBPROFILE_BYTES_ASYNC_SET_IDLE(state->profile_bytes);
+
+/* Native async path: discard_const is safe, writes never modify data. */
+#if HAVE_CEPH_ASYNCIO
+ state->req = req;
+ state->data = discard_const(data);
+ state->len = n;
+ state->off = offset;
+ state->write = true;
+ vfs_ceph_aio_submit(handle, req, ev);
+ return req;
+#endif
- vfs_ceph_aio_start(&state->ceph_aio_state);
- ret = vfs_ceph_ll_write(handle, cfh, offset, n, data);
- vfs_ceph_aio_finish(&state->ceph_aio_state, ret);
+ /* Fallback: fake async by calling the synchronous API. */
+ vfs_ceph_aio_start(state);
+ ret = vfs_ceph_ll_write(handle, state->cfh, offset, n, data);
+ vfs_ceph_aio_finish(state, ret);
if (ret < 0) {
/* ceph returns -errno on error. */
tevent_req_error(req, -ret);
return tevent_req_post(req, ev);
}
- state->bytes_written = ret;
tevent_req_done(req);
/* Return and schedule the completion of the call. */
return tevent_req_post(req, ev);
static ssize_t vfs_ceph_pwrite_recv(struct tevent_req *req,
struct vfs_aio_state *vfs_aio_state)
{
- struct vfs_ceph_pwrite_state *state =
- tevent_req_data(req, struct vfs_ceph_pwrite_state);
+ struct vfs_ceph_aio_state *state = tevent_req_data(
+ req, struct vfs_ceph_aio_state);
+
+ DBG_DEBUG("[CEPH] pwrite_recv: bytes_written=%zd\n", state->result);
- DBG_DEBUG("[CEPH] pwrite_recv: bytes_written=%zd\n",
- state->bytes_written);
+ SMBPROFILE_BYTES_ASYNC_END(state->profile_bytes);
- SMBPROFILE_BYTES_ASYNC_END(state->ceph_aio_state.profile_bytes);
+/* Async builds delegate to the shared aio recv helper. */
+#if HAVE_CEPH_ASYNCIO
+ return vfs_ceph_aio_recv(req, vfs_aio_state);
+#endif
if (tevent_req_is_unix_error(req, &vfs_aio_state->error)) {
return -1;
}
- *vfs_aio_state = state->ceph_aio_state.vfs_aio_state;
- return state->bytes_written;
+
+ *vfs_aio_state = state->vfs_aio_state;
+ return state->result;
}
static off_t vfs_ceph_lseek(struct vfs_handle_struct *handle,
return status_code(result);
}
-/*
- * Fake up an async ceph fsync by calling the synchronous API.
- */
-
-struct vfs_ceph_fsync_state {
- struct vfs_ceph_aio_state ceph_aio_state;
-};
-
static struct tevent_req *vfs_ceph_fsync_send(struct vfs_handle_struct *handle,
- TALLOC_CTX *mem_ctx,
- struct tevent_context *ev,
- files_struct *fsp)
+ TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ files_struct *fsp)
{
- struct vfs_ceph_fh *cfh = NULL;
struct tevent_req *req = NULL;
- struct vfs_ceph_fsync_state *state = NULL;
+ struct vfs_ceph_aio_state *state = NULL;
int ret = -1;
DBG_DEBUG("[CEPH] fsync_send(%p, %p)\n", handle, fsp);
- req = tevent_req_create(mem_ctx, &state, struct vfs_ceph_fsync_state);
+ req = tevent_req_create(mem_ctx, &state, struct vfs_ceph_aio_state);
if (req == NULL) {
return NULL;
}
- ret = vfs_ceph_fetch_io_fh(handle, fsp, &cfh);
- if (ret != 0) {
- tevent_req_error(req, -ret);
+ vfs_ceph_aio_prepare(handle, req, ev, fsp);
+ if (!tevent_req_is_in_progress(req)) {
return tevent_req_post(req, ev);
}
SMBPROFILE_BYTES_ASYNC_START(syscall_asys_fsync,
profile_p,
- state->ceph_aio_state.profile_bytes,
+ state->profile_bytes,
0);
- SMBPROFILE_BYTES_ASYNC_SET_IDLE(state->ceph_aio_state.profile_bytes);
+ SMBPROFILE_BYTES_ASYNC_SET_IDLE(state->profile_bytes);
+
+/* Native async path: no buffer, fsync flag drives the submit helper. */
+#if HAVE_CEPH_ASYNCIO
+ state->req = req;
+ state->data = NULL;
+ state->len = 0;
+ state->off = 0;
+ state->fsync = true;
+ vfs_ceph_aio_submit(handle, req, ev);
+ return req;
+#endif
- /* Make sync call. */
- vfs_ceph_aio_start(&state->ceph_aio_state);
- ret = vfs_ceph_ll_fsync(handle, cfh, false);
- vfs_ceph_aio_finish(&state->ceph_aio_state, ret);
+ /* Fallback: fake async by calling the synchronous API. */
+ vfs_ceph_aio_start(state);
+ ret = vfs_ceph_ll_fsync(handle, state->cfh, false);
+ vfs_ceph_aio_finish(state, ret);
if (ret != 0) {
/* ceph_fsync returns -errno on error. */
tevent_req_error(req, -ret);
}
static int vfs_ceph_fsync_recv(struct tevent_req *req,
- struct vfs_aio_state *vfs_aio_state)
+ struct vfs_aio_state *vfs_aio_state)
{
- struct vfs_ceph_fsync_state *state = tevent_req_data(
- req, struct vfs_ceph_fsync_state);
+ struct vfs_ceph_aio_state *state = tevent_req_data(
+ req, struct vfs_ceph_aio_state);
DBG_DEBUG("[CEPH] fsync_recv: error=%d duration=%" PRIu64 "\n",
- state->ceph_aio_state.vfs_aio_state.error,
- state->ceph_aio_state.vfs_aio_state.duration);
+ state->vfs_aio_state.error,
+ state->vfs_aio_state.duration);
+
+ SMBPROFILE_BYTES_ASYNC_END(state->profile_bytes);
- SMBPROFILE_BYTES_ASYNC_END(state->ceph_aio_state.profile_bytes);
+/* Async builds delegate to the shared aio recv helper. */
+#if HAVE_CEPH_ASYNCIO
+ return vfs_ceph_aio_recv(req, vfs_aio_state);
+#endif
if (tevent_req_is_unix_error(req, &vfs_aio_state->error)) {
return -1;
}
- *vfs_aio_state = state->ceph_aio_state.vfs_aio_state;
+
+ *vfs_aio_state = state->vfs_aio_state;
return 0;
}