{ "fdcache-prefetch", ARGP_KEY_FDCACHE_PREFETCH, "NUM", 0, "Number of archive files to prefetch into fdcache.", 0 },
#define ARGP_KEY_FDCACHE_MINTMP 0x1004
{ "fdcache-mintmp", ARGP_KEY_FDCACHE_MINTMP, "NUM", 0, "Minimum free space% on tmpdir.", 0 },
- { NULL, 0, NULL, 0, NULL, 0 }
+#define ARGP_KEY_FDCACHE_PREFETCH_MBS 0x1005
+ { "fdcache-prefetch-mbs", ARGP_KEY_FDCACHE_PREFETCH_MBS, "MB", 0,"Megabytes allocated to the \
+ prefetch cache.", 0},
+#define ARGP_KEY_FDCACHE_PREFETCH_FDS 0x1006
+ { "fdcache-prefetch-fds", ARGP_KEY_FDCACHE_PREFETCH_FDS, "NUM", 0,"Number of files allocated to the \
+ prefetch cache.", 0},
+ { NULL, 0, NULL, 0, NULL, 0 },
};
/* Short description of program. */
static long fdcache_mbs;
static long fdcache_prefetch;
static long fdcache_mintmp;
+static long fdcache_prefetch_mbs;
+static long fdcache_prefetch_fds;
static string tmpdir;
static void set_metric(const string& key, double value);
break;
case ARGP_KEY_FDCACHE_MINTMP:
fdcache_mintmp = atol (arg);
+ if( fdcache_mintmp > 100 || fdcache_mintmp < 0 )
+ argp_failure(state, 1, EINVAL, "fdcache mintmp percent");
break;
case ARGP_KEY_ARG:
source_paths.insert(string(arg));
break;
+ case ARGP_KEY_FDCACHE_PREFETCH_FDS:
+ fdcache_prefetch_fds = atol(arg);
+ if ( fdcache_prefetch_fds < 0)
+ argp_failure(state, 1, EINVAL, "fdcache prefetch fds");
+ break;
+ case ARGP_KEY_FDCACHE_PREFETCH_MBS:
+ fdcache_prefetch_mbs = atol(arg);
+ if ( fdcache_prefetch_mbs < 0)
+ argp_failure(state, 1, EINVAL, "fdcache prefetch mbs");
+ break;
// case 'h': argp_state_help (state, stderr, ARGP_HELP_LONG|ARGP_HELP_EXIT_OK);
default: return ARGP_ERR_UNKNOWN;
}
};
deque<fdcache_entry> lru; // @head: most recently used
long max_fds;
+ deque<fdcache_entry> prefetch; // prefetched
long max_mbs;
+ long max_prefetch_mbs;
+ long max_prefetch_fds;
public:
void set_metrics()
{
- double total_mb = 0.0;
+ double fdcache_mb = 0.0;
+ double prefetch_mb = 0.0;
for (auto i = lru.begin(); i < lru.end(); i++)
- total_mb += i->fd_size_mb;
- set_metric("fdcache_bytes", (int64_t)(total_mb*1024.0*1024.0));
+ fdcache_mb += i->fd_size_mb;
+ for (auto j = prefetch.begin(); j < prefetch.end(); j++)
+ prefetch_mb += j->fd_size_mb;
+ set_metric("fdcache_bytes", fdcache_mb*1024.0*1024.0);
set_metric("fdcache_count", lru.size());
+ set_metric("fdcache_prefetch_bytes", prefetch_mb*1024.0*1024.0);
+ set_metric("fdcache_prefetch_count", prefetch.size());
}
void intern(const string& a, const string& b, string fd, off_t sz, bool front_p)
{
{
unique_lock<mutex> lock(fdcache_lock);
- for (auto i = lru.begin(); i < lru.end(); i++) // nuke preexisting copy
+ // nuke preexisting copy
+ for (auto i = lru.begin(); i < lru.end(); i++)
{
if (i->archive == a && i->entry == b)
{
break; // must not continue iterating
}
}
+ // nuke preexisting copy in prefetch
+ for (auto i = prefetch.begin(); i < prefetch.end(); i++)
+ {
+ if (i->archive == a && i->entry == b)
+ {
+ unlink (i->fd.c_str());
+ prefetch.erase(i);
+ inc_metric("fdcache_op_count","op","prefetch_dequeue");
+ break; // must not continue iterating
+ }
+ }
double mb = (sz+65535)/1048576.0; // round up to 64K block
fdcache_entry n = { a, b, fd, mb };
if (front_p)
{
- inc_metric("fdcache_op_count","op","enqueue_front");
+ inc_metric("fdcache_op_count","op","enqueue");
lru.push_front(n);
}
else
{
- inc_metric("fdcache_op_count","op","enqueue_back");
- lru.push_back(n);
+ inc_metric("fdcache_op_count","op","prefetch_enqueue");
+ prefetch.push_front(n);
}
if (verbose > 3)
obatched(clog) << "fdcache interned a=" << a << " b=" << b
{
inc_metric("fdcache_op_count","op","emerg-flush");
obatched(clog) << "fdcache emergency flush for filling tmpdir" << endl;
- this->limit(0, 0); // emergency flush
+ this->limit(0, 0, 0, 0); // emergency flush
}
else if (front_p)
- this->limit(max_fds, max_mbs); // age cache if required
+ this->limit(max_fds, max_mbs, max_prefetch_fds, max_prefetch_mbs); // age cache if required
}
int lookup(const string& a, const string& b)
lru.erase(i); // invalidates i, so no more iteration!
lru.push_front(n);
inc_metric("fdcache_op_count","op","requeue_front");
- fd = open(n.fd.c_str(), O_RDONLY); // NB: no problem if dup() fails; looks like cache miss
+ fd = open(n.fd.c_str(), O_RDONLY);
+ break;
+ }
+ }
+ // Iterate through prefetch while fd == -1 to ensure that no duplication between lru and
+ // prefetch occurs.
+ for ( auto i = prefetch.begin(); fd == -1 && i < prefetch.end(); ++i)
+ {
+ if (i->archive == a && i->entry == b)
+ { // found it; take the entry from the prefetch deque to the lru deque, since it has now been accessed.
+ fdcache_entry n = *i;
+ prefetch.erase(i);
+ lru.push_front(n);
+ inc_metric("fdcache_op_count","op","prefetch_access");
+ fd = open(n.fd.c_str(), O_RDONLY);
break;
}
}
{
inc_metric("fdcache_op_count","op","emerg-flush");
obatched(clog) << "fdcache emergency flush for filling tmpdir";
- this->limit(0, 0); // emergency flush
+ this->limit(0, 0, 0, 0); // emergency flush
}
else if (fd >= 0)
- this->limit(max_fds, max_mbs); // age cache if required
+ this->limit(max_fds, max_mbs, max_prefetch_fds, max_prefetch_mbs); // age cache if required
return fd;
}
return true;
}
}
+ for (auto i = prefetch.begin(); i < prefetch.end(); i++)
+ {
+ if (i->archive == a && i->entry == b)
+ {
+ inc_metric("fdcache_op_count","op","prefetch_probe_hit");
+ return true;
+ }
+ }
inc_metric("fdcache_op_count","op","probe_miss");
return false;
}
for (auto i = lru.begin(); i < lru.end(); i++)
{
if (i->archive == a && i->entry == b)
- { // found it; move it to head of lru
+ { // found it; erase it from lru
fdcache_entry n = *i;
lru.erase(i); // invalidates i, so no more iteration!
inc_metric("fdcache_op_count","op","clear");
return;
}
}
+ for (auto i = prefetch.begin(); i < prefetch.end(); i++)
+ {
+ if (i->archive == a && i->entry == b)
+ { // found it; erase it from lru
+ fdcache_entry n = *i;
+ prefetch.erase(i); // invalidates i, so no more iteration!
+ inc_metric("fdcache_op_count","op","prefetch_clear");
+ unlink (n.fd.c_str());
+ set_metrics();
+ return;
+ }
+ }
}
-
- void limit(long maxfds, long maxmbs, bool metrics_p = true)
+ void limit(long maxfds, long maxmbs, long maxprefetchfds, long maxprefetchmbs , bool metrics_p = true)
{
if (verbose > 3 && (this->max_fds != maxfds || this->max_mbs != maxmbs))
obatched(clog) << "fdcache limited to maxfds=" << maxfds << " maxmbs=" << maxmbs << endl;
unique_lock<mutex> lock(fdcache_lock);
this->max_fds = maxfds;
this->max_mbs = maxmbs;
-
+ this->max_prefetch_fds = maxprefetchfds;
+ this->max_prefetch_mbs = maxprefetchmbs;
long total_fd = 0;
double total_mb = 0.0;
for (auto i = lru.begin(); i < lru.end(); i++)
// accumulate totals from most recently used one going backward
total_fd ++;
total_mb += i->fd_size_mb;
- if (total_fd > max_fds || total_mb > max_mbs)
+ if (total_fd > this->max_fds || total_mb > this->max_mbs)
{
// found the cut here point!
break;
}
}
+ total_fd = 0;
+ total_mb = 0.0;
+ for(auto i = prefetch.begin(); i < prefetch.end(); i++){
+ // accumulate totals from most recently used one going backward
+ total_fd ++;
+ total_mb += i->fd_size_mb;
+ if (total_fd > this->max_prefetch_fds || total_mb > this->max_prefetch_mbs)
+ {
+ // found the cut here point!
+ for (auto j = i; j < prefetch.end(); j++) // close all the fds from here on in
+ {
+ if (verbose > 3)
+ obatched(clog) << "fdcache evicted from prefetch a=" << j->archive << " b=" << j->entry
+ << " fd=" << j->fd << " mb=" << j->fd_size_mb << endl;
+ if (metrics_p)
+ inc_metric("fdcache_op_count","op","prefetch_evict");
+ unlink (j->fd.c_str());
+ }
+
+ prefetch.erase(i, prefetch.end()); // erase the nodes generally
+ break;
+ }
+ }
if (metrics_p) set_metrics();
}
{
// unlink any fdcache entries in $TMPDIR
// don't update metrics; those globals may be already destroyed
- limit(0, 0, false);
+ limit(0, 0, 0, 0, false);
}
};
static libarchive_fdcache fdcache;
// responsible for unlinking it later.
fdcache.intern(b_source0, fn,
tmppath, archive_entry_size(e),
- false); // prefetched ones go to back of lru
+ false); // prefetched ones go to the prefetch cache
prefetch_count --;
close (fd); // we're not saving this fd to make a mhd-response from!
continue;
sqlite3_db_release_memory(dbq); // ... for both connections
debuginfod_pool_groom(); // and release any debuginfod_client objects we've been holding onto
- fdcache.limit(0,0); // release the fdcache contents
- fdcache.limit(fdcache_fds,fdcache_mbs); // restore status quo parameters
+ fdcache.limit(0,0,0,0); // release the fdcache contents
+ fdcache.limit(fdcache_fds, fdcache_mbs, fdcache_prefetch_fds, fdcache_prefetch_mbs); // restore status quo parameters
clock_gettime (CLOCK_MONOTONIC, &ts_end);
double deltas = (ts_end.tv_sec - ts_start.tv_sec) + (ts_end.tv_nsec - ts_start.tv_nsec)/1.e9;
if (scan_archives.size()==0 && !scan_files && source_paths.size()>0)
obatched(clog) << "warning: without -F -R -U -Z, ignoring PATHs" << endl;
- fdcache.limit(fdcache_fds, fdcache_mbs);
+ fdcache.limit(fdcache_fds, fdcache_mbs, fdcache_prefetch_fds, fdcache_prefetch_mbs);
(void) signal (SIGPIPE, SIG_IGN); // microhttpd can generate it incidentally, ignore
(void) signal (SIGINT, signal_handler); // ^C
obatched(clog) << "fdcache tmpdir " << tmpdir << endl;
obatched(clog) << "fdcache tmpdir min% " << fdcache_mintmp << endl;
obatched(clog) << "groom time " << groom_s << endl;
+ obatched(clog) << "prefetch fds " << fdcache_prefetch_fds << endl;
+ obatched(clog) << "prefetch mbs " << fdcache_prefetch_mbs << endl;
+
if (scan_archives.size()>0)
{
obatched ob(clog);