void
faxQueueApp::processJob(Job& job, FaxRequest* req, DestInfo& di)
{
- JobStatus status;
+ job.commid = ""; // set on return
+ di.active(job);
FaxMachineInfo& info = di.getInfo(job.dest);
-
- Job* bjob = job.bfirst(); // first job in batch
- Job* cjob = &job; // current job
- FaxRequest* creq = req; // current request
- Job* njob = NULL; // next job
-
- for (; cjob != NULL; cjob = njob) {
- creq = cjob->breq;
- njob = cjob->bnext;
- cjob->commid = ""; // set on return
- di.active(*cjob);
- setActive(*cjob); // place job on active list
- updateRequest(*creq, *cjob);
- if (!prepareJobNeeded(*cjob, *creq, status)) {
- if (status != Job::done) {
- if (cjob->bprev == NULL)
- bjob = njob;
- cjob->state = FaxRequest::state_failed;
- deleteRequest(*cjob, creq, status, true);
- setDead(*cjob);
- }
- } else {
- if (prepareJobStart(*cjob, creq, info))
- return;
- else if (cjob->bprev == NULL)
- bjob = njob;
- }
- }
- if (bjob != NULL)
- sendJobStart(*bjob, bjob->breq);
+ JobStatus status;
+ setActive(job); // place job on active list
+ updateRequest(*req, job);
+ if (!prepareJobNeeded(job, *req, status)) {
+ if (status != Job::done) {
+ job.state = FaxRequest::state_failed;
+ deleteRequest(job, req, status, true);
+ setDead(job);
+ } else
+ sendJobStart(job, req);
+ } else
+ prepareJobStart(job, req, info);
}
/*
* server thread at which point the transmit work is actually
* initiated.
*/
-bool
+void
faxQueueApp::prepareJobStart(Job& job, FaxRequest* req,
FaxMachineInfo& info)
{
delayJob(job, *req, "Could not fork to prepare job for transmission",
Sys::now() + random() % requeueInterval);
delete req;
- return false;
+ break;
default: // parent, setup handler to wait
job.startPrepare(pid);
delete req; // must reread after preparation
- job.breq = NULL;
Trigger::post(Trigger::JOB_PREP_BEGIN, job);
- return true;
+ break;
}
}
if (job.suspendPending) { // co-thread waiting
job.suspendPending = false;
releaseModem(job);
- } else {
- FaxRequest* req = readRequest(job);
- if (!req) {
- // NB: no way to notify the user (XXX)
- logError("JOB %s: qfile vanished during preparation",
- (const char*) job.jobid);
- setDead(job);
- } else {
- bool processnext = false;
- bool startsendjob = false;
- Job* targetjob = &job;
- if (status == Job::done) { // preparation completed successfully
- job.breq = req;
- startsendjob = (job.bnext == NULL);
- processnext = !startsendjob;
- if (processnext) {
- targetjob = job.bnext;
- }
- } else {
- /*
- * Job preparation did not complete successfully.
- *
- * If there is more than one job in this batch, then remove this job
- * and adjust the batch accordingly.
- */
- if (job.bnext == NULL) { // no jobs left in batch
- targetjob = job.bprev;
- startsendjob = (targetjob != NULL);
- } else { // more jobs in batch
- targetjob = job.bnext;
- processnext = true;
- }
- if (status == Job::requeued) {
- job.remove();
- delayJob(job, *req, "Cannot fork to prepare job for transmission",
- Sys::now() + random() % requeueInterval);
- delete req;
- } else {
- deleteRequest(job, req, status, true);
- setDead(job);
- }
- }
- if (processnext)
- processJob(*targetjob, targetjob->breq, destJobs[targetjob->dest]);
- else if (startsendjob)
- sendJobStart(*targetjob->bfirst(), targetjob->bfirst()->breq);
- else {
- /*
- * This destination was marked as called, but all jobs to this
- * destination failed preparation, so we must undo the call marking.
- */
- removeDestInfoJob(job); // release destination block
- DestInfo& di = destJobs[job.dest];
- unblockDestJobs(di); // release any blocked jobs
- }
+ return;
+ }
+ FaxRequest* req = readRequest(job);
+ if (!req) {
+ // NB: no way to notify the user (XXX)
+ logError("JOB %s: qfile vanished during preparation",
+ (const char*) job.jobid);
+ setDead(job);
+ } else {
+ switch (status) {
+ case Job::requeued: // couldn't fork RIP
+ delayJob(job, *req, "Cannot fork to prepare job for transmission",
+ Sys::now() + random() % requeueInterval);
+ delete req;
+ break;
+ case (Job::done): // preparation completed successfully
+ sendJobStart(job, req);
+ break;
+ default:
+ deleteRequest(job, req, status, true);
+ setDead(job);
+ break;
}
}
}
void
faxQueueApp::sendJobStart(Job& job, FaxRequest* req)
{
- Job* cjob;
- int nfiles = 1;
-
job.start = Sys::now(); // start of transmission
- fxStr files = job.file;
- for (cjob = job.bnext; cjob != NULL; cjob = cjob->bnext) {
- files = files | " " | cjob->file;
- cjob->start = job.start;
- // XXX start deadman timeout on active jobs
- nfiles++;
- }
-
+ // XXX start deadman timeout on active jobs
const fxStr& cmd = pickCmd(*req);
fxStr dargs(job.getJCI().getArgs());
pid_t pid = fork();
switch (pid) {
case 0: // child, startup command
closeAllBut(-1); // NB: close 'em all
- doexec(cmd, dargs, job.modem->getDeviceID(), files, nfiles);
+ doexec(cmd, dargs, job.modem->getDeviceID(), job.file, 1);
sleep(10); // XXX give parent time to catch signal
_exit(127);
/*NOTREACHED*/
* If it appears that the we're doing this a lot,
* then lengthen the backoff.
*/
- Job* njob;
- for (cjob = &job; cjob != NULL; cjob = njob) {
- njob = cjob->bnext;
- req = cjob->breq;
- cjob->remove(); // Remove from active queue
- delayJob(*cjob, *req, "Could not fork to start job transmission",
- cjob->start + random() % requeueInterval);
- delete req;
- }
+ job.remove();
+ delayJob(job, *req, "Could not fork to start job transmission",
+ job.start + random() % requeueInterval);
break;
default: // parent, setup handler to wait
// joinargs puts a leading space so this looks funny here
traceQueue(job, "CMD START%s -m %s %s (PID %lu)"
, (const char*) joinargs(cmd, dargs)
, (const char*) job.modem->getDeviceID()
- , (const char*) files
+ , (const char*) job.file
, pid
);
job.startSend(pid);
- for (cjob = &job; cjob != NULL; cjob = njob) {
- cjob->pid = pid;
- njob = cjob->bnext;
- Trigger::post(Trigger::SEND_BEGIN, *cjob);
- delete cjob->breq; // discard handle (NB: releases lock)
- cjob->breq = NULL;
- }
+ Trigger::post(Trigger::SEND_BEGIN, job);
break;
}
+ delete req;
}
void
faxQueueApp::sendJobDone(Job& job, int status)
-{
- traceQueue(job, "CMD DONE: exit status %#x", status);
- if (status&0xff)
- logError("Send program terminated abnormally with exit status %#x", status);
-
- Job* cjob;
- Job* njob;
- DestInfo& di = destJobs[job.dest];
- di.hangup(); // do before unblockDestJobs
- releaseModem(job); // done with modem
- FaxRequest* req = readRequest(job);
- if (req && req->status == send_retry) {
- // prevent turnaround-redialing, delay any blocked jobs
- time_t newtts = req->tts;
- while ((cjob = di.nextBlocked())) {
- FaxRequest* blockedreq = readRequest(*cjob);
- if (blockedreq) {
- delayJob(*cjob, *blockedreq, "Delayed by prior call", newtts);
- delete blockedreq;
- }
- }
- } else {
- unblockDestJobs(di);
- }
-
- for (cjob = &job; cjob != NULL; cjob = njob) {
- njob = cjob->bnext;
- if (cjob != &job) req = readRequest(*cjob); // the first was already read
- if (!req) {
- time_t now = Sys::now();
- time_t duration = now - job.start;
- logError("JOB %s: SEND FINISHED: %s; but job file vanished",
- (const char*) cjob->jobid, fmtTime(duration));
- setDead(*cjob);
- continue;
- }
- sendJobDone(*cjob, req);
- }
-}
-
-void
-faxQueueApp::sendJobDone(Job& job, FaxRequest* req)
{
time_t now = Sys::now();
time_t duration = now - job.start;
+ traceQueue(job, "CMD DONE: exit status %#x", status);
Trigger::post(Trigger::SEND_END, job);
- job.bnext = NULL; job.bprev = NULL; // clear any batching
+ releaseModem(job); // done with modem
+ FaxRequest* req = readRequest(job); // reread the qfile
+ if (!req) {
+ logError("JOB %s: SEND FINISHED: %s; but job file vanished",
+ (const char*) job.jobid, fmtTime(duration));
+ setDead(job);
+ return;
+ }
job.commid = req->commid; // passed from subprocess
- if (req->status == 127) {
+ if (status & 0xFF) {
+ req->notice = fxStr::format(
+ "Send program terminated abnormally with exit status %#x", status);
+ req->status = send_failed;
+ logError("JOB %s: %s", (const char*)job.jobid, (const char*) req->notice);
+ } else if ((status >>= 8) == 127) {
req->notice = "Send program terminated abnormally; unable to exec " |
pickCmd(*req);
req->status = send_failed;
logError("JOB %s: %s",
(const char*)job.jobid, (const char*)req->notice);
- }
+ } else
+ req->status = (FaxSendStatus) status;
if (req->status == send_reformat) {
/*
* Job requires reformatting to deal with the discovery
} else if (req->useccover &&
req->npages > 0 && contCoverPageTemplate != "") {
/*
- * Setup to generate a cover page when the job is
+ * At least one page was sent so any existing
+ * cover page is certain to be gone. Setup
+ * to generate a cover page when the job is
* retried. Note that we assume the continuation
* cover page will be PostScript (though the
* type is not used anywhere just now).
void
faxQueueApp::ctrlJobDone(Job& job, int status)
{
- traceQueue(job, "CMD DONE: exit status %#x", status);
+ if (jobCtrlCmd.length() )
+ traceQueue(job, "CMD DONE: exit status %#x", status);
if (status) {
logError("JOB %s: bad exit status %#x from sub-fork",
(const char*) job.jobid, status);
job.startTTSTimer(tts);
}
+#define isOKToCall(di, jci, n) \
+ (di.getActive()+n <= jci.getMaxConcurrentCalls())
+
/*
* Process a job that's finished. The corpse gets placed
* on the deadq and is reaped the next time the scheduler
job.suspendPending = false;
traceJob(job, "DEAD");
Trigger::post(Trigger::JOB_DEAD, job);
- removeDestInfoJob(job);
+ DestInfo& di = destJobs[job.dest];
+ di.done(job); // remove from active destination list
+ di.updateConfig(); // update file if something changed
+ if (! di.isEmpty()) {
+ Job* jb;
+ u_int n = 1;
+ while ( (jb = di.nextBlocked()) ) {
+ if ( isOKToCall(di, jb->getJCI(), n) )
+ {
+ n++;
+ FaxRequest* req = readRequest(*jb);
+ if (req) {
+ req->notice = "";
+ updateRequest(*req, *jb);
+ delete req;
+ }
+ setReadyToRun(*jb, jobCtrlWait);
+ } else
+ {
+ traceJob(*jb, "Continue BLOCK, current calls: %d, max concurrent calls: %d",
+ di.getActive(), jb->getJCI().getMaxConcurrentCalls());
+ di.block(*jb);
+ break;
+ }
+ }
+ } else {
+ /*
+ * This is the last job to the destination; purge
+ * the entry from the destination jobs database.
+ */
+ destJobs.remove(job.dest);
+ }
+
if (job.isOnList()) // lazy remove from active list
job.remove();
job.insert(*deadq.next); // setup job corpus for reaping
destJobs[job.dest].unblock(job);
break;
}
-
- /*
- * We must remove any DestInfo stuff this is recorded in
- * When the job is resubmitted (or killed), we don't know
- * when (could be hours/never), or even if the dest number
- * will be the same
- */
- removeDestInfoJob(job);
job.remove(); // remove from old queue
job.stopKillTimer(); // clear kill timer
return (true);
}
}
-/*
- * Process the DestInfo job-block list
- * for this job. If the job is active and blocking other
- * jobs, we need to unblock...
- */
-#define isOKToCall(di, dci, n) \
- (di.getCalls()+n <= dci.getMaxConcurrentCalls())
-
-void
-faxQueueApp::unblockDestJobs(DestInfo& di)
-{
- /*
- * Check if there are blocked jobs waiting to run
- * and that there is now room to run one. If so,
- * take jobs off the blocked queue and make them
- * ready for processing.
- */
- Job* jb;
- u_int n = 1;
- while ( (jb = di.nextBlocked()) ) {
- if ( isOKToCall(di, jb->getJCI(), n) )
- {
- if (!di.supportsBatching()) n++;
- FaxRequest* req = readRequest(*jb);
- if (! req) {
- setDead(*jb);
- continue;
- }
- req->notice = "";
- updateRequest(*req, *jb);
- delete req;
- setReadyToRun(*jb, jobCtrlWait);
- } else
- {
- traceJob(*jb, "Continue BLOCK, current calls: %d, max concurrent calls: %d",
- di.getCalls(), jb->getJCI().getMaxConcurrentCalls());
- di.block(*jb);
- break;
- }
- }
-}
-
-void
-faxQueueApp::removeDestInfoJob(Job& job)
-{
- DestInfo& di = destJobs[job.dest];
- di.done(job); // remove from active destination list
- di.updateConfig(); // update file if something changed
- if (di.isEmpty()) {
- /*
- * This is the last job to the destination; purge
- * the entry from the destination jobs database.
- */
- destJobs.remove(job.dest);
- }
-}
-
-/*
- * Compare two job requests to each other and to a selected
- * job to see if they can be batched together.
- */
-bool
-faxQueueApp::areBatchable(Job& job, Job& nextjob, FaxRequest& nextreq)
-{
- // make sure the job's modem is in the requested ModemGroup
- if (!job.modem->isInGroup(nextreq.modem))
- return(false);
- return(true);
-}
-
-/*
- * Add new job "cjob" to the batched job "bjob"
- */
-void
-faxQueueApp::batchJob(Job* bjob, Job* cjob, FaxRequest* creq)
-{
- cjob->remove();
- cjob->modem = bjob->modem;
- cjob->breq = creq;
- cjob->bprev = bjob;
- bjob->bnext = cjob;
-}
-
/*
* Scan the list of jobs and process those that are ready
* to go. Note that the scheduler should only ever be
for (u_int i = 0; i < NQHASH; i++) {
for (JobIter iter(runqs[i]); iter.notDone(); iter++) {
Job& job = iter;
- if (job.bprev != NULL) {
- /*
- * The batching sub-loop below already allocated this job to a batch.
- * Thus, this loop's copy of the run queue is incorrect.
- */
- pokeScheduler();
- break;
- }
fxAssert(job.tts <= Sys::now(), "Sleeping job on run queue");
fxAssert(job.modem == NULL, "Job on run queue holding modem");
}
time_t now = Sys::now();
time_t tts;
- if (!isOKToCall(di, job.getJCI(), 1)) {
+ if (!di.isActive(job) && !isOKToCall(di, job.getJCI(), 1)) {
/*
* This job would exceed the max number of concurrent
* calls that may be made to this destination. Put it
delete req;
} else if (assignModem(job)) {
job.remove(); // remove from run queue
- job.breq = req;
/*
* We have a modem and have assigned it to the
* job. The job is not on any list; processJob
* also assumed to take place asynchronously in
* the context of the job's processing.
*/
- (void) di.getInfo(job.dest); // must read file for supportsBatching
- FaxMachineInfo info;
- if (di.supportsBatching()
- && (req->jobtype == "facsimile"
- || (req->jobtype == "pager"
- && streq(info.getPagingProtocol(), "ixo")))) {
- // fax and IXO pages only for now
- /*
- * The destination supports batching. Continue down the queue
- * and build an array of all processable jobs to this destination
- * allowed on this modem which are not of a lesser priority than
- * jobs to other destinations.
- */
- unblockDestJobs(di);
-
- /*
- * Since job files are passed to the send program as command-line
- * parameters, our batch size is limited by that number of
- * parameters. 64 should be a portable number.
- */
- if (maxBatchJobs > 64) maxBatchJobs = 64;
-
- Job* bjob = &job; // Last batched Job
- Job* cjob = &job; // current Job
-
- u_int batchedjobs = 1;
- for (u_int j = 0; batchedjobs < maxBatchJobs && j < NQHASH; j++) {
- for (JobIter joblist(runqs[j]); batchedjobs < maxBatchJobs && joblist.notDone(); joblist++) {
- cjob = joblist;
- if (job.jobid == cjob->jobid)
- continue; // Skip the current job
- if (job.dest != cjob->dest)
- continue;
- fxAssert(cjob->tts <= Sys::now(), "Sleeping job on run queue");
- fxAssert(cjob->modem == NULL, "Job on run queue holding modem");
-
- FaxRequest* creq = readRequest(*cjob);
- if (!areBatchable(job, *cjob, *creq)) {
- delete creq;
- continue;
- }
-
- if (iter.notDone() && &iter.job() == bjob)
- iter++;
-
- traceJob(job, "ADDING JOB %s TO BATCH", (const char*)cjob->jobid);
- batchJob(bjob, cjob, creq);
- bjob = cjob;
- batchedjobs++;
- }
- }
-
- /*
- * Jobs that are on the sleep queue with state_sleeping
- * can be batched because the tts that the submitter requested
- * is known to have passed already. So we pull these jobs out
- * of the sleep queue and batch them directly.
- */
- for (JobIter sleepiter(sleepq); batchedjobs < maxBatchJobs && sleepiter.notDone(); sleepiter++) {
- cjob = sleepiter;
- if (cjob->dest != job.dest || cjob->state != FaxRequest::state_sleeping)
- continue;
- FaxRequest* creq = readRequest(*cjob); if (! (req && areBatchable(job, *cjob, *req) ) ) {
- delete creq;
- continue;
- }
- traceJob(job, "ADDING JOB %s TO BATCH", (const char*)cjob->jobid);
- cjob->stopTTSTimer();
- // This job was batched from sleeping, things have
- // changed; Update the queue file for onlookers.
- cjob->tts = now;
- cjob->state = FaxRequest::state_ready;
- creq->tts = now;
- updateRequest(*creq, *cjob);
- batchJob(bjob, cjob, creq);
- bjob = cjob;
- batchedjobs++;
- }
- bjob->bnext = NULL;
- } else
- job.bnext = NULL;
- di.call(); // mark as called to correctly block other jobs
processJob(job, req, di);
} else // leave job on run queue
delete req;
{
Trigger::post(Trigger::MODEM_RELEASE, *job.modem);
job.modem->release();
+ job.modem = NULL;
pokeScheduler();
- Job* cjob;
- for (cjob = &job; cjob != NULL; cjob = cjob->bnext) {
- fxAssert(cjob->modem != NULL, "No assigned modem to release");
- cjob->modem = NULL; // remove reference to modem
- }
}
/*