]> git.ipfire.org Git - thirdparty/bacula.git/commitdiff
BEE Backport bacula/src/stored/read.c
authorAlain Spineux <alain@baculasystems.com>
Tue, 12 May 2020 17:10:55 +0000 (19:10 +0200)
committerEric Bollengier <eric@baculasystems.com>
Thu, 29 Apr 2021 08:44:18 +0000 (10:44 +0200)
This commit is the result of the squash of the following main commits:

Author: Alain Spineux <alain@baculasystems.com>
Date:   Thu Sep 10 14:53:06 2015 +0200

    Fix MA1151 RT14681 bsock.c:713 Write error sending 24 bytes to Storage ...

    Fix a problem in the flow control termination in copy job.
    One side was still sending FC traffic while the other was not
    listening for them anymore.
    Use the BNET_CMD_STP_FLOWCTRL message that was first used for the FD-SD
    protocol.

bacula/src/stored/read.c

index 1143703b1584c94e73e4027eee4326d685e920ac..c60f5b34d33d4d67b51c8c6da859af99efa3f76b 100644 (file)
@@ -25,7 +25,7 @@
 
 #include "bacula.h"
 #include "stored.h"
-
+#include "dedupstored.h"
 /* Forward referenced subroutines */
 static bool read_record_cb(DCR *dcr, DEV_RECORD *rec);
 static bool mac_record_cb(DCR *dcr, DEV_RECORD *rec);
@@ -68,9 +68,16 @@ bool do_read_data(JCR *jcr)
       return false;
    }
    dcr->dev->start_of_job(dcr);
+   dcr->dev->setup_dedup_rehydration_interface(dcr);
 
    /* Tell File daemon we will send data */
    if (!jcr->is_ok_data_sent) {
+      /* OK_DATA can have been already sent for copy/migrate by run_job() to avoid dead lock*/
+      Dmsg0(DT_DEDUP|215, "send OK_data\n");
+      if (jcr->dedup && !jcr->dedup->do_flowcontrol_rehydration(1)) {
+         jcr->dedup->warn_rehydration_eod();
+         return false;
+      }
       fd->fsend(OK_data);
       jcr->is_ok_data_sent = true;
    }
@@ -99,9 +106,25 @@ bool do_read_data(JCR *jcr)
          job_elapsed / 3600, job_elapsed % 3600 / 60, job_elapsed % 60,
          edit_uint64_with_suffix(jcr->JobBytes / job_elapsed, ec));
 
+   if (jcr->dedup) {
+      /* must be sure that FD queue is not full to receive the BNET_EOD below */
+      jcr->dedup->do_flowcontrol_rehydration(1);
+      /* tell the dedup rehydration thread that this is done and it can quit
+       * as soon it get the last ACK from the FD/SD
+       * This is only useful for SD-SD right now, because
+       * SD-FD use the BNET_CMD_STP_THREAD command
+       * This must be done before the BNET_EOD, because this is the
+       * matching BNET_CMD_REC_ACK that will allow to exit the loop
+       */
+      Dmsg0(DT_DEDUP|215, "warn about end of rehydration thread\n");
+      jcr->dedup->warn_rehydration_eod();
+   }
+
    /* Send end of data to FD */
    fd->signal(BNET_EOD);
 
+   dcr->dev->free_dedup_rehydration_interface(dcr);
+
    if (!release_device(jcr->read_dcr)) {
       ok = false;
    }
@@ -124,13 +147,48 @@ static bool read_record_cb(DCR *dcr, DEV_RECORD *rec)
       return true;
    }
 
+   /* Do rehydration */
+   if (rec->Stream & STREAM_BIT_DEDUPLICATION_DATA) {
+      if (jcr->dedup==NULL) {  // aka dcr->dev->dev_type!=B_DEDUP_DEV
+         Jmsg0(jcr, M_FATAL, 0, _("Cannot do rehydration, device is not dedup aware\n"));
+         return false;
+      }
+      Dmsg2(DT_DEDUP|640, "stream 0x%x is_rehydration_srvside=%d\n", rec->Stream, jcr->dedup->is_rehydration_srvside());
+      if (jcr->dedup->is_rehydration_srvside()) {
+         wbuf = jcr->dedup->get_msgbuf();
+         bool despite_of_error = forge_on;
+         int size;
+         int err = jcr->dedup->record_rehydration(dcr, rec, wbuf, jcr->errmsg, despite_of_error, &size);
+         if (err) {
+            /* cannot read data from DDE */
+            if (!despite_of_error) {
+               Jmsg1(jcr, M_FATAL, 0, "%s", jcr->errmsg);
+               return false;
+            }
+            Jmsg1(jcr, M_ERROR, 0, "%s", jcr->errmsg);
+         }
+         wsize = size;
+      } else {
+         // if the FD will do dedup, then do flow control
+         if (!jcr->dedup->is_thread_started()) {
+            Dmsg0(DT_DEDUP|215, "Starting rehydration thread\n");
+            jcr->dedup->start_rehydration();
+         }
+         jcr->dedup->add_circular_buf(dcr, rec);
+      }
+   }
+
    Dmsg5(400, "Send to FD: SessId=%u SessTim=%u FI=%s Strm=%s, len=%d\n",
       rec->VolSessionId, rec->VolSessionTime,
       FI_to_ascii(ec1, rec->FileIndex),
       stream_to_ascii(ec2, rec->Stream, rec->FileIndex),
       wsize);
 
-   Dmsg2(640, ">filed: send header stream=0x%lx len=%ld\n", rec->Stream, wsize);
+   if (jcr->dedup && !jcr->dedup->do_flowcontrol_rehydration(1)) {
+      return false;
+   }
+
+   Dmsg2(DT_DEDUP|640, ">filed: send header stream=0x%lx len=%ld\n", rec->Stream, wsize);
    /* Send record header to File daemon */
    if (!fd->fsend(rec_header, rec->VolSessionId, rec->VolSessionTime,
           rec->FileIndex, rec->Stream, wsize)) {
@@ -169,7 +227,10 @@ static bool read_record_cb(DCR *dcr, DEV_RECORD *rec)
    fd->msglen = wsize;
    /* Send data record to File daemon */
    jcr->JobBytes += wsize;   /* increment bytes this job */
-   Dmsg1(640, ">filed: send %d bytes data.\n", fd->msglen);
+   Dmsg1(DT_DEDUP|640, ">filed: send %d bytes data.\n", fd->msglen);
+   if (jcr->dedup) {
+      ok = jcr->dedup->do_flowcontrol_rehydration(1);
+   }
    if (!fd->send()) {
       Pmsg1(000, _("Error sending to FD. ERR=%s\n"), fd->bstrerror());
       Jmsg1(jcr, M_FATAL, 0, _("Error sending data to Client. ERR=%s\n"),
@@ -211,6 +272,33 @@ static bool mac_record_cb(DCR *dcr, DEV_RECORD *rec)
       return true;
    }
 
+   if (rec->Stream & STREAM_BIT_DEDUPLICATION_DATA) {
+      if (jcr->dedup==NULL) {  // aka dcr->dev->dev_type!=B_DEDUP_DEV
+         Jmsg0(jcr, M_FATAL, 0, _("Cannot do rehydration, device is not dedup aware\n"));
+         return false;
+      }
+      Dmsg2(DT_DEDUP|640, "stream 0x%x is_rehydration_srvside=%d\n", rec->Stream, jcr->dedup->is_rehydration_srvside());
+      if (jcr->dedup->is_rehydration_srvside()) {
+         wbuf = jcr->dedup->get_msgbuf();
+         bool despite_of_error = false; /* the destination SD will check the data, don't try to cheat */
+         int size;
+         int err = jcr->dedup->record_rehydration(dcr, rec, wbuf, jcr->errmsg, despite_of_error, &size);
+         if (err < 0) {
+            /* cannot read data from DSE */
+            Jmsg1(jcr, M_FATAL, 0, "%s", jcr->errmsg);
+            return false;
+         }
+         wsize = size;
+      } else {
+         // if the other SD does dedup, then do flow control
+         if (!jcr->dedup->is_thread_started()) {
+            Dmsg0(DT_DEDUP|215, "Starting rehydration thread\n");
+            jcr->dedup->start_rehydration();
+         }
+         jcr->dedup->add_circular_buf(dcr, rec);
+      }
+   }
+
    /*
     * For normal migration jobs, FileIndex values are sequential because
     *  we are dealing with one job.  However, for Vbackup (consolidation),
@@ -229,6 +317,9 @@ static bool mac_record_cb(DCR *dcr, DEV_RECORD *rec)
          /* Something changed */
          if (rec->last_VolSessionId != 0) {        /* Not first record */
             Dmsg1(200, "Send EOD jobfiles=%d\n", jcr->JobFiles);
+            if (jcr->dedup && !jcr->dedup->do_flowcontrol_rehydration(1)) {
+               return false;
+            }
             if (!fd->signal(BNET_EOD)) {  /* End of previous stream */
                Jmsg(jcr, M_FATAL, 0, _("Error sending to File daemon. ERR=%s\n"),
                         fd->bstrerror());
@@ -256,6 +347,9 @@ static bool mac_record_cb(DCR *dcr, DEV_RECORD *rec)
          wsize);
 
       /* Send data header to File daemon */
+      if (jcr->dedup && !jcr->dedup->do_flowcontrol_rehydration(1)) {
+         return false;
+      }
       if (!fd->fsend("%ld %ld %ld", rec->FileIndex, rec->Stream, wsize)) {
          Pmsg1(000, _(">filed: Error Hdr=%s\n"), fd->msg);
          Jmsg1(jcr, M_FATAL, 0, _("Error sending to File daemon. ERR=%s\n"),
@@ -271,6 +365,9 @@ static bool mac_record_cb(DCR *dcr, DEV_RECORD *rec)
    fd->msglen = wsize;
    jcr->JobBytes += wsize;   /* increment bytes this job */
    Dmsg1(400, ">filed: send %d bytes data.\n", fd->msglen);
+   if (jcr->dedup) {
+      ok = jcr->dedup->do_flowcontrol_rehydration(1);
+   }
    if (!fd->send()) {
       Pmsg1(000, _("Error sending to FD. ERR=%s\n"), fd->bstrerror());
       Jmsg1(jcr, M_FATAL, 0, _("Error sending to File daemon. ERR=%s\n"),