]> git.ipfire.org Git - thirdparty/bacula.git/commitdiff
Add SDPacketCheck in the FileDaemon resource to control the network flow
authorEric Bollengier <eric@baculasystems.com>
Fri, 5 Mar 2021 14:55:38 +0000 (15:55 +0100)
committerEric Bollengier <eric@baculasystems.com>
Thu, 24 Mar 2022 08:03:01 +0000 (09:03 +0100)
bacula/src/filed/backup.c
bacula/src/filed/filed.h
bacula/src/filed/filed_conf.c
bacula/src/filed/filed_conf.h
bacula/src/filed/heartbeat.c
bacula/src/filed/job.c
bacula/src/jcr.h

index 1c4973446437cdb7a1f31248badef9a88fcea421..e0fb6c7e1fdb93de29d953ee79a205bdf6b1d044 100644 (file)
@@ -67,7 +67,11 @@ bool blast_data_to_storage_daemon(JCR *jcr, char *addr)
 {
    BSOCK *sd;
    bool ok = true;
-   // TODO landonf: Allow user to specify encryption algorithm
+
+   /* Initialize the poll manager if we have the configuration set */
+   if (me->sd_packet_check) {
+      jcr->sd_packet_mgr = New(bnet_poll_manager(me->sd_packet_check));
+   }
 
    sd = jcr->store_bsock;
 
@@ -226,6 +230,7 @@ bool blast_data_to_storage_daemon(JCR *jcr, char *addr)
 
    crypto_session_end(jcr);
 
+   bdelete_and_null(jcr->sd_packet_mgr);
 
    Dmsg1(100, "end blast_data ok=%d\n", ok);
    return ok;
@@ -781,6 +786,9 @@ static int send_data(bctx_t &bctx, int stream)
       if (!process_and_send_data(bctx)) {
          goto err;
       }
+      if (jcr->sd_packet_mgr) {
+         jcr->sd_packet_mgr->send(jcr, sd); // Send a POLL request if needed
+      }
    } /* end while read file data */
    goto finish_sending;
 
index a40bfeb601739f8f357afea9ace3a29df6599884..6f4305db9d6f151baf7371d85e89b8ce31a6223f 100644 (file)
@@ -86,6 +86,27 @@ typedef struct {
    int bacula_client_test_metric2;
 } fdstatmetrics_t;
 
+class bnet_poll_manager: public SMARTALLOC
+{
+private:
+   pthread_cond_t  m_cond;
+   pthread_mutex_t m_mutex;
+   int32_t         m_check_done; /* small state machine to sync the two threads */
+
+   int32_t         m_count;     /* Current value, at 0 we send the POLL */
+   int32_t         m_check;     /* configuration file value */
+
+public:
+   bnet_poll_manager(int32_t val);
+   ~bnet_poll_manager();
+
+   void init(int32_t val);
+   void destroy();
+   void send(JCR *jcr, BSOCK *sd);
+   void recv(JCR *jcr, const char *msg);
+};
+
+
 void allow_os_suspensions();
 void prevent_os_suspensions();
 bool update_permanent_stats(void *data);
index 1720d4e7430e0b76ab74ad266f57bfe825b3f4b6..bfd79aebb1e0243fed4dcde7edeca8fc2a3e79c0 100644 (file)
@@ -118,6 +118,7 @@ static RES_ITEM cli_items[] = {
    {"CommCompression",       store_bool,    ITEM(res_client.comm_compression), 0, ITEM_DEFAULT, true},
    {"DisableCommand",        store_alist_str, ITEM(res_client.disable_cmds), 0, 0, 0},
    {"MaximumJobErrorCount",  store_pint32,    ITEM(res_client.max_job_errors),  0, ITEM_DEFAULT, 1000},
+   {"SDPacketCheck",         store_pint32,    ITEM(res_client.sd_packet_check),  0, ITEM_DEFAULT, 0},
 #if BEEF
    {"DedupIndexDirectory",   store_dir,    ITEM(res_client.dedup_index_dir), 0, 0, 0}, /* deprecated */
    {"EnableClientRehydration", store_bool,    ITEM(res_client.allow_dedup_cache), 0, ITEM_DEFAULT, false},
index 0d3e380e8697c547934dd69c96339217e5bec7a0..cc32c7fd8665aeb485fbddc47b2274df83380265 100644 (file)
@@ -136,6 +136,7 @@ struct CLIENT {
    utime_t heartbeat_interval;        /* Interval to send heartbeats */
    uint32_t max_network_buffer_size;  /* max network buf size */
    uint32_t max_job_errors;           /* Maximum number of errors tolerated by the client to fail the job */
+   int32_t sd_packet_check;           /* Send a POLL request every X data packets */
    bool comm_compression;             /* Enable comm line compression */
    bool pki_sign;                     /* Enable Data Integrity Verification via Digital Signatures */
    bool pki_encrypt;                  /* Enable Data Encryption */
index e8a355fbcd4602fc954cfa7c1c03fc8d691fe615..f57fa705dbd881b49839e9a26762078a389b20bc 100644 (file)
@@ -107,6 +107,7 @@ extern "C" void *sd_heartbeat_thread(void *arg)
             Dmsg2(100, "Got m=%d BNET_SIG %d from SD\n", m, sd->msglen);
          } else {
             Dmsg3(100, "Got m=%d msglen=%d bytes from SD. MSG=%s\n", m, sd->msglen, sd->msg);
+            jcr->sd_packet_mgr->recv(jcr, sd->msg); // Might be to ack a POLL request
          }
       }
       Dmsg2(200, "wait_intr=%d stop=%d\n", n, sd->is_stop());
index 337efdc13a7f954cb9fc273200e1ed1df0c9e66e..51c4d175407a60eb42d7b915eb4596199d1ee6bf 100644 (file)
@@ -3493,3 +3493,82 @@ int response(JCR *jcr, BSOCK *sd, char *resp, const char *cmd)
    }
    return 0;
 }
+
+/* 
+ * Small helper to manager a POLL request when the heartbeat is started
+ * When we send POLL, we get a OK message, but if the heartbeat is started
+ * the message is discarded. The following class is doing the synchronization
+ * between the two threads after a POLL message.
+ */
+bnet_poll_manager::bnet_poll_manager(int32_t val)
+{
+   pthread_cond_init(&m_cond, NULL);
+   pthread_mutex_init(&m_mutex, NULL);
+   init(val);
+}
+
+/* Call one time per job in blast_data_to_storage_daemon() */
+void bnet_poll_manager::init(int32_t val)
+{
+   m_check = val;               /* Value of the config file */
+   m_count = val;               /* Current value */
+   m_check_done = 0;            /* small state machine to sync the two threads */
+}
+
+bnet_poll_manager::~bnet_poll_manager()
+{
+   destroy();
+}
+
+void bnet_poll_manager::destroy()
+{
+   pthread_cond_destroy(&m_cond);
+   pthread_mutex_destroy(&m_mutex);
+}
+
+/* Send a POLL and get the answer every X packets, called in save_data() */
+void bnet_poll_manager::send(JCR *jcr, BSOCK *sd)
+{
+   int32_t val = m_count;
+
+   if (val == 0) {
+      Dmsg1(DT_NETWORK|10, "Request a POLL after %d packets...\n", m_check);
+      m_check_done = 1; /* We sent the request */
+      sd->signal(BNET_POLL);
+
+      struct timespec t;
+
+      P(m_mutex);
+      do {
+         t.tv_sec = time(NULL) + 5;
+         t.tv_nsec = 0;
+         pthread_cond_timedwait(&m_cond, &m_mutex, &t);
+      } while (m_check_done == 1 && !jcr->is_canceled());
+      V(m_mutex);
+
+      if (m_check_done == 2) {
+         Dmsg0(DT_NETWORK|10, "Got it\n");
+      }
+      m_check_done = 0;
+   }
+
+   val--;
+
+   if (val < 0) {               /* Initialization or loop found */
+      val = m_check;
+   }
+
+   m_count = val;
+}
+
+/* Called from the heartbeat thread */
+void bnet_poll_manager::recv(JCR *jcr, const char *msg)
+{
+   if (m_check_done == 1 && strncmp(msg, "2000 OK\n", 8) == 0) {
+      Dmsg0(DT_NETWORK|10, "Wake up the other thread after POLL\n");
+      P(m_mutex);
+      m_check_done = 2;
+      pthread_cond_signal(&m_cond);
+      V(m_mutex);
+   }
+}
index af457ace95156cc76b087fd3133216a866a9c8c7..09974678906940fa0db27c2034a9388ed735865d 100644 (file)
@@ -160,6 +160,7 @@ class htable;
 class BACL;
 class BXATTR;
 class snapshot_manager;
+class bnet_poll_manager;
 
 struct CRYPTO_CTX {
    bool pki_sign;                     /* Enable PKI Signatures? */
@@ -448,6 +449,9 @@ public:
    uint32_t EndFile;
    uint32_t StartBlock;
    uint32_t EndBlock;
+
+   bnet_poll_manager *sd_packet_mgr;  /* Manage POLL requests to control the flow */
+
    int32_t sd_dedup;                  /* set if SD has dedup */
    int32_t sd_hash;                   /* SD hash type */
    int32_t sd_hash_size;              /* SD hash size */