{ "fdcache-fds", ARGP_KEY_FDCACHE_FDS, "NUM", 0, "Maximum number of archive files to keep in fdcache.", 0 },
#define ARGP_KEY_FDCACHE_MBS 0x1002
{ "fdcache-mbs", ARGP_KEY_FDCACHE_MBS, "MB", 0, "Maximum total size of archive file fdcache.", 0 },
+#define ARGP_KEY_FDCACHE_PREFETCH 0x1003
+ { "fdcache-prefetch", ARGP_KEY_FDCACHE_PREFETCH, "NUM", 0, "Number of archive files to prefetch into fdcache.", 0 },
{ NULL, 0, NULL, 0, NULL, 0 }
};
static bool traverse_logical;
static long fdcache_fds;
static long fdcache_mbs;
+static long fdcache_prefetch;
static string tmpdir;
static void set_metric(const string& key, int64_t value);
case ARGP_KEY_FDCACHE_MBS:
fdcache_mbs = atol (arg);
break;
+ case ARGP_KEY_FDCACHE_PREFETCH:
+ fdcache_prefetch = atol (arg);
+ break;
case ARGP_KEY_ARG:
source_paths.insert(string(arg));
break;
string archive;
string entry;
string fd;
- long fd_size_mb; // rounded up megabytes
+ double fd_size_mb; // fractional megabytes, overestimated by ~64KiB
};
deque<fdcache_entry> lru; // @head: most recently used
long max_fds;
long max_mbs;
public:
- void intern(const string& a, const string& b, string fd, off_t sz)
+ void intern(const string& a, const string& b, string fd, off_t sz, bool front_p)
{
{
unique_lock<mutex> lock(fdcache_lock);
break; // must not continue iterating
}
}
- long mb = ((sz+1023)/1024+1023)/1024;
+ double mb = (sz+65535)/1048576.0; // overestimate size by ~64KiB, in fractional MB
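+ // e.g. a 1-byte file is charged mb=0.0625, so even tiny entries count
+ // a minimum quantum against max_mbs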
fdcache_entry n = { a, b, fd, mb };
- lru.push_front(n);
+ if (front_p)
+ lru.push_front(n);
+ else
+ lru.push_back(n);
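+ // NB: entries at the back of the lru are the first eviction candidates,
+ // so prefetched files yield to requested ones when space runs short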
if (verbose > 3)
- obatched(clog) << "fdcache interned a=" << a << " b=" << b << " fd=" << fd << " mb=" << mb << endl;
+ obatched(clog) << "fdcache interned a=" << a << " b=" << b
+ << " fd=" << fd << " mb=" << mb << " front=" << front_p << endl;
}
- this->limit(max_fds, max_mbs); // age cache if required
+ // NB: we age the cache at lookup time too
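+ // (prefetch-interned entries skip the limit() call below, presumably so
+ // a full cache doesn't immediately evict the file just pushed to the back)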
+ if (front_p)
+ this->limit(max_fds, max_mbs); // age cache if required
}
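+ // NB: a lookup() hit open()s the cached file afresh, so concurrent
+ // requests each stream the same entry through their own fd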
int lookup(const string& a, const string& b)
+ {
+ int fd = -1;
+ {
+ unique_lock<mutex> lock(fdcache_lock);
+ for (auto i = lru.begin(); i < lru.end(); i++)
+ {
+ if (i->archive == a && i->entry == b)
+ { // found it; move it to head of lru
+ fdcache_entry n = *i;
+ lru.erase(i); // invalidates i, so no more iteration!
+ lru.push_front(n);
+
+ fd = open(n.fd.c_str(), O_RDONLY); // NB: no problem if open() fails; looks like a cache miss
+ break;
+ }
+ }
+ }
+
+ if (fd >= 0)
+ this->limit(max_fds, max_mbs); // age cache if required
+
+ return fd;
+ }
+
+ bool probe(const string& a, const string& b) // just a cache residency check - don't modify LRU state, don't open
{
unique_lock<mutex> lock(fdcache_lock);
for (auto i = lru.begin(); i < lru.end(); i++)
{
if (i->archive == a && i->entry == b)
- { // found it; move it to head of lru
- fdcache_entry n = *i;
- lru.erase(i); // invalidates i, so no more iteration!
- lru.push_front(n);
-
- return open(n.fd.c_str(), O_RDONLY); // NB: no problem if dup() fails; looks like cache miss
- }
+ return true;
}
- return -1;
+ return false;
}
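+ // (the archive-scanning loop below calls probe() to skip entries that
+ // are already interned, without perturbing lru order)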
void clear(const string& a, const string& b)
this->max_mbs = maxmbs;
long total_fd = 0;
- long total_mb = 0;
+ double total_mb = 0.0;
for (auto i = lru.begin(); i < lru.end(); i++)
{
// accumulate totals from most recently used one going backward
return 0;
}
+ // check for a match in the fdcache first
int fd = fdcache.lookup(b_source0, b_source1);
while (fd >= 0) // got one!  NB: this is really an if() with a possible branch out to the end
{
// NB: see, we never go around the 'loop' more than once
}
+ // no match ... grumble, must process the archive
string archive_decoder = "/dev/null";
string archive_extension = "";
for (auto&& arch : scan_archives)
if (rc != ARCHIVE_OK)
throw archive_exception(a, "cannot open archive from pipe");
- while(1) // parse cpio archive entries
+ // archive traversal is in three stages, no, four stages:
+ // 1) skip entries whose names do not match the requested one
+ // 2) extract the matching entry (set r = result)
+ // 3) extract some number of prefetched entries (just into fdcache)
+ // 4) abort any further processing
+ struct MHD_Response* r = 0; // will be set in stage 2
+ unsigned prefetch_count = fdcache_prefetch; // will be decremented in stage 3
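+ // e.g. with the default fdcache-prefetch of 64, up to 64 entries beyond
+ // the requested one get extracted speculatively into the fdcache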
+
+ while(r == 0 || prefetch_count > 0) // stage 1, 2, or 3
{
+ if (interrupted)
+ break;
+
struct archive_entry *e;
rc = archive_read_next_header (a, &e);
if (rc != ARCHIVE_OK)
break; // end of archive or read error - stop scanning
string fn = canonicalized_archive_entry_pathname (e);
- if (fn != b_source1)
+ if ((r == 0) && (fn != b_source1)) // stage 1
+ continue;
+
+ if (fdcache.probe (b_source0, fn)) // skip if already interned
continue;
// extract this file to a temporary file
throw archive_exception(a, "cannot extract file");
}
+ if (r != 0) // stage 3
+ {
+ // NB: now we know we have a complete reusable file; make fdcache
+ // responsible for unlinking it later.
+ fdcache.intern(b_source0, fn,
+ tmppath, archive_entry_size(e),
+ false); // prefetched ones go to back of lru
+ prefetch_count --;
+ close (fd); // we're not making an MHD response from this prefetched fd
+ continue;
+ }
+
// NB: now we know we have a complete reusable file; make fdcache
// responsible for unlinking it later.
- fdcache.intern(b_source0, b_source1, tmppath, archive_entry_size(e));
+ fdcache.intern(b_source0, b_source1,
+ tmppath, archive_entry_size(e),
+ true); // requested ones go to the front of lru
inc_metric ("http_responses_total","result",archive_extension + " archive");
- struct MHD_Response* r = MHD_create_response_from_fd (archive_entry_size(e), fd);
+ r = MHD_create_response_from_fd (archive_entry_size(e), fd);
if (r == 0)
{
if (verbose)
obatched(clog) << "cannot create fd-response for " << b_source0 << endl;
close(fd);
- break; // assume no chance of better luck around another iteration
+ break; // assume no chance of better luck around another iteration; no other copies of same file
}
else
{
/* libmicrohttpd will close it. */
if (result_fd)
*result_fd = fd;
- return r;
+ continue;
}
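+ // (NB: stage 2 no longer returns immediately; the loop carries on into
+ // stage 3 to warm the fdcache)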
}
// XXX: rpm/file not found: delete this R entry?
- return 0;
+ return r;
}
fdcache_mbs = 1024; // 1 gigabyte
else
fdcache_mbs = sfs.f_bavail * sfs.f_bsize / 1024 / 1024 / 4; // 25% of free space
- fdcache_fds = concurrency * 2;
+ fdcache_prefetch = 64; // guesstimate: storage is this much less costly than re-decompression
+ fdcache_fds = (concurrency + fdcache_prefetch) * 2;
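+ // e.g. if concurrency were 4, this default would allow (4+64)*2 = 136 fds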
/* Parse and process arguments. */
int remaining;
obatched(clog) << "rescan time " << rescan_s << endl;
obatched(clog) << "fdcache fds " << fdcache_fds << endl;
obatched(clog) << "fdcache mbs " << fdcache_mbs << endl;
+ obatched(clog) << "fdcache prefetch " << fdcache_prefetch << endl;
obatched(clog) << "fdcache tmpdir " << tmpdir << endl;
obatched(clog) << "groom time " << groom_s << endl;
if (scan_archives.size()>0)