#include "lib/util/tevent_unix.h"
#include <rados/librgw.h>
#include <rados/rgw_file.h>
+#include <stdalign.h>
#undef DBGC_CLASS
#define DBGC_CLASS DBGC_VFS
+/* Cache dir entries */
+#define MAX_DIR_ENTRIES 100
+
static uint64_t rgw_fd_index = 0;
struct vfs_ceph_rgw_config {
struct rgw_file_handle *rgw_root_fh;
};
+struct vfs_ceph_rgw_rd_arg {
+ struct vfs_ceph_rgw_dir *dirp;
+ bool eof;
+ int cb_err;
+ char whence[NAME_MAX + 1];
+};
+
+struct vfs_ceph_rgw_dir {
+ int pos;
+ int num;
+ struct vfs_ceph_rgw_fh *dirfh;
+ struct vfs_ceph_rgw_rd_arg cb_arg;
+ struct dirent *dirs;
+};
+
/* Ceph-rgw file-handles via fsp-extension */
struct vfs_ceph_rgw_fh {
struct vfs_ceph_rgw_dir *dirp;
return status_code(rc);
}
+/*
+ * Note about return value:
+ * Return value of this callback does not indicate success or failure.
+ * Basically this return value is used for flow control,
+ * instructing backend to invoke this callback.
+ * Therefore, 'return 0' means stop invoking this callback,
+ * 'return 1' means continue invoking this callback.
+ */
+static int vfs_ceph_rgw_rd_cb(const char *name,
+ void *arg,
+ uint64_t offset,
+ struct stat *st,
+ uint32_t mask,
+ uint32_t flags)
+{
+ size_t len = 0;
+ struct vfs_ceph_rgw_rd_arg *cb_arg = (struct vfs_ceph_rgw_rd_arg *)arg;
+ struct dirent *d = NULL;
+ struct vfs_ceph_rgw_dir *dirp = cb_arg->dirp;
+
+ DBG_DEBUG("[CEPH_RGW]: Object-name: %s mask=%u flags=%u\n",
+ name,
+ mask,
+ flags);
+
+ /* Ensure we never over-run array */
+ if (dirp->num == MAX_DIR_ENTRIES) {
+ return 0;
+ }
+
+ len = strlen(name);
+ /* prepare dentry */
+ d = &dirp->dirs[dirp->num];
+ d->d_ino = st->st_ino;
+ d->d_off = offset;
+ strlcpy(d->d_name, name, sizeof(d->d_name));
+ len = offsetof(struct dirent, d_name) + len + 1;
+ d->d_reclen = (len + alignof(struct dirent) - 1) &
+ (~(alignof(struct dirent) - 1));
+ d->d_type = IFTODT(flags);
+ dirp->num += 1;
+
+ /* update 'whence' */
+ len = strlcpy(cb_arg->whence, name, sizeof(cb_arg->whence));
+ if (len >= sizeof(cb_arg->whence)) {
+ cb_arg->cb_err = -ENAMETOOLONG;
+ return 0;
+ }
+
+ /* Since its not end of dir listing, return non-zero to continue
+ * listing.
+ */
+ return 1;
+}
+
+static DIR *vfs_ceph_rgw_fdopendir(vfs_handle_struct *handle,
+ files_struct *fsp,
+ const char *mask,
+ uint32_t attr)
+{
+ int rc = 0;
+ struct vfs_ceph_rgw_dir *dirp = NULL;
+ struct vfs_ceph_rgw_fh *openfh = NULL;
+ struct vfs_ceph_rgw_rd_arg *cb_arg = NULL;
+ START_PROFILE_X(SNUM(handle->conn), syscall_fdopendir);
+
+ DBG_DEBUG("[CEPH_RGW] fdopendir: name [%s]\n", fsp_str_dbg(fsp));
+
+ rc = vfs_ceph_rgw_fetch_fh(handle, fsp, &openfh);
+ if (rc < 0) {
+ DBG_ERR("[CEPH_RGW] Unable to find open handle for %s. "
+ "rc=%d\n",
+ fsp_str_dbg(fsp),
+ rc);
+ goto out;
+ }
+
+ dirp = talloc_zero(handle->conn, struct vfs_ceph_rgw_dir);
+ if (dirp == NULL) {
+ DBG_ERR("[CEPH_RGW] Not enough memory for dir info.\n");
+ goto out;
+ }
+
+ dirp->dirs = talloc_array(dirp, struct dirent, MAX_DIR_ENTRIES);
+ if (dirp->dirs == NULL) {
+ DBG_ERR("[CEPH_RGW] Not enough memory for dir entries\n");
+ TALLOC_FREE(dirp);
+ dirp = NULL;
+ goto out;
+ }
+
+ /* init dirp */
+ dirp->dirfh = openfh;
+
+ /* init callback argument for readdir */
+ cb_arg = &dirp->cb_arg;
+ cb_arg->dirp = dirp;
+ cb_arg->eof = false;
+
+ DBG_DEBUG("[CEPH_RGW] fdopendir: [%s] success.\n", fsp_str_dbg(fsp));
+
+out:
+ END_PROFILE_X(syscall_fdopendir);
+ return (DIR *)dirp;
+}
+
+static int vfs_ceph_rgw_closedir(struct vfs_handle_struct *handle, DIR *dirp)
+{
+ int rc = -ENOMEM;
+ struct vfs_ceph_rgw_dir *rgw_dirp = (struct vfs_ceph_rgw_dir *)dirp;
+ struct vfs_ceph_rgw_config *config = NULL;
+
+ START_PROFILE_X(SNUM(handle->conn), syscall_closedir);
+
+ SMB_VFS_HANDLE_GET_DATA(handle,
+ config,
+ struct vfs_ceph_rgw_config,
+ goto out);
+
+ DBG_DEBUG("[CEPH_RGW] closedir: dirp=%p\n", dirp);
+ rc = rgw_close(config->rgw_root_fs,
+ rgw_dirp->dirfh->rgw_fh,
+ RGW_CLOSE_FLAG_RELE);
+ if (rc < 0) {
+ DBG_ERR("[CEPH_RGW] Unable to close directory. rc = %d\n", rc);
+ /* fall through */
+ }
+ vfs_ceph_rgw_remove_fh(handle, rgw_dirp->dirfh->fsp);
+ TALLOC_FREE(rgw_dirp);
+
+out:
+ END_PROFILE_X(syscall_closedir);
+ return status_code(rc);
+}
+
+static struct dirent *vfs_ceph_rgw_readdir(struct vfs_handle_struct *handle,
+ struct files_struct *dirfsp,
+ DIR *dirp)
+{
+ int rc = 0;
+ int saved_errno = errno;
+ struct dirent *ret = NULL;
+ struct vfs_ceph_rgw_config *config = NULL;
+ struct vfs_ceph_rgw_dir *rgw_dirp = (struct vfs_ceph_rgw_dir *)dirp;
+ struct vfs_ceph_rgw_rd_arg *cb_arg = &rgw_dirp->cb_arg;
+ struct vfs_ceph_rgw_fh *dirfh = rgw_dirp->dirfh;
+ START_PROFILE_X(SNUM(handle->conn), syscall_readdir);
+
+ DBG_DEBUG("[CEPH_RGW] readdir: name [%s]\n", fsp_str_dbg(dirfsp));
+
+ SMB_VFS_HANDLE_GET_DATA(handle,
+ config,
+ struct vfs_ceph_rgw_config,
+ goto out);
+
+ /* rgw_readdir2() fetches max 1000 entries on every call till
+ * eof is reached and thus we cache MAX_DIR_ENTRIES and return dir
+ * entries from this cache.
+ * If entries exceed beyond MAX_DIR_ENTRIES then we just reset 'num' to
+ * 0 so that subsequent call to readdir2() would reuse this cache,
+ * this ensure we always use a fix amount of memory regardless the
+ * number of entries in directory.
+ *
+ * Additionally we issue rgw_readdir2() only after we have consumed
+ * all entries with help of 'pos'. Therefore upper layer won't miss any
+ * entry.
+ *
+ * Note:
+ * Currently librgw do not have any mechanism to notify changes for a
+ * directory. Thus in case of additions/deletions of files to/from
+ * directory won't be visible until rgw_readdir2() is called with
+ * 'whence' as NULL.
+ */
+ if (!cb_arg->eof && rgw_dirp->pos == rgw_dirp->num) {
+ rgw_dirp->num = 0;
+ rgw_dirp->pos = 0;
+ cb_arg->cb_err = 0;
+ rc = rgw_readdir2(config->rgw_root_fs,
+ dirfh->rgw_fh,
+ (cb_arg->whence[0] != '\0') ? cb_arg->whence
+ : NULL,
+ vfs_ceph_rgw_rd_cb,
+ cb_arg,
+ &cb_arg->eof,
+ RGW_READDIR_FLAG_NONE);
+ if (rc < 0 || cb_arg->cb_err < 0) {
+ if (rc < 0) {
+ saved_errno = -rc;
+ } else {
+ saved_errno = -cb_arg->cb_err;
+ }
+ ret = NULL;
+ DBG_ERR("[CEPH_RGW] readdir failed. rc=%d cb_err=%d\n",
+ rc,
+ cb_arg->cb_err);
+ goto out;
+ }
+ }
+
+ if (rgw_dirp->pos < rgw_dirp->num) {
+ ret = &rgw_dirp->dirs[rgw_dirp->pos++];
+ }
+
+ DBG_DEBUG("[CEPH_RGW] readdir: [%s] success.\n", fsp_str_dbg(dirfsp));
+out:
+ errno = saved_errno;
+ END_PROFILE_X(syscall_readdir);
+ return ret;
+}
+
+static void vfs_ceph_rgw_rewinddir(struct vfs_handle_struct *handle, DIR *dirp)
+{
+ struct vfs_ceph_rgw_dir *rgw_dirp = (struct vfs_ceph_rgw_dir *)dirp;
+ struct vfs_ceph_rgw_rd_arg *cb_arg = &rgw_dirp->cb_arg;
+ START_PROFILE_X(SNUM(handle->conn), syscall_rewinddir);
+
+ rgw_dirp->pos = 0;
+ rgw_dirp->num = 0;
+ cb_arg->whence[0] = '\0';
+ cb_arg->eof = false;
+
+ END_PROFILE_X(syscall_rewinddir);
+ return;
+}
+
static bool vfs_ceph_rgw_mount_bucket(struct vfs_ceph_rgw_config *config)
{
int rc = 0;
/* Directory operations */
- .fdopendir_fn = vfs_not_implemented_fdopendir,
- .readdir_fn = vfs_not_implemented_readdir,
- .rewind_dir_fn = vfs_not_implemented_rewind_dir,
+ .fdopendir_fn = vfs_ceph_rgw_fdopendir,
+ .readdir_fn = vfs_ceph_rgw_readdir,
+ .rewind_dir_fn = vfs_ceph_rgw_rewinddir,
.mkdirat_fn = vfs_not_implemented_mkdirat,
- .closedir_fn = vfs_not_implemented_closedir,
+ .closedir_fn = vfs_ceph_rgw_closedir,
/* File operations */