From: Michal Rakowski Date: Sun, 27 Dec 2020 11:47:15 +0000 (+0100) Subject: Introduce metadata plugin Iface X-Git-Tag: Release-11.3.2~702 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=f84107f161ccee88b8f98b6eda44e4edbd9ce932;p=thirdparty%2Fbacula.git Introduce metadata plugin Iface TODO define and handle catalog plugin stream TODO add handling for packets > 64K --- diff --git a/bacula/src/dird/catreq.c b/bacula/src/dird/catreq.c index 78f41f13a..ff64d5bc0 100644 --- a/bacula/src/dird/catreq.c +++ b/bacula/src/dird/catreq.c @@ -652,6 +652,12 @@ static void update_attribute(JCR *jcr, char *msg, int32_t msglen) Jmsg1(jcr, M_FATAL, 0, _("Plugin object create error. %s"), db_strerror(jcr->db)); } + } else if (Stream == STREAM_PLUGIN_META_CATALOG) { + //TODO add storing values inside proper catalog table here instead of dummy print + meta_pkt mp(p); + Dmsg1(100, "[metadata plugin] type: %d\n", mp.type); + Dmsg1(100, "[metadata plugin] buffer len: %d\n", mp.buf_len); + Dmsg2(100, "[metadata plugin] buf: %.*s\n", mp.buf_len, mp.buf); } else if (Stream == STREAM_RESTORE_OBJECT) { ROBJECT_DBR ro; diff --git a/bacula/src/filed/backup.c b/bacula/src/filed/backup.c index 7fae57176..cf2f18d40 100644 --- a/bacula/src/filed/backup.c +++ b/bacula/src/filed/backup.c @@ -42,6 +42,7 @@ const bool have_libz = false; /* Forward referenced functions */ int save_file(JCR *jcr, FF_PKT *ff_pkt, bool top_level); static int send_data(bctx_t &bctx, int stream); +static bool encode_and_send_metadata(bctx_t &bctx); static void close_vss_backup_session(JCR *jcr); #ifdef HAVE_DARWIN_OS static bool send_resource_fork(bctx_t &bctx); @@ -230,6 +231,66 @@ bool blast_data_to_storage_daemon(JCR *jcr, char *addr) return ok; } +bool metadata_save(JCR *jcr, plugin_metadata *plug_meta) +{ + bool stat = false; + BSOCK *sd = jcr->store_bsock; + uint32_t mp_count = plug_meta->count(); + + /* Send each metadata packet separately to the sd */ + for (uint32_t i=0; iget(i); + + /*TODO add handling for meta size >64KB*/ + if (mp->size() > 65536) { + Jmsg1(jcr, M_ERROR, 0, _("Metadata size (%ld) is bigger then currently supported one (64KB)\n"), + mp->size()); + goto bail_out; + } + + /* Send stream header */ + switch (mp->type) { + case plugin_meta_blob: + stat = sd->fsend("%ld %d 0", jcr->JobFiles, STREAM_PLUGIN_META_BLOB); + if (!stat) { + if (!jcr->is_canceled() && !jcr->is_incomplete()) { + Jmsg1(jcr, M_FATAL, 0, _("Network send error to SD. ERR=%s\n"), + sd->bstrerror()); + } + goto bail_out; + } + break; + case plugin_meta_catalog_email: + stat = sd->fsend("%ld %d 0", jcr->JobFiles, STREAM_PLUGIN_META_CATALOG); + if (!stat) { + if (!jcr->is_canceled() && !jcr->is_incomplete()) { + Jmsg1(jcr, M_FATAL, 0, _("Network send error to SD. ERR=%s\n"), + sd->bstrerror()); + } + goto bail_out; + } + break; + default: + Jmsg1(jcr, M_FATAL, 0, _("Invalid metadata type: %d\n"), mp->type); + goto bail_out; + } + + /* Allocate space to store entire metadata packet as a single buffer. We do not need do differentiate metadata + * types since each of them uses mp->data as a main metadata payload. */ + sd->msg = check_pool_memory_size(sd->msg, mp->size()); + sd->msglen = mp->size(); + + /* Serialize packet */ + mp->serialize(sd->msg); + + /* Send metadata packet */ + stat = sd->send(); + sd->signal(BNET_EOD); /* indicate end of attributes data */ + } + +bail_out: + return stat; +} /** * Called here by find() for each file included. @@ -442,6 +503,12 @@ int save_file(JCR *jcr, FF_PKT *ff_pkt, bool top_level) if (!encode_and_send_attributes(bctx)) { goto bail_out; } + + /* Send Plugin Metadata attributes. Available only for Plugins */ + if (!encode_and_send_metadata(bctx)) { + goto bail_out; + } + /** Meta data only for restore object */ if (IS_FT_OBJECT(ff_pkt->type)) { goto good_rtn; @@ -925,6 +992,15 @@ err: return ret; } +static bool encode_and_send_metadata(bctx_t &bctx) +{ + if (!bctx.ff_pkt->cmd_plugin) { + return true; + } + + return plugin_backup_metadata(bctx.jcr, bctx.ff_pkt); +} + bool encode_and_send_attributes(bctx_t &bctx) { BSOCK *sd = bctx.jcr->store_bsock; diff --git a/bacula/src/filed/fd_plugins.c b/bacula/src/filed/fd_plugins.c index 9e00d2325..b4004de06 100644 --- a/bacula/src/filed/fd_plugins.c +++ b/bacula/src/filed/fd_plugins.c @@ -38,8 +38,9 @@ const char *plugin_type = "-fd.dll"; const char *plugin_type = "-fd.so"; #endif -extern int save_file(JCR *jcr, FF_PKT *ff_pkt, bool top_level); +extern bool save_file(JCR *jcr, FF_PKT *ff_pkt, bool top_level); extern bool check_changes(JCR *jcr, FF_PKT *ff_pkt); +extern int metadata_save(JCR *jcr, plugin_metadata *plug_meta); /* Function pointers to be set here */ extern DLL_IMP_EXP int (*plugin_bopen)(BFILE *bfd, const char *fname, uint64_t flags, mode_t mode); @@ -701,6 +702,8 @@ int plugin_save(JCR *jcr, FF_PKT *ff_pkt, bool top_level) ff_pkt->restore_obj.object = sp.restore_obj.object; ff_pkt->restore_obj.object_len = sp.restore_obj.object_len; } + } else if (sp.type == FT_PLUGIN_METADATA) { + ff_pkt->plug_meta = sp.plug_meta; } else { Dsm_check(999); if (!sp.fname) { @@ -1212,6 +1215,22 @@ bool plugin_set_attributes(JCR *jcr, ATTR *attr, BFILE *ofd) return true; } +bool plugin_backup_metadata(JCR *jcr, FF_PKT *ff_pkt) +{ + Plugin *plugin = jcr->plugin; + + /* Backup metadata if provided */ + if (ff_pkt->plug_meta) { + if (!metadata_save(jcr, ff_pkt->plug_meta)) { + Jmsg2(jcr, M_ERROR, 0, _("Failed to backup metadata for plugin: \"%s\" fname: %s"), + plugin->file, ff_pkt->fname); + return false; + } + } + + return true; +} + /* * The Plugin ACL data backup. We are using a new Plugin callback: * handleXACLdata() for that. The new callback get a pointer to diff --git a/bacula/src/filed/fd_plugins.h b/bacula/src/filed/fd_plugins.h index 3dc8dbcd6..e97e2f9d8 100644 --- a/bacula/src/filed/fd_plugins.h +++ b/bacula/src/filed/fd_plugins.h @@ -114,6 +114,167 @@ struct plugin_object { uint64_t object_size; }; +enum metadata_type { + plugin_meta_blob = 0, + plugin_meta_catalog_email, + plugin_meta_invalid = -1, +}; + +/* + * This packet is used for storing plugin's metadata. +*/ +class meta_pkt: public SMARTALLOC { + private: + bool decoded; /* Was metadata packed decoded from serialized stream or not */ + + public: + uint32_t total_size; /* Total size of metadata stream (consiting of packets) */ + uint16_t total_count; /* Total count of metadata packets in the stream */ + enum metadata_type type; /* Type of metadata (binary, email, ...) */ + uint16_t index; /* Index of the packet in metadata stream (starting from 0, goes up to [total_count-1] */ + uint32_t buf_len; /* Length of buffer */ + void *buf; /* Can be either passed by the user or allocated for deserialization */ + + meta_pkt(metadata_type type=plugin_meta_invalid, uint32_t len=0, void *buf=NULL, uint16_t idx=0) { + this->buf_len = len; + this->type = type; + this->buf = buf; + this->index = idx; + this->total_size = 0; + this->total_count = 0; + + decoded = false; + }; + + /* Build metadata packet from serialized stream */ + meta_pkt(void *stream) { + if (stream) { + unser_declare; + unser_begin(stream, 0); + unser_uint32(total_size); + unser_uint16(total_count); + unser_uint32((uint32_t&) type); + unser_uint16(index); + unser_uint32(buf_len); + buf = bmalloc(buf_len); + unser_bytes(buf, buf_len); + } else { + total_size = 0; + total_count = 0; + buf_len = 0; + type = plugin_meta_invalid; + index = 0; + buf = NULL; + } + + /* Mark if we need to free buf later or not */ + decoded = stream ? true : false; + }; + + ~meta_pkt() { + if (decoded) { + bfree(buf); + } + }; + + /* Size of single metadata packet (struct's size + plugin's metadata buffer) */ + uint32_t size() { + return sizeof(meta_pkt) + buf_len - sizeof(buf); + } + + /* Serialize metadata packed into specified buffer */ + void serialize(void *ser_buf) { + ser_declare; + + ser_begin(ser_buf, size()); + ser_uint32(total_size); + ser_uint16(total_count); + ser_uint32(type); + ser_uint16(index); + ser_uint32(buf_len); + ser_bytes(buf, buf_len); + } + +}; + +/* + * This class is used for transferring plugin's file metadata between the plugin and the fd. + * It's a helper class to make metadata packets easy to manage for the plugins. + * + * Example usage (in form of pseudocode) of adding two packets: + * + * meta_mgr = New(plugin_metadata); + * meta_mgr->add_packet(plugin_meta_blog, buf1, buf1_len); + * meta_mgr->add_packet(plugin_meta_catalog_email, buf2, buf2_len); + * + * Then just simply return meta_mgr as an 'plug_meta' field in save_packet structure and + * set save_packet`s type to FT_PLUGIN_METADATA. Bacula will then take care of all of the packets + * added to the list and store it onto the volume one by one. + */ +class plugin_metadata: public SMARTALLOC { + private: + uint32_t total_size; /* Total size of metadata stream (consiting of many packets) */ + uint16_t total_count; /* Total count of metadata packets in the stream */ + alist *packets; /* List of packets in the stream */ + + public: + plugin_metadata() { + packets = New(alist(5, false)); + total_size = 0; + total_count = 0; + }; + + ~plugin_metadata() { + /* Remove packets from list, delete each of them */ + while (!packets->empty()) { + meta_pkt *mp = (meta_pkt *)packets->pop(); + delete mp; + } + + delete packets; + }; + + /* Create packet with specified attributes, add it to the list */ + void add_packet(metadata_type type, uint32_t len, void *buf) { + meta_pkt *mp = New(meta_pkt(type, len, buf, total_count++)); + total_size+=mp->size(); + mp->total_size = total_size; + mp->total_count = total_count; + + packets->push(mp); + + /* Update all packets with new total size and count */ + foreach_alist(mp, packets) { + mp->total_size = total_size; + mp->total_count = total_count; + } + + }; + + uint32_t size() { + return total_size; + }; + + uint16_t count() { + return total_count; + }; + + meta_pkt *get(int index) { + return (meta_pkt *)packets->get(index); + }; + + void reset() { + //Free allocated metadata packets + while (!packets->empty()) { + meta_pkt *mp = (meta_pkt *)packets->pop(); // remove from list + delete mp; + } + + total_size = 0; + total_count = 0; + } +}; + /* * This packet is used for file save info transfer. */ @@ -133,6 +294,7 @@ struct save_pkt { char *cmd; /* command */ struct restore_object restore_obj; /* Info about restore object */ struct plugin_object plugin_obj; /* Plugin Object */ + plugin_metadata *plug_meta; /* Metadata packet provided by plugin */ uint32_t delta_seq; /* Delta sequence number */ int32_t LinkFI; /* LinkFI if LINKSAVED */ int32_t pkt_end; /* end packet sentinel */ @@ -332,6 +494,7 @@ int plugin_backup_xattr(JCR *jcr, FF_PKT *ff_pkt, char **data); bool plugin_restore_xattr(JCR *jcr, char *data, uint32_t length); bool plugin_check_stream(JCR *jcr, int32_t &stream); bool plugin_query_parameter(JCR *jcr, char *command, char *param, void sendit(JCR *jcr, const char *str)); +bool plugin_backup_metadata(JCR *jcr, FF_PKT *ff_pkt); #endif #ifdef __cplusplus @@ -430,6 +593,7 @@ typedef struct s_pluginFuncs { bRC (*restoreFileList)(bpContext *ctx, struct restore_filelist_pkt *rp); bRC (*checkStream)(bpContext *ctx, struct stream_pkt *sp); bRC (*queryParameter)(bpContext *ctx, struct query_pkt *qp); + bRC (*metadataRestore)(bpContext *ctx, struct meta_pkt *mp); } pFuncs; #define plug_func(plugin) ((pFuncs *)(plugin->pfuncs)) diff --git a/bacula/src/filed/restore.c b/bacula/src/filed/restore.c index 186f36c57..1df14b66a 100644 --- a/bacula/src/filed/restore.c +++ b/bacula/src/filed/restore.c @@ -1028,6 +1028,31 @@ void do_restore(JCR *jcr) case STREAM_RESTORE_OBJECT: break; /* these are sent by Director */ + case STREAM_PLUGIN_META_BLOB: + case STREAM_PLUGIN_META_CATALOG: + { + if (!jcr->plugin) { + Jmsg(jcr, M_ERROR, 0, _("No plugin related to metadata packet found, metadata restore failed!\n")); + goto get_out; + } + + /* Deserialize and translate data obtained from volume into plugin's metadata packet */ + meta_pkt mp(bmsg->rbuf); + Dmsg1(400, "[metadata plugin packet] total_size: %d\n", mp.total_size); + Dmsg1(400, "[metadata plugin packet] total_count: %d\n", mp.total_count); + Dmsg1(400, "[metadata plugin packet] type: %d\n", mp.type); + Dmsg1(400, "[metadata plugin packet] index: %d\n", mp.index); + Dmsg2(400, "[metadata plugin packet] buf: %.*s\n", mp.buf_len, mp.buf); + + int rc = plug_func(jcr->plugin)->metadataRestore(jcr->plugin_ctx, &mp); + if (rc != bRC_OK) { + Jmsg(jcr, M_ERROR, 0, _("Plugin metadataRestore call failed, err: %d\n"), rc); + goto get_out; + } + + break; + } + default: if (!close_previous_stream(rctx)) { goto get_out; diff --git a/bacula/src/filetypes.h b/bacula/src/filetypes.h index 80467014d..239712e1b 100644 --- a/bacula/src/filetypes.h +++ b/bacula/src/filetypes.h @@ -71,6 +71,7 @@ #define FT_PLUGIN_CONFIG 27 /* Object for Plugin configuration */ #define FT_PLUGIN_CONFIG_FILLED 28 /* Object for Plugin configuration filled by Director */ #define FT_PLUGIN_OBJECT 29 /* Opaque Plugin Object used for Object Management*/ +#define FT_PLUGIN_METADATA 30 /* Plugin metadata */ /* Definitions for upper part of type word (see above). */ #define AR_DATA_STREAM (1<<16) /* Data stream id present */ diff --git a/bacula/src/findlib/find.h b/bacula/src/findlib/find.h index 037965e7f..7a771cf42 100644 --- a/bacula/src/findlib/find.h +++ b/bacula/src/findlib/find.h @@ -175,6 +175,7 @@ struct FF_PKT { int32_t delta_seq; /* Delta Sequence number */ struct restore_object restore_obj; struct plugin_object plugin_obj; + plugin_metadata *plug_meta; struct f_link *linked; /* Set if this file is hard linked */ int type; /* FT_ type from above */ int ff_errno; /* errno */ diff --git a/bacula/src/plugins/fd/test-plugin-fd.c b/bacula/src/plugins/fd/test-plugin-fd.c index 34a69d718..01e9693bf 100644 --- a/bacula/src/plugins/fd/test-plugin-fd.c +++ b/bacula/src/plugins/fd/test-plugin-fd.c @@ -54,6 +54,7 @@ static bRC getPluginValue(bpContext *ctx, pVariable var, void *value); static bRC setPluginValue(bpContext *ctx, pVariable var, void *value); static bRC handlePluginEvent(bpContext *ctx, bEvent *event, void *value); static bRC startBackupFile(bpContext *ctx, struct save_pkt *sp); +static bRC metadataRestore(bpContext *ctx, meta_pkt *mp); static bRC endBackupFile(bpContext *ctx); static bRC pluginIO(bpContext *ctx, struct io_pkt *io); static bRC startRestoreFile(bpContext *ctx, const char *cmd); @@ -103,7 +104,8 @@ static pFuncs pluginFuncs = { handleXACLdata, restoreFileList, NULL, /* No checkStream */ - queryParameter + queryParameter, + metadataRestore }; static struct ini_items test_items[] = { @@ -133,6 +135,7 @@ struct plugin_ctx { int nb_obj; /* Number of objects created */ int nb; /* used in queryParameter */ char *query_buf; /* buffer used to not loose memory */ + plugin_metadata *meta_mgr; int job_level; /* current Job level */ POOLMEM *buf; /* store ConfigFile */ }; @@ -180,6 +183,10 @@ static bRC newPlugin(bpContext *ctx) } memset(p_ctx, 0, sizeof(struct plugin_ctx)); ctx->pContext = (void *)p_ctx; /* set our context pointer */ + + // Create metadata menager class + p_ctx->meta_mgr = New(plugin_metadata); + return bRC_OK; } @@ -201,6 +208,9 @@ static bRC freePlugin(bpContext *ctx) if (p_ctx->cmd) { free(p_ctx->cmd); /* free any allocated command string */ } + if (p_ctx->meta_mgr) { + delete p_ctx->meta_mgr; + } free(p_ctx); /* free our private context */ ctx->pContext = NULL; return bRC_OK; @@ -360,7 +370,28 @@ static bRC handlePluginEvent(bpContext *ctx, bEvent *event, void *value) return bRC_OK; } -/* +static bRC metadataRestore(bpContext *ctx, struct meta_pkt *mp) +{ + //TODO add proper handling instead of printing (i. e. add data buffer comparison) + Dmsg1(0, "Restoring metadata of index: %d\n", mp->index); + /*switch (mp->type) {*/ + /*case plugin_meta_blob:*/ + /*Dmsg0(0, "Restoring metadata of 'blob' type!\n");*/ + /*Dmsg1(0, _("---- [pluginrestore] len: %lld\n"), mp->len);*/ + /*Dmsg1(0, _("---- [pluginrestore] buf: %s\n"), mp->buf);*/ + /*break;*/ + /*case plugin_meta_catalog_email:*/ + /*Dmsg0(0, "Restoring metadata of 'email catalog' type!\n");*/ + /*break;*/ + /*default:*/ + /*Dmsg1(0, "Invalid metadata type: %d!\n", mp->type);*/ + /*break;*/ + /*}*/ + + return bRC_OK; +} + +/* * Start the backup of a specific file */ static bRC startBackupFile(bpContext *ctx, struct save_pkt *sp) @@ -659,6 +690,30 @@ static bRC startBackupFile(bpContext *ctx, struct save_pkt *sp) return bRC_OK; } else if (p_ctx->nb_obj == 7) { + // Remove any metadata packets related to previous files + p_ctx->meta_mgr->reset(); + + const char* m1 = + "{\ + \"key1\": \"val1\", \ + \"key2\": \"val2\", \ + \"key3\": \"val3\" \ + }"; + + /*TODO change payload to catalog packet when it's defined*/ + const char *m2 = "meta_type=email,title=msg"; + + p_ctx->meta_mgr->add_packet(plugin_meta_blob, strlen(m1), (void *)m1); + p_ctx->meta_mgr->add_packet(plugin_meta_catalog_email, strlen(m2), (void *)m2); + + sp->plug_meta = p_ctx->meta_mgr; + + p_ctx->nb_obj++; + sp->type = FT_PLUGIN_METADATA; + + return bRC_OK; + + } else if (p_ctx->nb_obj == 8) { p_ctx->nb_obj++; if (p_ctx->job_level == 'F') { sp->type = FT_REG; @@ -707,9 +762,8 @@ static bRC endBackupFile(bpContext *ctx) * We would return bRC_More if we wanted startBackupFile to be * called again to backup another file */ - if (p_ctx->nb_obj >= 8) { + if (p_ctx->nb_obj >= 9) { return bRC_OK; - } else { return bRC_More; } diff --git a/bacula/src/stored/append.c b/bacula/src/stored/append.c index 722865027..60adfe924 100644 --- a/bacula/src/stored/append.c +++ b/bacula/src/stored/append.c @@ -444,6 +444,7 @@ bool send_attrs_to_dir(JCR *jcr, DEV_RECORD *rec) rec->maskedStream == STREAM_UNIX_ATTRIBUTES_EX || rec->maskedStream == STREAM_RESTORE_OBJECT || rec->maskedStream == STREAM_PLUGIN_OBJECT || + rec->maskedStream == STREAM_PLUGIN_META_CATALOG || crypto_digest_stream_type(rec->maskedStream) != CRYPTO_DIGEST_NONE) { if (!jcr->no_attributes) { BSOCK *dir = jcr->dir_bsock; diff --git a/bacula/src/stored/bls.c b/bacula/src/stored/bls.c index 41b9f6373..ba95c62b0 100644 --- a/bacula/src/stored/bls.c +++ b/bacula/src/stored/bls.c @@ -451,6 +451,10 @@ static bool record_cb(DCR *dcr, DEV_RECORD *rec) Dmsg0(100, "Restore Object record\n"); } else if (rec->maskedStream == STREAM_PLUGIN_OBJECT) { Dmsg0(100, "Plugin Object record\n"); + } else if (rec->maskedStream == STREAM_PLUGIN_META_BLOB) { + Dmsg0(100, "Plugin Object binary metadata record\n"); + } else if (rec->maskedStream == STREAM_PLUGIN_META_CATALOG) { + Dmsg0(100, "Plugin Object catalog metadata record\n"); } else if (rec->maskedStream == STREAM_ADATA_BLOCK_HEADER) { Dmsg0(000, "Adata block header\n"); } else if (rec->maskedStream == STREAM_ADATA_RECORD_HEADER) { diff --git a/bacula/src/stored/bscan.c b/bacula/src/stored/bscan.c index 7c5cf05ba..e645dcb98 100644 --- a/bacula/src/stored/bscan.c +++ b/bacula/src/stored/bscan.c @@ -810,6 +810,13 @@ static bool record_cb(DCR *dcr, DEV_RECORD *rec) break; } + case STREAM_PLUGIN_META_BLOB: + case STREAM_PLUGIN_META_CATALOG: + { + meta_pkt mp(rec->data); + //TODO We probably don't want to log every single metadata record + Pmsg4(0, _("Plugin metadata of type: %d len: %ld buf: %.*s\n"), mp.type, mp.buf_len, mp.buf_len, mp.buf); + } case STREAM_PLUGIN_OBJECT: { OBJECT_DBR obj_r; diff --git a/bacula/src/streams.h b/bacula/src/streams.h index 8bf8b15c9..ade7fcad6 100644 --- a/bacula/src/streams.h +++ b/bacula/src/streams.h @@ -102,6 +102,9 @@ #define STREAM_ENCRYPTED_FILE_COMPRESSED_DATA 32 /* Encrypted, compressed data */ #define STREAM_ENCRYPTED_WIN32_COMPRESSED_DATA 33 /* Encrypted, compressed Win32 BackupRead data */ #define STREAM_PLUGIN_OBJECT 34 /* Plugin object */ +#define STREAM_PLUGIN_META_HEADER 35 /* Plugin metadata header for file being backed up */ +#define STREAM_PLUGIN_META_BLOB 36 /* Plugin metadata (blob) for file being backed up */ +#define STREAM_PLUGIN_META_CATALOG 37 /* Plugin metadata (to be stored in catalog) for file being backed up */ #define STREAM_ADATA_BLOCK_HEADER 200 /* Adata block header */ #define STREAM_ADATA_RECORD_HEADER 201 /* Adata record header */