static void
process_ag_func(
- work_queue_t *wq,
+ struct workqueue *wq,
xfs_agnumber_t agno,
void *arg)
{
*/
wait_for_inode_prefetch(arg);
do_log(_(" - agno = %d\n"), agno);
- process_aginodes(wq->mp, arg, agno, 1, 0, 1);
+ process_aginodes(wq->wq_ctx, arg, agno, 1, 0, 1);
blkmap_free_final();
cleanup_inode_prefetch(arg);
}
static void
do_uncertain_aginodes(
- work_queue_t *wq,
- xfs_agnumber_t agno,
- void *arg)
+ struct workqueue *wq,
+ xfs_agnumber_t agno,
+ void *arg)
{
- int *count = arg;
+ int *count = arg;
- *count = process_uncertain_aginodes(wq->mp, agno);
+ *count = process_uncertain_aginodes(wq->wq_ctx, agno);
#ifdef XR_INODE_TRACE
fprintf(stderr,
{
int i, j;
int *counts;
- work_queue_t wq;
+ struct workqueue wq;
do_log(_("Phase 3 - for each AG...\n"));
if (!no_modify)
static void
process_ag_func(
- work_queue_t *wq,
+ struct workqueue *wq,
xfs_agnumber_t agno,
void *arg)
{
wait_for_inode_prefetch(arg);
do_log(_(" - agno = %d\n"), agno);
- process_aginodes(wq->mp, arg, agno, 0, 1, 0);
+ process_aginodes(wq->wq_ctx, arg, agno, 0, 1, 0);
blkmap_free_final();
cleanup_inode_prefetch(arg);
static void
check_rmap_btrees(
- work_queue_t *wq,
+ struct workqueue *wq,
xfs_agnumber_t agno,
void *arg)
{
int error;
- error = rmap_add_fixed_ag_rec(wq->mp, agno);
+ error = rmap_add_fixed_ag_rec(wq->wq_ctx, agno);
if (error)
do_error(
_("unable to add AG %u metadata reverse-mapping data.\n"), agno);
- error = rmap_fold_raw_recs(wq->mp, agno);
+ error = rmap_fold_raw_recs(wq->wq_ctx, agno);
if (error)
do_error(
_("unable to merge AG %u metadata reverse-mapping data.\n"), agno);
- error = rmaps_verify_btree(wq->mp, agno);
+ error = rmaps_verify_btree(wq->wq_ctx, agno);
if (error)
do_error(
_("%s while checking reverse-mappings"),
static void
compute_ag_refcounts(
- work_queue_t *wq,
+ struct workqueue *wq,
xfs_agnumber_t agno,
void *arg)
{
int error;
- error = compute_refcounts(wq->mp, agno);
+ error = compute_refcounts(wq->wq_ctx, agno);
if (error)
do_error(
_("%s while computing reference count records.\n"),
static void
process_inode_reflink_flags(
- struct work_queue *wq,
+ struct workqueue *wq,
xfs_agnumber_t agno,
void *arg)
{
int error;
- error = fix_inode_reflink_flags(wq->mp, agno);
+ error = fix_inode_reflink_flags(wq->wq_ctx, agno);
if (error)
do_error(
_("%s while fixing inode reflink flags.\n"),
static void
check_refcount_btrees(
- work_queue_t *wq,
+ struct workqueue *wq,
xfs_agnumber_t agno,
void *arg)
{
int error;
- error = check_refcounts(wq->mp, agno);
+ error = check_refcounts(wq->wq_ctx, agno);
if (error)
do_error(
_("%s while checking reference counts"),
process_rmap_data(
struct xfs_mount *mp)
{
- struct work_queue wq;
+ struct workqueue wq;
xfs_agnumber_t i;
if (!rmap_needs_work(mp))
static void
traverse_function(
- work_queue_t *wq,
+ struct workqueue *wq,
xfs_agnumber_t agno,
void *arg)
{
for (i = 0; i < XFS_INODES_PER_CHUNK; i++) {
if (inode_isadir(irec, i))
- process_dir_inode(wq->mp, agno, irec, i);
+ process_dir_inode(wq->wq_ctx, agno, irec, i);
}
}
cleanup_inode_prefetch(pf_args);
*/
static void
do_link_updates(
- struct work_queue *wq,
+ struct workqueue *wq,
xfs_agnumber_t agno,
void *arg)
{
+ struct xfs_mount *mp = wq->wq_ctx;
ino_tree_node_t *irec;
int j;
uint32_t nrefs;
ASSERT(no_modify || nrefs > 0);
if (get_inode_disk_nlinks(irec, j) != nrefs)
- update_inode_nlinks(wq->mp,
- XFS_AGINO_TO_INO(wq->mp, agno,
+ update_inode_nlinks(mp,
+ XFS_AGINO_TO_INO(mp, agno,
irec->ino_startnum + j),
nrefs);
}
struct xfs_mount *mp,
int scan_threads)
{
- struct work_queue wq;
+ struct workqueue wq;
int agno;
if (!no_modify)
*/
static void
prefetch_ag_range(
- struct work_queue *work,
+ struct workqueue *work,
xfs_agnumber_t start_ag,
xfs_agnumber_t end_ag,
bool dirs_only,
- void (*func)(struct work_queue *,
+ void (*func)(struct workqueue *,
xfs_agnumber_t, void *))
{
int i;
xfs_agnumber_t start_ag;
xfs_agnumber_t end_ag;
bool dirs_only;
- void (*func)(struct work_queue *, xfs_agnumber_t, void *);
+ void (*func)(struct workqueue *, xfs_agnumber_t, void *);
};
static void
prefetch_ag_range_work(
- struct work_queue *work,
+ struct workqueue *work,
xfs_agnumber_t unused,
void *args)
{
do_inode_prefetch(
struct xfs_mount *mp,
int stride,
- void (*func)(struct work_queue *,
+ void (*func)(struct workqueue *,
xfs_agnumber_t, void *),
bool check_cache,
bool dirs_only)
{
int i;
- struct work_queue queue;
- struct work_queue *queues;
+ struct workqueue queue;
+ struct workqueue *queues;
int queues_started = 0;
/*
* CPU to maximise parallelism of the queue to be processed.
*/
if (check_cache && !libxfs_bcache_overflowed()) {
- queue.mp = mp;
+ queue.wq_ctx = mp;
create_work_queue(&queue, mp, libxfs_nproc());
for (i = 0; i < mp->m_sb.sb_agcount; i++)
queue_work(&queue, func, i, NULL);
* directly after each AG is queued.
*/
if (!stride) {
- queue.mp = mp;
+ queue.wq_ctx = mp;
prefetch_ag_range(&queue, 0, mp->m_sb.sb_agcount,
dirs_only, func);
return;
/*
* create one worker thread for each segment of the volume
*/
- queues = malloc(thread_count * sizeof(work_queue_t));
+ queues = malloc(thread_count * sizeof(struct workqueue));
for (i = 0; i < thread_count; i++) {
struct pf_work_args *wargs;
#include <semaphore.h>
#include "incore.h"
-struct work_queue;
+struct workqueue;
extern int do_prefetch;
do_inode_prefetch(
struct xfs_mount *mp,
int stride,
- void (*func)(struct work_queue *,
+ void (*func)(struct workqueue *,
xfs_agnumber_t, void *),
bool check_cache,
bool dirs_only);
*/
static void
scan_ag(
- work_queue_t *wq,
+ struct workqueue *wq,
xfs_agnumber_t agno,
void *arg)
{
struct xfs_mount *mp,
int scan_threads)
{
- struct aghdr_cnts *agcnts;
- uint64_t fdblocks = 0;
- uint64_t icount = 0;
- uint64_t ifreecount = 0;
- uint64_t usedblocks = 0;
- xfs_agnumber_t i;
- work_queue_t wq;
+ struct aghdr_cnts *agcnts;
+ uint64_t fdblocks = 0;
+ uint64_t icount = 0;
+ uint64_t ifreecount = 0;
+ uint64_t usedblocks = 0;
+ xfs_agnumber_t i;
+ struct workqueue wq;
agcnts = malloc(mp->m_sb.sb_agcount * sizeof(*agcnts));
if (!agcnts) {
static void
qsort_slab_helper(
- struct work_queue *wq,
+ struct workqueue *wq,
xfs_agnumber_t agno,
void *arg)
{
struct xfs_slab *slab,
int (*compare_fn)(const void *, const void *))
{
- struct work_queue wq;
+ struct workqueue wq;
struct xfs_slab_hdr *hdr;
struct qsort_slab *qs;
#include "protos.h"
#include "globals.h"
-static void *
-worker_thread(void *arg)
-{
- work_queue_t *wq;
- work_item_t *wi;
-
- wq = (work_queue_t*)arg;
-
- /*
- * Loop pulling work from the passed in work queue.
- * Check for notification to exit after every chunk of work.
- */
- while (1) {
- pthread_mutex_lock(&wq->lock);
-
- /*
- * Wait for work.
- */
- while (wq->next_item == NULL && !wq->terminate) {
- ASSERT(wq->item_count == 0);
- pthread_cond_wait(&wq->wakeup, &wq->lock);
- }
- if (wq->next_item == NULL && wq->terminate) {
- pthread_mutex_unlock(&wq->lock);
- break;
- }
-
- /*
- * Dequeue work from the head of the list.
- */
- ASSERT(wq->item_count > 0);
- wi = wq->next_item;
- wq->next_item = wi->next;
- wq->item_count--;
-
- pthread_mutex_unlock(&wq->lock);
-
- (wi->function)(wi->queue, wi->agno, wi->arg);
- free(wi);
- }
-
- return NULL;
-}
-
void
thread_init(void)
{
void
create_work_queue(
- work_queue_t *wq,
- xfs_mount_t *mp,
- int nworkers)
+ struct workqueue *wq,
+ struct xfs_mount *mp,
+ unsigned int nworkers)
{
int err;
- int i;
-
- memset(wq, 0, sizeof(work_queue_t));
- pthread_cond_init(&wq->wakeup, NULL);
- pthread_mutex_init(&wq->lock, NULL);
-
- wq->mp = mp;
- wq->thread_count = nworkers;
- wq->threads = malloc(nworkers * sizeof(pthread_t));
- wq->terminate = 0;
-
- for (i = 0; i < nworkers; i++) {
- err = pthread_create(&wq->threads[i], NULL, worker_thread, wq);
- if (err != 0) {
- do_error(_("cannot create worker threads, error = [%d] %s\n"),
+ err = workqueue_create(wq, mp, nworkers);
+ if (err)
+ do_error(_("cannot create worker threads, error = [%d] %s\n"),
err, strerror(err));
- }
- }
-
}
void
queue_work(
- work_queue_t *wq,
- work_func_t func,
- xfs_agnumber_t agno,
- void *arg)
+ struct workqueue *wq,
+ workqueue_func_t func,
+ xfs_agnumber_t agno,
+ void *arg)
{
- work_item_t *wi;
+ int err;
- wi = (work_item_t *)malloc(sizeof(work_item_t));
- if (wi == NULL)
+ err = workqueue_add(wq, func, agno, arg);
+ if (err)
do_error(_("cannot allocate worker item, error = [%d] %s\n"),
- errno, strerror(errno));
-
- wi->function = func;
- wi->agno = agno;
- wi->arg = arg;
- wi->queue = wq;
- wi->next = NULL;
-
- /*
- * Now queue the new work structure to the work queue.
- */
- pthread_mutex_lock(&wq->lock);
- if (wq->next_item == NULL) {
- wq->next_item = wi;
- ASSERT(wq->item_count == 0);
- pthread_cond_signal(&wq->wakeup);
- } else {
- wq->last_item->next = wi;
- }
- wq->last_item = wi;
- wq->item_count++;
- pthread_mutex_unlock(&wq->lock);
+ err, strerror(err));
}
void
destroy_work_queue(
- work_queue_t *wq)
+ struct workqueue *wq)
{
- int i;
-
- pthread_mutex_lock(&wq->lock);
- wq->terminate = 1;
- pthread_mutex_unlock(&wq->lock);
-
- pthread_cond_broadcast(&wq->wakeup);
-
- for (i = 0; i < wq->thread_count; i++)
- pthread_join(wq->threads[i], NULL);
-
- free(wq->threads);
- pthread_mutex_destroy(&wq->lock);
- pthread_cond_destroy(&wq->wakeup);
+ workqueue_destroy(wq);
}
#ifndef _XFS_REPAIR_THREADS_H_
#define _XFS_REPAIR_THREADS_H_
-void thread_init(void);
-
-struct work_queue;
-
-typedef void work_func_t(struct work_queue *, xfs_agnumber_t, void *);
+#include "workqueue.h"
-typedef struct work_item {
- struct work_item *next;
- work_func_t *function;
- struct work_queue *queue;
- xfs_agnumber_t agno;
- void *arg;
-} work_item_t;
-
-typedef struct work_queue {
- work_item_t *next_item;
- work_item_t *last_item;
- int item_count;
- int thread_count;
- pthread_t *threads;
- xfs_mount_t *mp;
- pthread_mutex_t lock;
- pthread_cond_t wakeup;
- int terminate;
-} work_queue_t;
+void thread_init(void);
void
create_work_queue(
- work_queue_t *wq,
- xfs_mount_t *mp,
- int nworkers);
+ struct workqueue *wq,
+ struct xfs_mount *mp,
+ unsigned int nworkers);
void
queue_work(
- work_queue_t *wq,
- work_func_t func,
+ struct workqueue *wq,
+ workqueue_func_t func,
xfs_agnumber_t agno,
void *arg);
void
destroy_work_queue(
- work_queue_t *wq);
+ struct workqueue *wq);
#endif /* _XFS_REPAIR_THREADS_H_ */