#include "bacula.h"
#include "stored.h"
-
+#include "dedupstored.h"
/* Forward referenced subroutines */
static bool read_record_cb(DCR *dcr, DEV_RECORD *rec);
static bool mac_record_cb(DCR *dcr, DEV_RECORD *rec);
return false;
}
dcr->dev->start_of_job(dcr);
+ dcr->dev->setup_dedup_rehydration_interface(dcr);
/* Tell File daemon we will send data */
if (!jcr->is_ok_data_sent) {
+ /* OK_DATA can have been already sent for copy/migrate by run_job() to avoid dead lock*/
+ Dmsg0(DT_DEDUP|215, "send OK_data\n");
+ if (jcr->dedup && !jcr->dedup->do_flowcontrol_rehydration(1)) {
+ jcr->dedup->warn_rehydration_eod();
+ return false;
+ }
fd->fsend(OK_data);
jcr->is_ok_data_sent = true;
}
job_elapsed / 3600, job_elapsed % 3600 / 60, job_elapsed % 60,
edit_uint64_with_suffix(jcr->JobBytes / job_elapsed, ec));
+ if (jcr->dedup) {
+ /* must be sure that FD queue is not full to receive the BNET_EOD below */
+ jcr->dedup->do_flowcontrol_rehydration(1);
+ /* tell the dedup rehydration thread that this is done and it can quit
+ * as soon it get the last ACK from the FD/SD
+ * This is only useful for SD-SD right now, because
+ * SD-FD use the BNET_CMD_STP_THREAD command
+ * This must be done before the BNET_EOD, because this is the
+ * matching BNET_CMD_REC_ACK that will allow to exit the loop
+ */
+ Dmsg0(DT_DEDUP|215, "warn about end of rehydration thread\n");
+ jcr->dedup->warn_rehydration_eod();
+ }
+
/* Send end of data to FD */
fd->signal(BNET_EOD);
+ dcr->dev->free_dedup_rehydration_interface(dcr);
+
if (!release_device(jcr->read_dcr)) {
ok = false;
}
return true;
}
+ /* Do rehydration */
+ if (rec->Stream & STREAM_BIT_DEDUPLICATION_DATA) {
+ if (jcr->dedup==NULL) { // aka dcr->dev->dev_type!=B_DEDUP_DEV
+ Jmsg0(jcr, M_FATAL, 0, _("Cannot do rehydration, device is not dedup aware\n"));
+ return false;
+ }
+ Dmsg2(DT_DEDUP|640, "stream 0x%x is_rehydration_srvside=%d\n", rec->Stream, jcr->dedup->is_rehydration_srvside());
+ if (jcr->dedup->is_rehydration_srvside()) {
+ wbuf = jcr->dedup->get_msgbuf();
+ bool despite_of_error = forge_on;
+ int size;
+ int err = jcr->dedup->record_rehydration(dcr, rec, wbuf, jcr->errmsg, despite_of_error, &size);
+ if (err) {
+ /* cannot read data from DDE */
+ if (!despite_of_error) {
+ Jmsg1(jcr, M_FATAL, 0, "%s", jcr->errmsg);
+ return false;
+ }
+ Jmsg1(jcr, M_ERROR, 0, "%s", jcr->errmsg);
+ }
+ wsize = size;
+ } else {
+ // if the FD will do dedup, then do flow control
+ if (!jcr->dedup->is_thread_started()) {
+ Dmsg0(DT_DEDUP|215, "Starting rehydration thread\n");
+ jcr->dedup->start_rehydration();
+ }
+ jcr->dedup->add_circular_buf(dcr, rec);
+ }
+ }
+
Dmsg5(400, "Send to FD: SessId=%u SessTim=%u FI=%s Strm=%s, len=%d\n",
rec->VolSessionId, rec->VolSessionTime,
FI_to_ascii(ec1, rec->FileIndex),
stream_to_ascii(ec2, rec->Stream, rec->FileIndex),
wsize);
- Dmsg2(640, ">filed: send header stream=0x%lx len=%ld\n", rec->Stream, wsize);
+ if (jcr->dedup && !jcr->dedup->do_flowcontrol_rehydration(1)) {
+ return false;
+ }
+
+ Dmsg2(DT_DEDUP|640, ">filed: send header stream=0x%lx len=%ld\n", rec->Stream, wsize);
/* Send record header to File daemon */
if (!fd->fsend(rec_header, rec->VolSessionId, rec->VolSessionTime,
rec->FileIndex, rec->Stream, wsize)) {
fd->msglen = wsize;
/* Send data record to File daemon */
jcr->JobBytes += wsize; /* increment bytes this job */
- Dmsg1(640, ">filed: send %d bytes data.\n", fd->msglen);
+ Dmsg1(DT_DEDUP|640, ">filed: send %d bytes data.\n", fd->msglen);
+ if (jcr->dedup) {
+ ok = jcr->dedup->do_flowcontrol_rehydration(1);
+ }
if (!fd->send()) {
Pmsg1(000, _("Error sending to FD. ERR=%s\n"), fd->bstrerror());
Jmsg1(jcr, M_FATAL, 0, _("Error sending data to Client. ERR=%s\n"),
return true;
}
+ if (rec->Stream & STREAM_BIT_DEDUPLICATION_DATA) {
+ if (jcr->dedup==NULL) { // aka dcr->dev->dev_type!=B_DEDUP_DEV
+ Jmsg0(jcr, M_FATAL, 0, _("Cannot do rehydration, device is not dedup aware\n"));
+ return false;
+ }
+ Dmsg2(DT_DEDUP|640, "stream 0x%x is_rehydration_srvside=%d\n", rec->Stream, jcr->dedup->is_rehydration_srvside());
+ if (jcr->dedup->is_rehydration_srvside()) {
+ wbuf = jcr->dedup->get_msgbuf();
+ bool despite_of_error = false; /* the destination SD will check the data, don't try to cheat */
+ int size;
+ int err = jcr->dedup->record_rehydration(dcr, rec, wbuf, jcr->errmsg, despite_of_error, &size);
+ if (err < 0) {
+ /* cannot read data from DSE */
+ Jmsg1(jcr, M_FATAL, 0, "%s", jcr->errmsg);
+ return false;
+ }
+ wsize = size;
+ } else {
+ // if the other SD does dedup, then do flow control
+ if (!jcr->dedup->is_thread_started()) {
+ Dmsg0(DT_DEDUP|215, "Starting rehydration thread\n");
+ jcr->dedup->start_rehydration();
+ }
+ jcr->dedup->add_circular_buf(dcr, rec);
+ }
+ }
+
/*
* For normal migration jobs, FileIndex values are sequential because
* we are dealing with one job. However, for Vbackup (consolidation),
/* Something changed */
if (rec->last_VolSessionId != 0) { /* Not first record */
Dmsg1(200, "Send EOD jobfiles=%d\n", jcr->JobFiles);
+ if (jcr->dedup && !jcr->dedup->do_flowcontrol_rehydration(1)) {
+ return false;
+ }
if (!fd->signal(BNET_EOD)) { /* End of previous stream */
Jmsg(jcr, M_FATAL, 0, _("Error sending to File daemon. ERR=%s\n"),
fd->bstrerror());
wsize);
/* Send data header to File daemon */
+ if (jcr->dedup && !jcr->dedup->do_flowcontrol_rehydration(1)) {
+ return false;
+ }
if (!fd->fsend("%ld %ld %ld", rec->FileIndex, rec->Stream, wsize)) {
Pmsg1(000, _(">filed: Error Hdr=%s\n"), fd->msg);
Jmsg1(jcr, M_FATAL, 0, _("Error sending to File daemon. ERR=%s\n"),
fd->msglen = wsize;
jcr->JobBytes += wsize; /* increment bytes this job */
Dmsg1(400, ">filed: send %d bytes data.\n", fd->msglen);
+ if (jcr->dedup) {
+ ok = jcr->dedup->do_flowcontrol_rehydration(1);
+ }
if (!fd->send()) {
Pmsg1(000, _("Error sending to FD. ERR=%s\n"), fd->bstrerror());
Jmsg1(jcr, M_FATAL, 0, _("Error sending to File daemon. ERR=%s\n"),