bool work = true;
set_jcr_in_tsd(INVALID_JCR);
- Dmsg0(2300, "Start jobq_server\n");
+ Dmsg0(DT_SCHEDULER|50, "Start jobq_server\n");
P(jq->mutex);
for (;;) {
struct timeval tv;
struct timezone tz;
- Dmsg0(2300, "Top of for loop\n");
+ Dmsg0(DT_SCHEDULER|50, "Top of for loop\n");
if (!work && !jq->quit) {
gettimeofday(&tv, &tz);
timeout.tv_nsec = 0;
/*
* Wait 4 seconds, then if no more work, exit
*/
- Dmsg0(2300, "pthread_cond_timedwait()\n");
+ Dmsg0(DT_SCHEDULER|50, "pthread_cond_timedwait()\n");
stat = pthread_cond_timedwait(&jq->work, &jq->mutex, &timeout);
if (stat == ETIMEDOUT) {
Dmsg0(2300, "timedwait timedout.\n");
break;
} else if (stat != 0) {
/* This shouldn't happen */
- Dmsg0(2300, "This shouldn't happen\n");
+ Dmsg0(DT_SCHEDULER|50, "This shouldn't happen\n");
jq->num_workers--;
V(jq->mutex);
return NULL;
/*
* If anything is in the ready queue, run it
*/
- Dmsg0(2300, "Checking ready queue.\n");
+ Dmsg0(DT_SCHEDULER|50, "Checking ready queue.\n");
while (!jq->ready_jobs->empty() && !jq->quit) {
JCR *jcr;
je = (jobq_item_t *)jq->ready_jobs->first();
jcr = je->jcr;
jq->ready_jobs->remove(je);
if (!jq->ready_jobs->empty()) {
- Dmsg0(2300, "ready queue not empty start server\n");
+ Dmsg0(DT_SCHEDULER|50, "ready queue not empty start server\n");
if (start_server(jq) != 0) {
jq->num_workers--;
V(jq->mutex);
jcr->my_thread_id = pthread_self();
jcr->set_killable(true);
set_jcr_in_tsd(jcr);
- Dmsg1(2300, "Took jobid=%d from ready and appended to run\n", jcr->JobId);
+ Dmsg1(DT_SCHEDULER|50, "Took jobid=%d from ready and appended to run\n", jcr->JobId);
/* Release job queue lock */
V(jq->mutex);
/* Call user's routine here */
- Dmsg3(2300, "Calling user engine for jobid=%d use=%d stat=%c\n", jcr->JobId,
+ Dmsg3(DT_SCHEDULER|50, "Calling user engine for jobid=%d use=%d stat=%c\n", jcr->JobId,
jcr->use_count(), jcr->JobStatus);
jq->engine(je->jcr);
remove_jcr_from_tsd(je->jcr);
je->jcr->set_killable(false);
- Dmsg2(2300, "Back from user engine jobid=%d use=%d.\n", jcr->JobId,
+ Dmsg2(DT_SCHEDULER|50, "Back from user engine jobid=%d use=%d.\n", jcr->JobId,
jcr->use_count());
/* Reacquire job queue lock */
P(jq->mutex);
- Dmsg0(200, "Done lock mutex after running job. Release locks.\n");
+ Dmsg0(DT_SCHEDULER|50, "Done lock mutex after running job. Release locks.\n");
jq->running_jobs->remove(je);
/*
* Release locks if acquired. Note, they will not have
}
/* Clean up and release old jcr */
- Dmsg2(2300, "====== Termination job=%d use_cnt=%d\n", jcr->JobId, jcr->use_count());
+ Dmsg2(DT_SCHEDULER|50, "====== Termination job=%d use_cnt=%d\n", jcr->JobId, jcr->use_count());
jcr->SDJobStatus = 0;
V(jq->mutex); /* release internal lock */
free_jcr(jcr);
* If any job in the wait queue can be run,
* move it to the ready queue
*/
- Dmsg0(2300, "Done check ready, now check wait queue.\n");
+ Dmsg0(DT_SCHEDULER|50, "Done check ready, now check wait queue.\n");
if (!jq->waiting_jobs->empty() && !jq->quit) {
int Priority;
bool running_allow_mix = false;
jobq_item_t *re = (jobq_item_t *)jq->running_jobs->first();
if (re) {
Priority = re->jcr->JobPriority;
- Dmsg2(2300, "JobId %d is running. Look for pri=%d\n",
+ Dmsg2(DT_SCHEDULER|50, "JobId %d is running. Look for pri=%d\n",
re->jcr->JobId, Priority);
running_allow_mix = true;
for ( ; re; ) {
- Dmsg2(2300, "JobId %d is also running with %s\n",
+ Dmsg2(DT_SCHEDULER|50, "JobId %d is also running with %s\n",
re->jcr->JobId,
re->jcr->job->allow_mixed_priority ? "mix" : "no mix");
if (!re->jcr->job->allow_mixed_priority) {
}
re = (jobq_item_t *)jq->running_jobs->next(re);
}
- Dmsg1(2300, "The running job(s) %s mixing priorities.\n",
+ Dmsg1(DT_SCHEDULER|50, "The running job(s) %s mixing priorities.\n",
running_allow_mix ? "allow" : "don't allow");
} else {
Priority = je->jcr->JobPriority;
- Dmsg1(2300, "No job running. Look for Job pri=%d\n", Priority);
+ Dmsg1(DT_SCHEDULER|50, "No job running. Look for Job pri=%d\n", Priority);
}
/*
* Walk down the list of waiting jobs and attempt
/* je is current job item on the queue, jn is the next one */
JCR *jcr = je->jcr;
jobq_item_t *jn = (jobq_item_t *)jq->waiting_jobs->next(je);
-
- Dmsg4(2300, "Examining Job=%d JobPri=%d want Pri=%d (%s)\n",
+ btime_t now = time(NULL);
+ int interval = 90;
+ /* When we debug, we can spend less time to wait */
+ if (chk_dbglvl(DT_SCHEDULER)) {
+ interval = 10;
+ }
+ if (jcr->next_qrunscript_execution >= 0 && // first time
+ jcr->next_qrunscript_execution <= now) // we test again after some time
+ {
+ /* We test again after interval seconds: 90s, or 10s when DT_SCHEDULER debugging is on */
+ jcr->next_qrunscript_execution = now + interval;
+ int runcode = run_scripts_get_code(jcr, jcr->job->RunScripts, NT_("Queued"));
+ /* We use the exit code of the runscripts to determine what to do now.
+ * we can wait, start the job, or do other things
+ */
+ switch (runcode) {
+ case -1: // No script to run
+ jcr->next_qrunscript_execution = -1;
+ break;
+ case 0:
+ Jmsg(jcr, M_INFO, 0, _("User defined resources are available.\n"));
+ break;
+ case 1:
+ if (jcr->getJobStatus() != JS_WaitUser) {
+ Jmsg(jcr, M_INFO, 0, _("The Job will wait for user defined resources to be available.\n"));
+ }
+ jcr->setJobStatus(JS_WaitUser);
+ if (!job_canceled(jcr)) {
+ je = jn; /* point to next waiting job */
+ continue;
+ }
+ break;
+ case 2: // skip priority checks
+ jcr->JobPriority = 9999;
+ Jmsg(jcr, M_INFO, 0, _("Job Priority adjusted.\n"));
+ break;
+ default: // Incorrect code, must review the script
+ jcr->next_qrunscript_execution = -1;
+ Jmsg(jcr, M_WARNING, 0, _("Incorrect return code %d for user defined resource checking script.\n"), runcode);
+ break;
+ }
+ } else if (jcr->next_qrunscript_execution > 0) {
+ /* We have to wait, the job is not ready to be tested again */
+ if (!job_canceled(jcr)) {
+ je = jn; /* point to next waiting job */
+ continue;
+ }
+ }
+ Dmsg4(DT_SCHEDULER|50, "Examining Job=%d JobPri=%d want Pri=%d (%s)\n",
jcr->JobId, jcr->JobPriority, Priority,
jcr->job->allow_mixed_priority ? "mix" : "no mix");
*/
jq->waiting_jobs->remove(je);
jq->ready_jobs->append(je);
- Dmsg1(2300, "moved JobId=%d from wait to ready queue\n", je->jcr->JobId);
+ Dmsg1(DT_SCHEDULER|50, "moved JobId=%d from wait to ready queue\n", je->jcr->JobId);
je = jn; /* Point to next waiting job */
} /* end for loop */
} /* end if */
- Dmsg0(2300, "Done checking wait queue.\n");
+ Dmsg0(DT_SCHEDULER|50, "Done checking wait queue.\n");
/*
* If no more ready work and we are asked to quit, then do it
*/
if (jq->ready_jobs->empty() && jq->quit) {
jq->num_workers--;
if (jq->num_workers == 0) {
- Dmsg0(2300, "Wake up destroy routine\n");
+ Dmsg0(DT_SCHEDULER|50, "Wake up destroy routine\n");
/* Wake up destroy routine if he is waiting */
pthread_cond_broadcast(&jq->work);
}
break;
}
- Dmsg0(2300, "Check for work request\n");
+ Dmsg0(DT_SCHEDULER|50, "Check for work request\n");
/*
* If no more work requests, and we waited long enough, quit
*/
- Dmsg2(2300, "timedout=%d read empty=%d\n", timedout,
+ Dmsg2(DT_SCHEDULER|50, "timedout=%d read empty=%d\n", timedout,
jq->ready_jobs->empty());
if (jq->ready_jobs->empty() && timedout) {
- Dmsg0(2300, "break big loop\n");
+ Dmsg0(DT_SCHEDULER|50, "break big loop\n");
jq->num_workers--;
break;
}
/* Recompute work as something may have changed in last 2 secs */
work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
}
- Dmsg1(2300, "Loop again. work=%d\n", work);
+ Dmsg1(DT_SCHEDULER|50, "Loop again. work=%d\n", work);
} /* end of big for loop */
- Dmsg0(200, "unlock mutex\n");
+ Dmsg0(DT_SCHEDULER|50, "unlock mutex\n");
V(jq->mutex);
- Dmsg0(2300, "End jobq_server\n");
+ Dmsg0(DT_SCHEDULER|50, "End jobq_server\n");
return NULL;
}
jcr->sched_time = now + jcr->job->RescheduleInterval;
bstrftime(dt, sizeof(dt), now);
bstrftime(dt2, sizeof(dt2), jcr->sched_time);
- Dmsg4(2300, "Rescheduled Job %s to re-run in %d seconds.(now=%u,then=%u)\n", jcr->Job,
+ Dmsg4(DT_SCHEDULER|50, "Rescheduled Job %s to re-run in %d seconds.(now=%u,then=%u)\n", jcr->Job,
(int)jcr->job->RescheduleInterval, now, jcr->sched_time);
Jmsg(jcr, M_INFO, 0, _("Rescheduled Job %s at %s to re-run in %d seconds (%s).\n"),
jcr->Job, dt, (int)jcr->job->RescheduleInterval, dt2);
}
/* Only jobs with no output or Incomplete jobs can run on same JCR */
if (jcr->JobBytes == 0 || jcr->rerunning) {
- Dmsg2(2300, "Requeue job=%d use=%d\n", jcr->JobId, jcr->use_count());
+ Dmsg2(DT_SCHEDULER|50, "Requeue job=%d use=%d\n", jcr->JobId, jcr->use_count());
V(jq->mutex);
/*
* Special test here since a Virtual Full gets marked
njcr->messages = jcr->messages;
njcr->spool_data = jcr->spool_data;
njcr->write_part_after_job = jcr->write_part_after_job;
- Dmsg0(2300, "Call to run new job\n");
+ Dmsg0(DT_SCHEDULER|50, "Call to run new job\n");
V(jq->mutex);
run_job(njcr); /* This creates a "new" job */
free_jcr(njcr); /* release "new" jcr */
P(jq->mutex);
- Dmsg0(2300, "Back from running new job.\n");
+ Dmsg0(DT_SCHEDULER|50, "Back from running new job.\n");
}
return false;
}
on_failure = false;
fail_on_error = true;
when = SCRIPT_Never;
+ wait_time = 0; /* Infinite by default */
old_proto = false; /* TODO: drop this with bacula 1.42 */
job_code_callback = NULL;
}
when = SCRIPT_AfterVSS;
} else if (bstrcmp(label, NT_("AtJobCompletion"))) {
when = SCRIPT_AtJobCompletion;
+ } else if (bstrcmp(label, NT_("Queued"))) {
+ when = SCRIPT_Queued;
} else {
when = SCRIPT_After;
}
return ret;
}
+
+/*
+ * Function similar to run_scripts() but we report the exit status of the script.
+ *
+ * A script is run only when all of the following hold:
+ *   - its "when" mask contains SCRIPT_Queued and label contains "Queued"
+ *   - the job has not started yet (jcr->job_started is false)
+ *   - script->is_local() returns true
+ * A selected script whose wait_time is 0 gets a wait_time of 15 seconds
+ * before it is run, and jcr->job_task is set to JOB_TASK_QUEUED.  After the
+ * loop jcr->job_task is reset to JOB_TASK_ZERO.
+ *
+ *  jcr         job the scripts belong to
+ *  runscripts  list of RUNSCRIPT objects, may be NULL
+ *  label       name of the calling stage
+ *
+ * Returns:
+ *  -1: no script to run (also returned when runscripts is NULL)
+ *   0: ok
+ *   1-255: exit code from the script (the biggest one when several ran)
+ *
+ * NOTE(review): RUNSCRIPT::run_get_code() documents -1 as "execution problem".
+ * If be.code(-1) stays negative, such a failure cannot be told apart from
+ * "no script to run" here -- confirm against berrno::code().
+ */
+int run_scripts_get_code(JCR *jcr, alist *runscripts, const char *label)
+{
+   Dmsg2(200, "runscript: running all RUNSCRIPT object (%s) JobStatus=%c\n", label, jcr->JobStatus);
+
+   RUNSCRIPT *script;
+   bool runit;
+   int ret = -1;
+   int when;
+
+   if (strstr(label, NT_("Queued"))) {
+      when = SCRIPT_Queued;
+   } else {
+      when = SCRIPT_Never;
+   }
+
+   if (runscripts == NULL) {
+      Dmsg0(100, "runscript: WARNING RUNSCRIPTS list is NULL\n");
+      return -1;
+   }
+
+   foreach_alist(script, runscripts) {
+      Dmsg2(200, "runscript: try to run %s:%s\n", NPRT(script->target), NPRT(script->command));
+      runit = false;
+
+      if ((script->when & SCRIPT_Queued) && (when & SCRIPT_Queued)) {
+         if (jcr->job_started == false) {
+            /* command is guarded with NPRT() like in the trace above */
+            Dmsg4(200, "runscript: Run it because SCRIPT_Queued (%s,%i,%i,%c)\n",
+                  NPRT(script->command), script->on_success, script->on_failure,
+                  jcr->JobStatus );
+            if (!script->wait_time) {
+               script->wait_time = 15; // Set a maximum of 15s to not block everything
+            }
+            /* Set job task code */
+            jcr->job_task = JOB_TASK_QUEUED;
+            runit = true;
+         }
+      }
+
+      /* is_local() is called for every script, selected or not */
+      if (!script->is_local()) {
+         runit = false;
+      }
+
+      /* we execute it */
+      if (runit) {
+         berrno be;
+         int aret = script->run_get_code(jcr, label);
+         /* Decode once; MAX() would otherwise expand the call twice */
+         int code = be.code(aret);
+         ret = MAX(ret, code);      // We return the one with the biggest return code
+      }
+   }
+
+   /* Script ended, reset operation code */
+   jcr->job_task = JOB_TASK_ZERO;
+   return ret;
+}
+
bool RUNSCRIPT::is_local()
{
if (!target || (strcmp(target, "") == 0)) {
pm_strcpy(target, client_name);
}
-bool RUNSCRIPT::run(JCR *jcr, const char *name)
+/*
+ * Run this script and return its raw status instead of a boolean.
+ *
+ * SHELL_CMD: the command is opened with open_bpipe() using wait_time, each
+ * output line is reported with Jmsg(M_INFO), and the status is whatever
+ * close_bpipe() returns. If the pipe cannot be opened the status stays -1.
+ * CONSOLE_CMD: 0 when console_command() succeeds, 1 when it fails.
+ *
+ * -1: execution problem
+ * 0..255 execution done
+ *
+ * NOTE(review): when console_command is NULL the status also stays -1, so
+ * RUNSCRIPT::run() now reports an error where the previous code returned
+ * true -- confirm this is intended.
+ */
+int RUNSCRIPT::run_get_code(JCR *jcr, const char *name)
{
Dmsg1(100, "runscript: running a RUNSCRIPT object type=%d\n", cmd_type);
POOLMEM *ecmd = get_pool_memory(PM_FNAME);
- int status;
+ int status = -1;
BPIPE *bpipe;
char line[MAXSTRING];
switch (cmd_type) {
case SHELL_CMD:
- bpipe = open_bpipe(ecmd, 0, "r");
- if (bpipe == NULL) {
- berrno be;
- Jmsg(jcr, M_ERROR, 0, _("Runscript: %s could not execute. ERR=%s\n"), name,
- be.bstrerror());
- goto bail_out;
- }
- while (fgets(line, sizeof(line), bpipe->rfd)) {
- int len = strlen(line);
- if (len > 0 && line[len-1] == '\n') {
- line[len-1] = 0;
+ bpipe = open_bpipe(ecmd, wait_time, "r");
+ if (bpipe) {
+ while (fgets(line, sizeof(line), bpipe->rfd)) {
+ int len = strlen(line);
+ if (len > 0 && line[len-1] == '\n') {
+ line[len-1] = 0;
+ }
+ Jmsg(jcr, M_INFO, 0, _("%s: %s\n"), name, line);
}
- Jmsg(jcr, M_INFO, 0, _("%s: %s\n"), name, line);
+ status = close_bpipe(bpipe);
}
- status = close_bpipe(bpipe);
- if (status != 0) {
- berrno be;
- Jmsg(jcr, M_ERROR, 0, _("Runscript: %s returned non-zero status=%d. ERR=%s\n"), name,
- be.code(status), be.bstrerror(status));
- goto bail_out;
- }
- Dmsg0(100, "runscript OK\n");
break;
case CONSOLE_CMD:
if (console_command) { /* can we run console command? */
if (!console_command(jcr, ecmd)) { /* yes, do so */
- goto bail_out;
+ status = 1; /* command failed: this is the path that used to bail out */
+ } else {
+ status = 0; /* command succeeded */
}
}
break;
}
+ Dmsg1(100, "runscript status=%d\n", status);
free_pool_memory(ecmd);
- return true;
+ return status;
+}
-bail_out:
- free_pool_memory(ecmd);
- /* cancel running job properly */
- if (fail_on_error) {
- jcr->setJobStatus(JS_ErrorTerminated);
+bool RUNSCRIPT::run(JCR *jcr, const char *name)
+{ /* Run the script via run_get_code() and reduce its status to a boolean: true only when the status is 0 */
+ int code = run_get_code(jcr, name);
+ if (code != 0) { /* every non-zero status, -1 included, is reported as an error */
+ berrno be;
+ Jmsg(jcr, M_ERROR, 0, _("Runscript: %s returned non-zero status=%d. ERR=%s\n"), name,
+ be.code(code), be.bstrerror(code));
+
+ if (fail_on_error) { /* the job itself is marked failed only when fail_on_error is set */
+ jcr->setJobStatus(JS_ErrorTerminated);
+ }
}
- Dmsg1(100, "runscript failed. fail_on_error=%d\n", fail_on_error);
- return false;
+ return code == 0;
}
void free_runscripts(alist *runscripts)
Dmsg1(200, _(" --> RunOnFailure=%u\n"), on_failure);
Dmsg1(200, _(" --> FailJobOnError=%u\n"), fail_on_error);
Dmsg1(200, _(" --> RunWhen=%u\n"), when);
+ Dmsg1(200, _(" --> Timeout=%u\n"), wait_time);
}
void RUNSCRIPT::set_job_code_callback(job_code_callback_t arg_job_code_callback)