From: Alex Rousskov Date: Sun, 30 Jan 2011 23:16:22 +0000 (-0700) Subject: Added "disker" processes to be responsible for individual cache_dir I/O. X-Git-Tag: take01~15^2 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=095ec2b;p=thirdparty%2Fsquid.git Added "disker" processes to be responsible for individual cache_dir I/O. Determine kid process role based on the process name rather than kid ID. This allows the process to perform role-specific actions before (or while) squid.conf is parsed. --- diff --git a/src/cache_cf.cc b/src/cache_cf.cc index 4601feb536..1ec303cba9 100644 --- a/src/cache_cf.cc +++ b/src/cache_cf.cc @@ -1884,16 +1884,8 @@ find_fstype(char *type) static void parse_cachedir(SquidConfig::_cacheSwap * swap) { - // The workers option must preceed cache_dir for the IamWorkerProcess check - // below to work. TODO: Redo IamWorkerProcess to work w/o Config and remove - if (KidIdentifier > 1 && Config.workers == 1) { - debugs(3, DBG_CRITICAL, - "FATAL: cache_dir found before the workers option. Reorder."); - self_destruct(); - } - - // Among all processes, only workers may need and can handle cache_dir. - if (!IamWorkerProcess()) + // coordinator does not need to handle cache_dir. + if (IamCoordinatorProcess()) return; char *type_str; diff --git a/src/fs/rock/RockRebuild.cc b/src/fs/rock/RockRebuild.cc index 3074605446..624fe43465 100644 --- a/src/fs/rock/RockRebuild.cc +++ b/src/fs/rock/RockRebuild.cc @@ -34,7 +34,8 @@ Rock::Rebuild::~Rebuild() /// prepares and initiates entry loading sequence void Rock::Rebuild::start() { - debugs(47,2, HERE << sd->index); + debugs(47, DBG_IMPORTANT, "Loading cache_dir #" << sd->index << + " from " << sd->filePath); fd = file_open(sd->filePath, O_RDONLY | O_BINARY); if (fd < 0) diff --git a/src/fs/rock/RockSwapDir.cc b/src/fs/rock/RockSwapDir.cc index 7bad86b11a..207cd7e6fe 100644 --- a/src/fs/rock/RockSwapDir.cc +++ b/src/fs/rock/RockSwapDir.cc @@ -212,6 +212,10 @@ Rock::SwapDir::validateOptions() void Rock::SwapDir::rebuild() { + // in SMP mode, only the disker is responsible for populating the map + if (UsingSmp() && !IamDiskProcess()) + return; + ++StoreController::store_dirs_rebuilding; Rebuild *r = new Rebuild(this); r->start(); // will delete self when done diff --git a/src/ipc/Kid.cc b/src/ipc/Kid.cc index 1768d0e7af..e737692bc8 100644 --- a/src/ipc/Kid.cc +++ b/src/ipc/Kid.cc @@ -12,6 +12,8 @@ #include #endif +int TheProcessKind = pkOther; + Kid::Kid(): badFailures(0), pid(-1), diff --git a/src/ipc/Kid.h b/src/ipc/Kid.h index fbdf189dd3..e1e283c37e 100644 --- a/src/ipc/Kid.h +++ b/src/ipc/Kid.h @@ -82,4 +82,19 @@ private: status_type status; ///< exit status of a stopped kid }; + +// TODO: processes may not be kids; is there a better place to put this? + +/// process kinds +typedef enum { + pkOther = 0, ///< we do not know or do not care + pkCoordinator = 1, ///< manages all other kids + pkWorker = 2, ///< general-purpose worker bee + pkDisker = 4, ///< cache_dir manager +} ProcessKind; + +/// ProcessKind for the current process +extern int TheProcessKind; + + #endif /* SQUID_IPC_KID_H */ diff --git a/src/ipc/Kids.cc b/src/ipc/Kids.cc index 7e15ceb7d6..7e310a3712 100644 --- a/src/ipc/Kids.cc +++ b/src/ipc/Kids.cc @@ -7,6 +7,7 @@ #include "config.h" #include "ipc/Kids.h" +#include "protos.h" Kids TheKids; KidName TheKidName; @@ -16,26 +17,31 @@ Kids::Kids() } /// maintain n kids -void Kids::init(size_t n) +void Kids::init() { - assert(n > 0); - if (storage.size() > 0) storage.clean(); - storage.reserve(n); + storage.reserve(NumberOfKids()); char kid_name[32]; - // add Kid records for all n main strands - for (size_t i = 1; i <= n; ++i) { - snprintf(kid_name, sizeof(kid_name), "(squid-%d)", (int)i); + // add Kid records for all workers + for (int i = 0; i < Config.workers; ++i) { + snprintf(kid_name, sizeof(kid_name), "(squid-%d)", (int)(storage.size()+1)); + storage.push_back(Kid(kid_name)); + } + + // add Kid records for all disk processes + // (XXX: some cache_dirs do not need this) + for (int i = 0; i < Config.cacheSwap.n_configured; ++i) { + snprintf(kid_name, sizeof(kid_name), "(squid-disk-%d)", (int)(storage.size()+1)); storage.push_back(Kid(kid_name)); } // if coordination is needed, add a Kid record for Coordinator - if (n > 1) { - snprintf(kid_name, sizeof(kid_name), "(squid-coord-%d)", (int)(n + 1)); + if (storage.size() > 1) { + snprintf(kid_name, sizeof(kid_name), "(squid-coord-%d)", (int)(storage.size()+1)); storage.push_back(Kid(kid_name)); } } diff --git a/src/ipc/Kids.h b/src/ipc/Kids.h index bfa3735a28..0a2e348c71 100644 --- a/src/ipc/Kids.h +++ b/src/ipc/Kids.h @@ -21,8 +21,8 @@ private: Kids& operator= (const Kids&); ///< not implemented public: - /// maintain n kids - void init(size_t n); + /// initialize all kid records based on Config + void init(); /// returns kid by pid Kid* find(pid_t pid); diff --git a/src/main.cc b/src/main.cc index 597f03d579..725515d182 100644 --- a/src/main.cc +++ b/src/main.cc @@ -646,8 +646,8 @@ serverConnectionsOpen(void) wccp2ConnectionOpen(); #endif } - // Coordinator does not start proxying services - if (!IamCoordinatorProcess()) { + // start various proxying services if we are responsible for them + if (IamWorkerProcess()) { clientOpenListenSockets(); icpConnectionsOpen(); #if USE_HTCP @@ -687,7 +687,7 @@ serverConnectionsClose(void) wccp2ConnectionClose(); #endif } - if (!IamCoordinatorProcess()) { + if (IamWorkerProcess()) { clientHttpConnectionsClose(); icpConnectionShutdown(); #if USE_HTCP @@ -977,6 +977,9 @@ mainInitialize(void) #endif debugs(1, 1, "Process ID " << getpid()); + + debugs(1, 1, "Process Roles:" << ProcessRoles()); + setSystemLimits(); debugs(1, 1, "With " << Squid_MaxFD << " file descriptors available"); @@ -1228,6 +1231,16 @@ ConfigureCurrentKid(const char *processName) const size_t nameLen = idStart - (processName + 1); assert(nameLen < sizeof(TheKidName)); xstrncpy(TheKidName, processName + 1, nameLen + 1); + if (!strcmp(TheKidName, "squid-coord")) + TheProcessKind = pkCoordinator; + else + if (!strcmp(TheKidName, "squid")) + TheProcessKind = pkWorker; + else + if (!strcmp(TheKidName, "squid-disk")) + TheProcessKind = pkDisker; + else + TheProcessKind = pkOther; // including coordinator } } else { xstrncpy(TheKidName, APP_SHORTNAME, sizeof(TheKidName)); @@ -1470,7 +1483,7 @@ SquidMain(int argc, char **argv) if (IamCoordinatorProcess()) AsyncJob::Start(Ipc::Coordinator::Instance()); - else if (UsingSmp() && IamWorkerProcess()) + else if (UsingSmp() && (IamWorkerProcess() || IamDiskProcess())) AsyncJob::Start(new Ipc::Strand); /* at this point we are finished the synchronous startup. */ @@ -1680,7 +1693,9 @@ watch_child(char *argv[]) Config.workers); // but we keep going in hope that user knows best } - TheKids.init(Config.workers); + TheKids.init(); + +syslog(LOG_NOTICE, "XXX: will start %d kids", TheKids.count()); // keep [re]starting kids until it is time to quit for (;;) { @@ -1702,7 +1717,8 @@ watch_child(char *argv[]) } kid.start(pid); - syslog(LOG_NOTICE, "Squid Parent: child process %d started", pid); + syslog(LOG_NOTICE, "Squid Parent: %s process %d started", + kid.name().termedBuf(), pid); } /* parent */ @@ -1726,14 +1742,17 @@ watch_child(char *argv[]) kid->stop(status); if (kid->calledExit()) { syslog(LOG_NOTICE, - "Squid Parent: child process %d exited with status %d", + "Squid Parent: %s process %d exited with status %d", + kid->name().termedBuf(), kid->getPid(), kid->exitStatus()); } else if (kid->signaled()) { syslog(LOG_NOTICE, - "Squid Parent: child process %d exited due to signal %d with status %d", + "Squid Parent: %s process %d exited due to signal %d with status %d", + kid->name().termedBuf(), kid->getPid(), kid->termSignal(), kid->exitStatus()); } else { - syslog(LOG_NOTICE, "Squid Parent: child process %d exited", kid->getPid()); + syslog(LOG_NOTICE, "Squid Parent: %s process %d exited", + kid->name().termedBuf(), kid->getPid()); } } else { syslog(LOG_NOTICE, "Squid Parent: unknown child process %d exited", pid); diff --git a/src/protos.h b/src/protos.h index 57ba76a433..cb534ad147 100644 --- a/src/protos.h +++ b/src/protos.h @@ -588,12 +588,16 @@ SQUIDCEXTERN bool IamPrimaryProcess(); SQUIDCEXTERN bool IamCoordinatorProcess(); /// whether the current process handles HTTP transactions and such SQUIDCEXTERN bool IamWorkerProcess(); +/// whether the current process is dedicated to managing a cache_dir +bool IamDiskProcess(); /// Whether we are running in daemon mode SQUIDCEXTERN bool InDaemonMode(); // try using specific Iam*() checks above first /// Whether there should be more than one worker process running SQUIDCEXTERN bool UsingSmp(); // try using specific Iam*() checks above first /// number of Kid processes as defined in src/ipc/Kid.h SQUIDCEXTERN int NumberOfKids(); +/// a string describing this process roles such as worker or coordinator +String ProcessRoles(); SQUIDCEXTERN int DebugSignal; /* AYJ debugs function to show locations being reset with memset() */ diff --git a/src/tools.cc b/src/tools.cc index 18729c9bdf..0cfe32a1ed 100644 --- a/src/tools.cc +++ b/src/tools.cc @@ -819,7 +819,13 @@ IamWorkerProcess() if (opt_no_daemon || Config.workers == 0) return true; - return 0 < KidIdentifier && KidIdentifier <= Config.workers; + return TheProcessKind == pkWorker; +} + +bool +IamDiskProcess() +{ + return TheProcessKind == pkDisker; } bool @@ -831,13 +837,13 @@ InDaemonMode() bool UsingSmp() { - return !opt_no_daemon && Config.workers > 1; + return InDaemonMode() && NumberOfKids() > 1; } bool IamCoordinatorProcess() { - return UsingSmp() && KidIdentifier == Config.workers + 1; + return TheProcessKind == pkCoordinator; } bool @@ -863,11 +869,28 @@ NumberOfKids() if (!InDaemonMode()) return 0; - // workers + the coordinator process - if (UsingSmp()) - return Config.workers + 1; + // XXX: detect and abort when called before workers/cache_dirs are parsed - return Config.workers; + // XXX: this is not always the case as there are other cache_dir types + const int rockDirs = Config.cacheSwap.n_configured; + + const bool needCoord = Config.workers > 1 || rockDirs > 0; + return (needCoord ? 1 : 0) + Config.workers + rockDirs; +} + +String +ProcessRoles() +{ + String roles = ""; + if (IamMasterProcess()) + roles.append(" master"); + if (IamCoordinatorProcess()) + roles.append(" coordinator"); + if (IamWorkerProcess()) + roles.append(" worker"); + if (IamDiskProcess()) + roles.append(" disker"); + return roles; } void