+// SPDX-License-Identifier: GPL-2.0
+
#include "libxfs.h"
#include <pthread.h>
#include "avl.h"
return;
dsize = XFS_DFORK_DSIZE(dino, mp);
- pp = XFS_BMDR_PTR_ADDR(dib, 1, xfs_bmdr_maxrecs(dsize, 0));
+ pp = XFS_BMDR_PTR_ADDR(dib, 1, libxfs_bmdr_maxrecs(dsize, 0));
for (i = 0; i < numrecs; i++) {
dbno = get_unaligned_be64(&pp[i]);
if (bp->b_error)
return;
- for (icnt = 0; icnt < (XFS_BUF_COUNT(bp) >> mp->m_sb.sb_inodelog); icnt++) {
+ for (icnt = 0; icnt < (bp->b_bcount >> mp->m_sb.sb_inodelog); icnt++) {
dino = xfs_make_iptr(mp, bp, icnt);
/*
if (be16_to_cpu(dino->di_magic) != XFS_DINODE_MAGIC)
continue;
- if (!xfs_dinode_good_version(mp, dino->di_version))
+ if (!libxfs_dinode_good_version(mp, dino->di_version))
continue;
if (be64_to_cpu(dino->di_size) <= XFS_DFORK_DSIZE(dino, mp))
num = 0;
if (which == PF_SECONDARY) {
bplist[0] = btree_find(args->io_queue, 0, &fsbno);
- max_fsbno = MIN(fsbno + pf_max_fsbs,
+ max_fsbno = min(fsbno + pf_max_fsbs,
args->last_bno_read);
} else {
bplist[0] = btree_find(args->io_queue,
/*
* now read the data and put into the xfs_but_t's
*/
- len = pread64(mp_fd, buf, (int)(last_off - first_off), first_off);
+ len = pread(mp_fd, buf, (int)(last_off - first_off), first_off);
/*
* Check the last buffer on the list to see if we need to
size = XFS_BUF_SIZE(bplist[i]);
if (len < size)
break;
- memcpy(XFS_BUF_PTR(bplist[i]), pbuf, size);
+ memcpy(bplist[i]->b_addr, pbuf, size);
bplist[i]->b_flags |= (LIBXFS_B_UPTODATE |
LIBXFS_B_UNCHECKED);
len -= size;
pf_create_prefetch_thread(
prefetch_args_t *args);
+/*
+ * If we fail to create the queuing thread or can't create even one
+ * prefetch thread, we need to let processing continue without it.
+ */
+static void
+pf_skip_prefetch_thread(prefetch_args_t *args)
+{
+	prefetch_args_t *next;
+
+	pthread_mutex_lock(&args->lock);
+	/* Mark this AG done so waiters in pf_start_processing can run. */
+	args->prefetch_done = 1;
+	pf_start_processing(args);
+	/*
+	 * Detach any chained AG while still holding the lock so a
+	 * concurrent start_inode_prefetch cannot queue onto a dead args.
+	 */
+	next = args->next_args;
+	args->next_args = NULL;
+	pthread_mutex_unlock(&args->lock);
+
+	/*
+	 * Start the next AG outside the lock; if its thread creation
+	 * fails too, this path is re-entered for that AG in turn.
+	 */
+	if (next)
+		pf_create_prefetch_thread(next);
+}
+
static void *
pf_queuing_worker(
void *param)
{
prefetch_args_t *args = param;
+ prefetch_args_t *next_args;
int num_inos;
ino_tree_node_t *irec;
ino_tree_node_t *cur_irec;
if (err != 0) {
do_warn(_("failed to create prefetch thread: %s\n"),
strerror(err));
+ pftrace("failed to create prefetch thread for AG %d: %s",
+ args->agno, strerror(err));
+ args->io_threads[i] = 0;
if (i == 0) {
- pf_start_processing(args);
+ pf_skip_prefetch_thread(args);
return NULL;
}
/*
* might get stuck on a buffer that has been locked
* and added to the I/O queue but is waiting for
* the thread to be woken.
+ * Start processing as well, in case everything so
+ * far was already prefetched and the queue is empty.
*/
+
pf_start_io_workers(args);
+ pf_start_processing(args);
sem_wait(&args->ra_count);
}
ASSERT(btree_is_empty(args->io_queue));
args->prefetch_done = 1;
- if (args->next_args)
- pf_create_prefetch_thread(args->next_args);
-
+ next_args = args->next_args;
+ args->next_args = NULL;
pthread_mutex_unlock(&args->lock);
+ if (next_args)
+ pf_create_prefetch_thread(next_args);
+
return NULL;
}
if (err != 0) {
do_warn(_("failed to create prefetch thread: %s\n"),
strerror(err));
- cleanup_inode_prefetch(args);
+ pftrace("failed to create prefetch thread for AG %d: %s",
+ args->agno, strerror(err));
+ args->queuing_thread = 0;
+ pf_skip_prefetch_thread(args);
}
return err == 0;
} else {
pthread_mutex_lock(&prev_args->lock);
if (prev_args->prefetch_done) {
+ pthread_mutex_unlock(&prev_args->lock);
if (!pf_create_prefetch_thread(args))
args = NULL;
- } else
+ } else {
prev_args->next_args = args;
- pthread_mutex_unlock(&prev_args->lock);
+ pftrace("queued AG %d after AG %d",
+ args->agno, prev_args->agno);
+ pthread_mutex_unlock(&prev_args->lock);
+ }
}
return args;
*/
static void
prefetch_ag_range(
- struct work_queue *work,
+ struct workqueue *work,
xfs_agnumber_t start_ag,
xfs_agnumber_t end_ag,
bool dirs_only,
- void (*func)(struct work_queue *,
+ void (*func)(struct workqueue *,
xfs_agnumber_t, void *))
{
int i;
xfs_agnumber_t start_ag;
xfs_agnumber_t end_ag;
bool dirs_only;
- void (*func)(struct work_queue *, xfs_agnumber_t, void *);
+ void (*func)(struct workqueue *, xfs_agnumber_t, void *);
};
static void
prefetch_ag_range_work(
- struct work_queue *work,
+ struct workqueue *work,
xfs_agnumber_t unused,
void *args)
{
do_inode_prefetch(
struct xfs_mount *mp,
int stride,
- void (*func)(struct work_queue *,
+ void (*func)(struct workqueue *,
xfs_agnumber_t, void *),
bool check_cache,
bool dirs_only)
{
int i;
- struct work_queue queue;
- struct work_queue *queues;
+ struct workqueue queue;
+ struct workqueue *queues;
int queues_started = 0;
/*
* CPU to maximise parallelism of the queue to be processed.
*/
if (check_cache && !libxfs_bcache_overflowed()) {
- queue.mp = mp;
+ queue.wq_ctx = mp;
create_work_queue(&queue, mp, libxfs_nproc());
for (i = 0; i < mp->m_sb.sb_agcount; i++)
queue_work(&queue, func, i, NULL);
* directly after each AG is queued.
*/
if (!stride) {
- queue.mp = mp;
+ queue.wq_ctx = mp;
prefetch_ag_range(&queue, 0, mp->m_sb.sb_agcount,
dirs_only, func);
return;
/*
* create one worker thread for each segment of the volume
*/
- queues = malloc(thread_count * sizeof(work_queue_t));
+ queues = malloc(thread_count * sizeof(struct workqueue));
for (i = 0; i < thread_count; i++) {
struct pf_work_args *wargs;
pftrace("AG %d prefetch done", args->agno);
+ ASSERT(args->next_args == NULL);
+
pthread_mutex_destroy(&args->lock);
pthread_cond_destroy(&args->start_reading);
pthread_cond_destroy(&args->start_processing);