]> git.ipfire.org Git - thirdparty/bacula.git/commitdiff
BEE Backport bacula/src/cats/bvfs.c
authorEric Bollengier <eric@baculasystems.com>
Mon, 11 May 2020 15:02:56 +0000 (17:02 +0200)
committerEric Bollengier <eric@baculasystems.com>
Thu, 29 Apr 2021 08:44:17 +0000 (10:44 +0200)
This commit is the result of the squash of the following main commits:

Author: Eric Bollengier <eric@baculasystems.com>
Date:   Tue Feb 25 15:01:33 2020 +0100

    Change !BEEF to COMMUNITY

Author: Eric Bollengier <eric@baculasystems.com>
Date:   Mon Dec 2 16:09:07 2019 +0100

    Fix compilation with COMMUNITY

Author: Eric Bollengier <eric@baculasystems.com>
Date:   Thu Sep 19 11:24:32 2019 +0200

    Replace few strncpy() with bstrncpy()

Author: Eric Bollengier <eric@baculasystems.com>
Date:   Wed Aug 14 14:23:03 2019 +0200

    Move acl/permission BEEF code to bee_XXX.c/h files

Author: Alain Spineux <alain@baculasystems.com>
Date:   Mon Jul 1 17:26:49 2019 +0200

    make .bvfs_clear_cache works with sqlite

    - The TRUNCATE TABLE command don't exist in sqlite, but the
      "delete from table" command is optimized to "truncate" the table when
      the "where" clause is not used.

Author: Eric Bollengier <eric@baculasystems.com>
Date:   Tue May 28 10:38:58 2019 +0200

    bvfs: Use ClientACL+RestoreClientACL in bvfs queries

Author: Eric Bollengier <eric@baculasystems.com>
Date:   Thu May 16 10:40:49 2019 +0200

    Fix #5053 about BVFS commands not compatible with ACLs wildcards

Author: Eric Bollengier <eric@baculasystems.com>
Date:   Fri Apr 26 10:21:14 2019 +0200

    Fix #4817 about BVFS query issue with SQLite

Author: Eric Bollengier <eric@baculasystems.com>
Date:   Wed Apr 10 09:59:24 2019 +0200

    bvfs: Adapt queries for deleted directories

Author: Eric Bollengier <eric@baculasystems.com>
Date:   Wed Apr 3 14:54:26 2019 +0200

    bvfs: Use ESCAPE for BaseFiles query

Author: Marcin Haba <marcin.haba@bacula.pl>
Date:   Fri Aug 17 17:03:28 2018 +0200

    Fix #3972 escaping special characters in bvfs restore for sqlite catalog

Author: Eric Bollengier <eric@baculasystems.com>
Date:   Wed Apr 11 17:48:13 2018 +0200

    Fix #3607 about SQLite optimization with BVFS restore

Author: Eric Bollengier <eric@baculasystems.com>
Date:   Thu Mar 8 20:50:57 2018 +0100

    bvfs: Fix hardlinks detection with files having imcomplete st_mode attribute

Author: Eric Bollengier <eric@baculasystems.com>
Date:   Mon Nov 20 16:53:02 2017 +0100

    bvfs: Fix hardlink selection issue with subdirectories

Author: Eric Bollengier <eric@baculasystems.com>
Date:   Sat Nov 11 14:54:26 2017 +0100

    bvfs: Add function bvfs_delete_filed for plugins that requires to delete a catalog entry

Author: Eric Bollengier <eric@baculasystems.com>
Date:   Thu Nov 9 15:01:55 2017 +0100

    bvfs: Add filename and path arguments to .bvfs_version

Author: Eric Bollengier <eric@baculasystems.com>
Date:   Tue Nov 1 11:47:04 2016 +0100

    More work on DirectoryACL and UserIdACL code

Author: Eric Bollengier <eric@baculasystems.com>
Date:   Tue Oct 18 18:07:24 2016 +0200

    Add DirectoryACL and UserIdACL directives to the Console resource

Author: Eric Bollengier <eric@baculasystems.com>
Date:   Fri Oct 23 12:21:49 2015 +0200

    Fix #1343 about a problem with BVFS queries with deleted directories

    We now use JobTDate to find the last entry (instead of JobId) and we
    check FileIndex=0 that indicates if a file or a directory was deleted.

Author: Eric Bollengier <eric@baculasystems.com>
Date:   Tue Aug 11 15:24:55 2015 +0200

    Add bvfs interface to access DeltaSeq file attribute

Author: Kern Sibbald <kern@sibbald.com>
Date:   Fri Apr 17 18:33:35 2015 +0200

    Refactor DB engine to be class based

Author: Eric Bollengier <eric@baculasystems.com>
Date:   Mon Feb 9 10:10:52 2015 +0100

    Fix #875 about bvfs repeats the same output many times

Author: Eric Bollengier <eric@baculasystems.com>
Date:   Thu Oct 17 18:20:28 2013 +0200

    Fix bvfs lsfiles query with new Filename structure

Author: Eric Bollengier <eric@baculasystems.com>
Date:   Mon Aug 26 13:57:34 2013 +0200

    Merge Filename table to File

Author: Eric Bollengier <eric@baculasystems.com>
Date:   Thu Aug 16 11:30:00 2012 +0200

    Fix #1923 about MySQL 4 support for BVFS

Author: Eric Bollengier <eric@baculasystems.com>
Date:   Sun Feb 12 19:18:53 2012 +0100

    Handle delta files in bvfs

bacula/src/cats/bvfs.c

index dcb0512444b34dd5de5c26fcd68901abbaa20a97..b1cd0597a7e06a141a92fd6e81eda02d67358711 100644 (file)
@@ -23,8 +23,6 @@
 #include "lib/htable.h"
 #include "bvfs.h"
 
-#define can_access(x) (true)
-
 /* from libbacfind */
 extern int decode_stat(char *buf, struct stat *statp, int stat_size, int32_t *LinkFI);
 
@@ -69,11 +67,13 @@ Bvfs::Bvfs(JCR *j, BDB *mdb)
    list_entries = result_handler;
    user_data = this;
    username = NULL;
-   job_acl = client_acl = pool_acl = fileset_acl = NULL;
+   job_acl = client_acl = restoreclient_acl = NULL;
+   pool_acl = fileset_acl = NULL;
    last_dir_acl = NULL;
+   uid_acl = NULL;
+   gid_acl = NULL;
    dir_acl = NULL;
    use_acl = false;
-   dir_filenameid = 0;       /* special FilenameId where Name='' */
 }
 
 Bvfs::~Bvfs() {
@@ -91,6 +91,15 @@ Bvfs::~Bvfs() {
    if (dir_acl) {
       delete dir_acl;
    }
+   if (uid_acl) {
+      delete uid_acl;
+   }
+   if (gid_acl) {
+      delete gid_acl;
+   }
+   if (client_acl) {
+      delete client_acl;
+   }
 }
 
 char *Bvfs::escape_list(alist *lst)
@@ -131,11 +140,13 @@ char *Bvfs::escape_list(alist *lst)
 int Bvfs::filter_jobid()
 {
    POOL_MEM query;
-   POOL_MEM sub_where;
    POOL_MEM sub_join;
+   POOLMEM *sub_where;
 
    /* No ACL, no username, no check */
-   if (!job_acl && !fileset_acl && !client_acl && !pool_acl && !username) {
+   if (!job_acl && !fileset_acl &&
+       !client_acl && !restoreclient_acl && !pool_acl && !username)
+   {
       Dmsg0(dbglevel_sql, "No ACL\n");
       /* Just count the number of items in the list */
       int nb = (*jobids != 0) ? 1 : 0;
@@ -147,24 +158,27 @@ int Bvfs::filter_jobid()
       return nb;
    }
 
+   sub_where = get_pool_memory(PM_FNAME);
+   *sub_where = 0;
    if (job_acl) {
-      Mmsg(sub_where, " AND Job.Name IN (%s) ", escape_list(job_acl));
+      pm_strcat(sub_where, " AND ");
+      db->BDB::escape_acl_list(jcr, "Job.Name", &sub_where, job_acl);
    }
 
    if (fileset_acl) {
-      Mmsg(query, " AND FileSet.FileSet IN (%s) ", escape_list(fileset_acl));
-      pm_strcat(sub_where, query.c_str());
+      pm_strcat(sub_where, " AND ");
+      db->BDB::escape_acl_list(jcr, "FileSet.FileSet", &sub_where, fileset_acl);
       pm_strcat(sub_join, " JOIN FileSet USING (FileSetId) ");
    }
 
    if (client_acl) {
-      Mmsg(query, " AND Client.Name IN (%s) ", escape_list(client_acl));
-      pm_strcat(sub_where, query.c_str());
+      pm_strcat(sub_where, " AND ");
+      db->BDB::escape_acl_list(jcr, "Client.Name", &sub_where, client_acl);
    }
 
    if (pool_acl) {
-      Mmsg(query, " AND Pool.Name IN (%s) ", escape_list(pool_acl));
-      pm_strcat(sub_where, query.c_str());
+      pm_strcat(sub_where, " AND ");
+      db->BDB::escape_acl_list(jcr, "Pool.Name", &sub_where, pool_acl);
       pm_strcat(sub_join, " JOIN Pool USING (PoolId) ");
    }
 
@@ -181,19 +195,19 @@ int Bvfs::filter_jobid()
        "WHERE bweb_user.username = '%s' "
       ") AS filter USING (ClientId) "
         " WHERE JobId IN (%s) %s",
-           sub_join.c_str(), username, jobids, sub_where.c_str());
+           sub_join.c_str(), username, jobids, sub_where);
 
    } else {
       Mmsg(query,
       "SELECT DISTINCT JobId FROM Job JOIN Client USING (ClientId) %s "
       " WHERE JobId IN (%s) %s",
-           sub_join.c_str(), jobids, sub_where.c_str());
+           sub_join.c_str(), jobids, sub_where);
    }
-
    db_list_ctx ctx;
    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
    db->bdb_sql_query(query.c_str(), db_list_handler, &ctx);
    pm_strcpy(jobids, ctx.list);
+   free_pool_memory(sub_where);
    return ctx.count;
 }
 
@@ -342,7 +356,7 @@ static void build_path_hierarchy(JCR *jcr, BDB *mdb,
    char pathid[50];
    ATTR_DBR parent;
    char *bkp = mdb->path;
-   strncpy(pathid, org_pathid, sizeof(pathid));
+   bstrncpy(pathid, org_pathid, sizeof(pathid));
 
    /* Does the ppathid exist for this ? we use a memory cache...  In order to
     * avoid the full loop, we consider that if a dir is allready in the
@@ -586,38 +600,22 @@ void Bvfs::fv_update_cache()
    db->bdb_unlock();
 }
 
-/* 
- * Find an store the filename descriptor for empty directories Filename.Name=''
- */
-DBId_t Bvfs::get_dir_filenameid()
-{
-   uint32_t id;
-   if (dir_filenameid) {
-      return dir_filenameid;
-   }
-   Mmsg(db->cmd, "SELECT FilenameId FROM Filename WHERE Name = ''");
-   db_sql_query(db, db->cmd, db_int_handler, &id);
-   dir_filenameid = id;
-   return dir_filenameid;
-}
-
 /* Not yet working */
 void Bvfs::fv_get_big_files(int64_t pathid, int64_t min_size, int32_t limit)
 {
-   Mmsg(db->cmd, 
-        "SELECT FilenameId AS filenameid, Name AS name, size "
+   Mmsg(db->cmd,
+        "SELECT S.Filename AS filename, S.size "
           "FROM ( "
-         "SELECT FilenameId, base64_decode_lstat(8,LStat) AS size "
+         "SELECT Filename, base64_decode_lstat(8,LStat) AS size "
            "FROM File "
           "WHERE PathId  = %lld "
             "AND JobId = %s "
-        ") AS S INNER JOIN Filename USING (FilenameId) "
+        ") AS S "
      "WHERE S.size > %lld "
      "ORDER BY S.size DESC "
      "LIMIT %d ", pathid, jobids, min_size, limit);
 }
 
-
 /* Get the current path size and files count */
 void Bvfs::fv_get_current_size_and_count(int64_t pathid, int64_t *size, int64_t *count)
 {
@@ -858,6 +856,16 @@ bool Bvfs::ch_dir(DBId_t pathid)
 {
    reset_offset();
 
+   if (need_to_check_permissions()) {
+      sellist sel;
+      db_list_ctx ids;
+      char ed1[50];
+      sel.set_string(edit_uint64(pathid, ed1), false);
+      if (check_full_path_access(1, &sel, &ids) > 0) {
+         Dmsg1(DT_BVFS, "Access denied for pathid %d\n", (int)pathid);
+         pathid = 0;
+      }
+   }
    pwd_id = pathid;
    return pwd_id != 0;
 }
@@ -881,7 +889,7 @@ bool Bvfs::ch_dir(const char *path)
 void Bvfs::get_all_file_versions(DBId_t pathid, FileId_t fnid, alist *clients)
 {
    char ed1[50], ed2[50], *eclients;
-   POOL_MEM q, query;
+   POOL_MEM fname, q, query;
 
    if (see_copies) {
       Mmsg(q, " AND Job.Type IN ('C', 'B') ");
@@ -889,20 +897,28 @@ void Bvfs::get_all_file_versions(DBId_t pathid, FileId_t fnid, alist *clients)
       Mmsg(q, " AND Job.Type = 'B' ");
    }
 
+   if (*filename && fnid == 0) {
+      Mmsg(fname, " '%s' ", filename);
+
+   } else {
+      Mmsg(fname, " (SELECT Filename FROM File AS F2 WHERE FileId = %s) ",
+           edit_uint64(fnid, ed1));
+   }
+
    eclients = escape_list(clients);
 
    Dmsg3(dbglevel, "get_all_file_versions(%lld, %lld, %s)\n", (uint64_t)pathid,
          (uint64_t)fnid, eclients);
 
-   Mmsg(query,//    1           2           3      4 
-"SELECT 'V', File.PathId, File.FilenameId,  0, File.JobId, "
-//            5           6          7
-        "File.LStat, File.FileId, File.Md5, "
-//         8                    9
-        "Media.VolumeName, Media.InChanger "
+   Mmsg(query,//    1           2           3
+"SELECT DISTINCT 'V', File.PathId, File.FileId,  File.JobId, "
+//         4          5           6
+        "File.LStat, File.FileId, File.Md5,  "
+//         7                    8
+       "Media.VolumeName, Media.InChanger "
 "FROM File, Job, Client, JobMedia, Media "
-"WHERE File.FilenameId = %s "
-  "AND File.PathId=%s "
+"WHERE File.Filename = %s "
+  "AND File.PathId = %s "
   "AND File.JobId = Job.JobId "
   "AND Job.JobId = JobMedia.JobId "
   "AND File.FileIndex >= JobMedia.FirstIndex "
@@ -911,7 +927,7 @@ void Bvfs::get_all_file_versions(DBId_t pathid, FileId_t fnid, alist *clients)
   "AND Job.ClientId = Client.ClientId "
   "AND Client.Name IN (%s) "
   "%s ORDER BY FileId LIMIT %d OFFSET %d"
-        ,edit_uint64(fnid, ed1), edit_uint64(pathid, ed2), eclients, q.c_str(),
+        ,fname.c_str(), edit_uint64(pathid, ed2), eclients, q.c_str(),
         limit, offset);
    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
    db->bdb_sql_query(query.c_str(), list_entries, user_data);
@@ -938,9 +954,9 @@ bool Bvfs::get_delta(FileId_t fileid)
     * what are dependencies
     */
    Mmsg(query,
-        "SELECT F.JobId, FN.Name, F.PathId, F.DeltaSeq "
-        "FROM File AS F, Filename AS FN WHERE FileId = %lld "
-        "AND FN.FilenameId = F.FilenameId AND DeltaSeq > 0", fileid);
+        "SELECT F.JobId, F.Filename, F.PathId, F.DeltaSeq "
+        "FROM File AS F WHERE FileId = %lld "
+        "AND DeltaSeq > 0", fileid);
 
    if (!db->QueryDB(jcr, query.c_str())) {
       Dmsg1(dbglevel_sql, "Can't execute query=%s\n", query.c_str());
@@ -1064,8 +1080,9 @@ int Bvfs::_handle_path(void *ctx, int fields, char **row)
       /* can have the same path 2 times */
       if (strcmp(row[BVFS_PathId], prev_dir)) {
          pm_strcpy(prev_dir, row[BVFS_PathId]);
-         if (strcmp(NPRTB(row[BVFS_FileIndex]), "0") == 0 &&
-             strcmp(NPRTB(row[BVFS_FileId]), "0") != 0)
+         if (strcmp(NPRTB(row[BVFS_FileIndex]), "") != 0 && /* Can be negative, empty, or positive */
+             str_to_int64(row[BVFS_FileIndex])      <= 0 && /* Was deleted */
+             strcmp(NPRTB(row[BVFS_FileId]), "0")   != 0)   /* We have a fileid */
          {
             /* The directory was probably deleted */
             return 0;
@@ -1082,13 +1099,10 @@ int Bvfs::_handle_path(void *ctx, int fields, char **row)
 void Bvfs::ls_special_dirs()
 {
    Dmsg1(dbglevel, "ls_special_dirs(%lld)\n", (uint64_t)pwd_id);
-   char ed1[50], ed2[50];
+   char ed1[50];
    if (*jobids == 0) {
       return;
    }
-   if (!dir_filenameid) {
-      get_dir_filenameid();
-   }
 
    /* Will fetch directories  */
    *prev_dir = 0;
@@ -1104,19 +1118,19 @@ void Bvfs::ls_special_dirs()
         edit_uint64(pwd_id, ed1), jobids, ed1);
 
    POOL_MEM query2;
-   Mmsg(query2,// 1      2     3        4     5       6
-"SELECT 'D', tmp.PathId, 0, tmp.Path, JobId, LStat, FileId, FileIndex "
+   Mmsg(query2,// 1           2     3        4     5       6
+"SELECT 'D', tmp.PathId, tmp.Path, JobId, LStat, FileId, FileIndex "
   "FROM %s AS tmp  LEFT JOIN ( " // get attributes if any
        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
               "File1.LStat AS LStat, File1.FileId AS FileId, "
               "File1.FileIndex AS FileIndex, "
               "Job1.JobTDate AS JobTDate "
-      "FROM File AS File1 JOIN Job AS Job1 USING (JobId)"
-       "WHERE File1.FilenameId = %s "
+        "FROM File AS File1 JOIN Job AS Job1 USING (JobId)"
+       "WHERE File1.Filename = '' "
        "AND File1.JobId IN (%s)) AS listfile1 "
   "ON (tmp.PathId = listfile1.PathId) "
   "ORDER BY tmp.Path, JobTDate DESC ",
-        query.c_str(), edit_uint64(dir_filenameid, ed2), jobids);
+        query.c_str(), jobids);
 
    Dmsg1(dbglevel_sql, "q=%s\n", query2.c_str());
    db->bdb_sql_query(query2.c_str(), path_handler, this);
@@ -1126,7 +1140,7 @@ void Bvfs::ls_special_dirs()
 bool Bvfs::ls_dirs()
 {
    Dmsg1(dbglevel, "ls_dirs(%lld)\n", (uint64_t)pwd_id);
-   char ed1[50], ed2[50];
+   char ed1[50];
    if (*jobids == 0) {
       return false;
    }
@@ -1139,22 +1153,16 @@ bool Bvfs::ls_dirs()
 
    }
 
-   if (!dir_filenameid) {
-      get_dir_filenameid();
-   }
 
    /* the sql query displays same directory multiple time, take the first one */
    *prev_dir = 0;
 
    /* Let's retrieve the list of the visible dirs in this dir ...
-    * First, I need the empty filenameid to locate efficiently
-    * the dirs in the file table
-    * my $dir_filenameid = $self->get_dir_filenameid();
     */
    /* Then we get all the dir entries from File ... */
    Mmsg(query,
-//       0     1      2      3      4     5       6
-"SELECT 'D', PathId,  0,    Path, JobId, LStat, FileId, FileIndex FROM ( "
+//       0     1      2      3      4     5         6
+"SELECT 'D', PathId, Path, JobId, LStat, FileId, FileIndex FROM ( "
     "SELECT Path1.PathId AS PathId, Path1.Path AS Path, "
            "lower(Path1.Path) AS lpath, "
            "listfile1.JobId AS JobId, listfile1.LStat AS LStat, "
@@ -1178,15 +1186,14 @@ bool Bvfs::ls_dirs()
        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
               "File1.LStat AS LStat, File1.FileId AS FileId, "
               "File1.FileIndex, Job1.JobTDate AS JobTDate "
-     "FROM File AS File1 JOIN Job AS Job1 USING (JobId) "
-       "WHERE File1.FilenameId = %s "
-       "AND File1.JobId IN (%s)) AS listfile1 "
-       "ON (listpath1.PathId = listfile1.PathId) "
+       "FROM File AS File1 JOIN Job AS Job1 USING (JobId) "
+        "WHERE File1.Filename = '' "
+         "AND File1.JobId IN (%s)) AS listfile1 "
+    "ON (listpath1.PathId = listfile1.PathId) "
     ") AS A ORDER BY Path,JobTDate DESC LIMIT %d OFFSET %d",
         edit_uint64(pwd_id, ed1),
         jobids,
         filter.c_str(),
-        edit_uint64(dir_filenameid, ed2),
         jobids,
         limit, offset);
 
@@ -1228,22 +1235,27 @@ bool Bvfs::ls_files()
    }
 
    if (!pwd_id) {
-      ch_dir(get_root());
+      if (!ch_dir(get_root())) {
+         return false;
+      }
    }
 
    edit_uint64(pwd_id, pathid);
    if (*pattern) {
-      Mmsg(filter, " AND Filename.Name %s '%s' ", 
-           match_query[db_get_type_index(db)], pattern);
+      Mmsg(filter, " AND T.Filename %s '%s' ",
+           match_query[db->bdb_get_type_index()], pattern);
 
    } else if (*filename) {
-      Mmsg(filter, " AND Filename.Name = '%s' ", filename);
+      Mmsg(filter, " AND T.Filename = '%s' ", filename);
    }
 
    build_ls_files_query(db, query,
                         jobids, pathid, filter.c_str(),
                         limit, offset);
 
+   /* TODO: check all parent directories to know if we can
+    * list the files here.
+    */
    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
 
    db->bdb_lock();
@@ -1313,8 +1325,15 @@ void Bvfs::clear_cache()
 {
    db->bdb_sql_query("BEGIN",                     NULL, NULL);
    db->bdb_sql_query("UPDATE Job SET HasCache=0", NULL, NULL);
-   db->bdb_sql_query("TRUNCATE PathHierarchy",    NULL, NULL);
-   db->bdb_sql_query("TRUNCATE PathVisibility",   NULL, NULL);
+   if (db->bdb_get_type_index() == SQL_TYPE_SQLITE3) {
+      // sqlite3 don't have the "TRUNCATE TABLE" command but optimize the
+      // "DELETE3 one when the "WHERE" clause is not used
+      db->bdb_sql_query("DELETE FROM PathHierarchy",    NULL, NULL);
+      db->bdb_sql_query("DELETE FROM PathVisibility",   NULL, NULL);
+   } else {
+      db->bdb_sql_query("TRUNCATE PathHierarchy",    NULL, NULL);
+      db->bdb_sql_query("TRUNCATE PathVisibility",   NULL, NULL);
+   }
    db->bdb_sql_query("COMMIT",                    NULL, NULL);
 }
 
@@ -1329,26 +1348,138 @@ bool Bvfs::drop_restore_list(char *output_table)
    return false;
 }
 
-bool Bvfs::compute_restore_list(char *fileid, char *dirid, char *hardlink,
-                                char *output_table)
+struct hardlink {
+   hlink lnk;
+   uint32_t jobid;
+   int32_t  fileindex;
+};
+
+static int checkhardlinks_handler(void *ctx, int fields, char **row)
+{
+   Bvfs *self = (Bvfs *)ctx;
+   return self->checkhardlinks_cb(fields, row);
+}
+
+/* Callback to check hardlinks for a given file */
+int Bvfs::checkhardlinks_cb(int fields, char **row)
+{
+   int32_t LinkFI=-1;
+   struct stat statp;
+   memset(&statp, 0, sizeof(statp));
+
+   if (row[2] && row[2][0]) {
+      decode_stat(row[2], &statp, sizeof(statp), &LinkFI);
+   }
+   if (statp.st_nlink > 1) {
+      hardlink *hl = NULL;
+      uint32_t jobid = str_to_uint64(row[1]);
+      uint64_t key = (((uint64_t)jobid) << 32) | (uint64_t)LinkFI;
+
+      /* Look in our list if the original file is here, or add it */
+      if (LinkFI == 0) {
+         /* Keep the first item in our list */
+         hl = (hardlink *) hardlinks->hash_malloc(sizeof(hardlink));
+
+      } else if (LinkFI > 0) {
+         if (hardlinks->lookup(key) == NULL) {
+            hl = (hardlink *)hardlinks->hash_malloc(sizeof(hardlink));
+            hl->jobid = jobid;
+            hl->fileindex = LinkFI;
+            missing_hardlinks->append(hl);
+         }
+      }
+      if (hl) {
+         hardlinks->insert(key, hl);
+      }
+   }
+   return 0;
+}
+
+bool Bvfs::insert_hardlinks(char *output_table)
+{
+   /* Check hardlinks. We get LStat fields, and we check if we need to add a FileIndex */
+   bool ret=false, first=true;
+   hardlink *hl = NULL;
+   POOL_MEM query, tmp1, tmp2;
+   int nb=0;
+
+   hardlinks = New(htable(hl, &hl->lnk));
+   missing_hardlinks = New(alist(100, not_owned_by_alist));
+
+   Mmsg(query, "SELECT T.FileId, T.JobId, File.LStat FROM %s AS T JOIN File USING (FileId) WHERE Filename <> '' ORDER By T.JobId, T.FileIndex ASC", output_table);
+   if (!db->bdb_sql_query(query.c_str(), checkhardlinks_handler, this)) {
+      Dmsg1(dbglevel, "Can't execute query=%s\n", query.c_str());
+      goto bail_out; 
+   }
+
+   Dmsg1(dbglevel, "Inserting %d hardlink records\n", missing_hardlinks->size());
+   foreach_alist(hl, missing_hardlinks) {
+      if (first) {
+         first=false;
+      } else {
+         pm_strcat(tmp2, " OR ");
+      }
+      Mmsg(tmp1, "(File.JobId=%ld AND File.FileIndex=%ld)", hl->jobid, hl->fileindex);
+      pm_strcat(tmp2, tmp1.c_str());
+
+      if (nb++ >= 500) {
+         /* flush the current querry */
+         Dmsg1(dbglevel, "  Inserting %d hardlinks\n", nb - 1);
+         Mmsg(query, "INSERT INTO %s (JobId, FileIndex, FileId) "
+              "SELECT JobId, FileIndex, FileId FROM File WHERE %s",
+              output_table, tmp2.c_str());
+
+         if (!db->bdb_sql_query(query.c_str(), NULL, NULL)) {
+            Dmsg1(dbglevel, "Can't execute query=%s\n", query.c_str());
+            goto bail_out;
+         }
+
+         pm_strcpy(tmp2, "");   /* Start again the concat */
+         first=true;
+         nb=0;
+      }
+   }
+   if (first) {
+      ret = true;
+      goto bail_out;
+   }
+   /* To the last round of the insertion */
+   Mmsg(query, "INSERT INTO %s (JobId, FileIndex, FileId) "
+        "SELECT JobId, FileIndex, FileId FROM File WHERE %s",
+        output_table, tmp2.c_str());
+
+   if (!db->bdb_sql_query(query.c_str(), NULL, NULL)) {
+      Dmsg1(dbglevel, "Can't execute query=%s\n", query.c_str());
+      goto bail_out; 
+   }
+   ret = true;
+
+bail_out:
+   delete missing_hardlinks;
+   missing_hardlinks = NULL;
+   delete hardlinks;
+   hardlinks = NULL;
+   return ret;
+}
+
+bool Bvfs::compute_restore_list(char *fileid, char *dirid, char *output_table)
 {
    POOL_MEM query;
    POOL_MEM tmp, tmp2;
-   int64_t id, jobid, prev_jobid;
+   int64_t id;
+   uint32_t nb = 0;
    int num;
    bool init=false;
    bool ret=false;
    /* check args */
    if ((*fileid   && !is_a_number_list(fileid))  ||
        (*dirid    && !is_a_number_list(dirid))   ||
-       (*hardlink && !is_a_number_list(hardlink))||
-       (!*hardlink && !*fileid && !*dirid && !*hardlink))
+       (!*fileid && !*dirid)|| (!output_table)
+      )
    {
-      Dmsg0(dbglevel, "ERROR: One or more of FileId, DirId or HardLink is not given or not a number.\n");
       return false;
    }
    if (!check_temp(output_table)) {
-      Dmsg0(dbglevel, "ERROR: Wrong format for table name (in path field).\n");
       return false;
    }
 
@@ -1361,14 +1492,16 @@ bool Bvfs::compute_restore_list(char *fileid, char *dirid, char *hardlink,
    Mmsg(query, "DROP TABLE %s", output_table);
    db->bdb_sql_query(query.c_str());
 
+   /* Now start the transaction to protect ourself from other commands */
+   db->bdb_start_transaction(jcr);
+
    Mmsg(query, "CREATE TABLE btemp%s AS ", output_table);
 
    if (*fileid) {               /* Select files with their direct id */
       init=true;
-      Mmsg(tmp,"SELECT Job.JobId, JobTDate, FileIndex, FilenameId, "
+      Mmsg(tmp,"SELECT Job.JobId, JobTDate, FileIndex, Filename, "
                       "PathId, FileId "
-                 "FROM File,Job WHERE Job.JobId=File.Jobid "
-                 "AND FileId IN (%s)",
+                 "FROM File JOIN Job USING (JobId) WHERE FileId IN (%s)",
            fileid);
       pm_strcat(query, tmp.c_str());
    }
@@ -1378,13 +1511,12 @@ bool Bvfs::compute_restore_list(char *fileid, char *dirid, char *hardlink,
       Mmsg(tmp, "SELECT Path FROM Path WHERE PathId=%lld", id);
 
       if (!db->bdb_sql_query(tmp.c_str(), get_path_handler, (void *)&tmp2)) {
-         Dmsg3(dbglevel, "ERROR: Path not found %lld q=%s s=%s\n",
-               id, tmp.c_str(), tmp2.c_str());
+         Dmsg0(dbglevel, "Can't search for path\n");
          /* print error */
          goto bail_out;
       }
       if (!strcmp(tmp2.c_str(), "")) { /* path not found */
-         Dmsg3(dbglevel, "ERROR: Path not found %lld q=%s s=%s\n",
+         Dmsg3(dbglevel, "Path not found %lld q=%s s=%s\n",
                id, tmp.c_str(), tmp2.c_str());
          break;
       }
@@ -1410,7 +1542,7 @@ bool Bvfs::compute_restore_list(char *fileid, char *dirid, char *hardlink,
          query.strcat(" UNION ");
       }
 
-      Mmsg(tmp, "SELECT Job.JobId, JobTDate, File.FileIndex, File.FilenameId, "
+      Mmsg(tmp, "SELECT Job.JobId, JobTDate, File.FileIndex, File.Filename, "
                         "File.PathId, FileId "
                    "FROM Path JOIN File USING (PathId) JOIN Job USING (JobId) "
                   "WHERE Path.Path LIKE '%s' ESCAPE '%s' AND File.JobId IN (%s) ",
@@ -1422,54 +1554,19 @@ bool Bvfs::compute_restore_list(char *fileid, char *dirid, char *hardlink,
 
       /* A directory can have files from a BaseJob */
       Mmsg(tmp, "SELECT File.JobId, JobTDate, BaseFiles.FileIndex, "
-                        "File.FilenameId, File.PathId, BaseFiles.FileId "
+                        "File.Filename, File.PathId, BaseFiles.FileId "
                    "FROM BaseFiles "
                         "JOIN File USING (FileId) "
                         "JOIN Job ON (BaseFiles.JobId = Job.JobId) "
                         "JOIN Path USING (PathId) "
-                  "WHERE Path.Path LIKE '%s' AND BaseFiles.JobId IN (%s) ",
-           tmp2.c_str(), jobids);
-      query.strcat(tmp.c_str());
-   }
-
-   /* expect jobid,fileindex */
-   prev_jobid=0;
-   while (get_next_id_from_list(&hardlink, &jobid) == 1) {
-      if (get_next_id_from_list(&hardlink, &id) != 1) {
-         Dmsg0(dbglevel, "ERROR: hardlink should be two by two\n");
-         goto bail_out;
-      }
-      if (jobid != prev_jobid) { /* new job */
-         if (prev_jobid == 0) {  /* first jobid */
-            if (init) {
-               query.strcat(" UNION ");
-            }
-         } else {               /* end last job, start new one */
-            tmp.strcat(") UNION ");
-            query.strcat(tmp.c_str());
-         }
-         Mmsg(tmp,   "SELECT Job.JobId, JobTDate, FileIndex, FilenameId, "
-                            "PathId, FileId "
-                       "FROM File JOIN Job USING (JobId) WHERE JobId = %lld "
-                        "AND FileIndex IN (%lld", jobid, id);
-         prev_jobid = jobid;
-
-      } else {                  /* same job, add new findex */
-         Mmsg(tmp2, ", %lld", id);
-         tmp.strcat(tmp2.c_str());
-      }
-   }
-
-   if (prev_jobid != 0) {       /* end last job */
-      tmp.strcat(") ");
+                  "WHERE Path.Path LIKE '%s' ESCAPE '%s' AND BaseFiles.JobId IN (%s) ",
+           tmp2.c_str(), escape_char_value[db->bdb_get_type_index()], jobids);
       query.strcat(tmp.c_str());
-      init = true;
    }
 
    Dmsg1(dbglevel_sql, "query=%s\n", query.c_str());
-
    if (!db->bdb_sql_query(query.c_str(), NULL, NULL)) {
-      Dmsg1(dbglevel, "ERROR executing query=%s\n", query.c_str());
+      Dmsg1(dbglevel, "Can't execute query=%s\n", query.c_str());
       goto bail_out;
    }
 
@@ -1479,7 +1576,7 @@ bool Bvfs::compute_restore_list(char *fileid, char *dirid, char *hardlink,
    /* TODO: handle jobid filter */
    Dmsg1(dbglevel_sql, "query=%s\n", query.c_str());
    if (!db->bdb_sql_query(query.c_str(), NULL, NULL)) {
-      Dmsg1(dbglevel, "ERROR executing query=%s\n", query.c_str());
+      Dmsg1(dbglevel, "Can't execute query=%s\n", query.c_str());
       goto bail_out;
    }
 
@@ -1489,60 +1586,104 @@ bool Bvfs::compute_restore_list(char *fileid, char *dirid, char *hardlink,
            output_table, output_table);
       Dmsg1(dbglevel_sql, "query=%s\n", query.c_str());
       if (!db->bdb_sql_query(query.c_str(), NULL, NULL)) {
-         Dmsg1(dbglevel, "ERROR executing query=%s\n", query.c_str());
+         Dmsg1(dbglevel, "Can't execute query=%s\n", query.c_str());
          goto bail_out; 
       } 
    }
 
-   /* Check if some FileId have DeltaSeq > 0
-    * Foreach of them we need to get the accurate_job list, and compute
-    * what are dependencies
-    */
-   Mmsg(query, 
-        "SELECT F.FileId, F.JobId, F.FilenameId, F.PathId, F.DeltaSeq "
-          "FROM File AS F JOIN Job USING (JobId) JOIN %s USING (FileId) "
-         "WHERE DeltaSeq > 0", output_table);
+   /* SQLite needs extra indexes */
+   if (db->bdb_get_type_index() == SQL_TYPE_SQLITE3) {
+      Mmsg(query, "CREATE INDEX idx1_%s ON %s (JobId)",
+           output_table, output_table);
+      Dmsg1(dbglevel_sql, "query=%s\n", query.c_str());
+      if (!db->bdb_sql_query(query.c_str(), NULL, NULL)) {
+         Dmsg1(dbglevel, "Can't execute query=%s\n", query.c_str());
+         goto bail_out; 
+      } 
 
-   if (!db->QueryDB(jcr, query.c_str())) {
-      Dmsg1(dbglevel_sql, "Can't execute query=%s\n", query.c_str());
+      Mmsg(query, "CREATE INDEX idx2_%s ON %s (FileIndex)",
+           output_table, output_table);
+      Dmsg1(dbglevel_sql, "query=%s\n", query.c_str());
+      if (!db->bdb_sql_query(query.c_str(), NULL, NULL)) {
+         Dmsg1(dbglevel, "Can't execute query=%s\n", query.c_str());
+         goto bail_out; 
+      } 
    }
 
-   /* TODO: Use an other DB connection can avoid to copy the result of the
-    * previous query into a temporary buffer
-    */
-   num = db->sql_num_rows();
-   Dmsg2(dbglevel, "Found %d Delta parts in restore selection q=%s\n", num, query.c_str());
+   if (compute_delta) {
+      /* Check if some FileId have DeltaSeq > 0
+       * Foreach of them we need to get the accurate_job list, and compute
+       * what are dependencies
+       */
+      Mmsg(query,
+        "SELECT F.FileId, F.JobId, F.Filename, F.PathId, F.DeltaSeq "
+          "FROM File AS F JOIN Job USING (JobId) JOIN %s USING (FileId) "
+         "WHERE DeltaSeq > 0", output_table);
 
-   if (num > 0) {
-      int64_t *result = (int64_t *)malloc (num * 4 * sizeof(int64_t));
-      SQL_ROW row;
-      int i=0;
+      if (!db->QueryDB(jcr, query.c_str())) {
+         Dmsg1(dbglevel_sql, "Can't execute query=%s\n", query.c_str());
+      }
 
-      while((row = db->sql_fetch_row())) {
-         result[i++] = str_to_int64(row[0]); /* FileId */
-         result[i++] = str_to_int64(row[1]); /* JobId */
-         result[i++] = str_to_int64(row[2]); /* FilenameId */
-         result[i++] = str_to_int64(row[3]); /* PathId */
-      } 
+      /* TODO: Use an other DB connection can avoid to copy the result of the
+       * previous query into a temporary buffer
+       */
+      num = db->sql_num_rows();
+      Dmsg2(dbglevel, "Found %d Delta parts in restore selection q=%s\n",
+            num, query.c_str());
+
+      if (num > 0) {
+         int64_t *result = (int64_t *)malloc (num * 4 * sizeof(int64_t));
+         SQL_ROW row;
+         int i=0;
+
+         while((row = db->sql_fetch_row())) {
+            result[i++] = str_to_int64(row[0]);     /* FileId */
+            result[i++] = str_to_int64(row[1]);     /* JobId */
+            result[i++] = (intptr_t)bstrdup(row[2]); /* Filename TODO: use pool */
+            result[i++] = str_to_int64(row[3]);     /* PathId */
+         }
 
-      i=0;
-      while (num > 0) {
-         insert_missing_delta(output_table, result + i);
-         i += 4;
-         num--;
+         i=0;
+         while (num > 0) {
+            insert_missing_delta(output_table, result + i);
+            free((char *)(result[i+2]));       /* TODO: use pool */
+            i += 4;
+            num--;
+         }
+         free(result);
       }
-      free(result);
    }
 
-   ret = true;
+   if (!insert_hardlinks(output_table)) {
+      goto bail_out;
+   }
+
+   if (!check_permissions(output_table)) {
+      goto bail_out;
+   }
+
+   /* Final check to see if we have at least one file to restore */
+   Mmsg(query, "SELECT 1 FROM %s LIMIT 1", output_table);
+   if (!db->bdb_sql_query(query.c_str(), db_int_handler, &nb)) {
+      Dmsg1(dbglevel, "Can't execute query=%s\n", query.c_str());
+      goto bail_out; 
+   }
+   if (nb == 1) {
+      ret = true;
+   }
 
 bail_out:
+   if (!ret) {
+      Mmsg(query, "DROP TABLE %s", output_table);
+      db->bdb_sql_query(query.c_str());
+   }
    Mmsg(query, "DROP TABLE btemp%s", output_table);
-   db->bdb_sql_query(query.c_str(), NULL, NULL);
+   db->bdb_sql_query(query.c_str());
+   db->bdb_end_transaction(jcr); /* the destination table might be visible now */
    db->bdb_unlock();
    return ret;
 }
-
 void Bvfs::insert_missing_delta(char *output_table, int64_t *res)
 {
    char ed1[50];
@@ -1601,4 +1742,33 @@ void Bvfs::insert_missing_delta(char *output_table, int64_t *res)
    }
 }
 
+/* This function is used with caution by tools such as SAP plugin
+ * We need to be able to delete a record from the File table, quite ugly...
+ */
+bool Bvfs::delete_fileid(char *fileid)
+{
+   bool ret=true;
+   /* The jobid was filtered before */
+   if (!jobids || *jobids == '\0' || !fileid || *fileid == '\0') {
+      return false;
+   }
+   db->bdb_lock();
+   Mmsg(db->cmd, "DELETE FROM File WHERE FileId IN (%s) AND JobId IN (%s)", fileid, jobids);
+   if (!db->bdb_sql_query(db->cmd, NULL, NULL)) {
+      ret = false;
+   }
+   db->bdb_unlock();
+   return ret;
+}
+
+#if COMMUNITY
+bool Bvfs::check_path_access(DBId_t pid) { return true;}
+bool Bvfs::check_full_path_access(int count, sellist *sel, db_list_ctx *toexcl){return true;}
+bool Bvfs::can_access_dir(const char *path) { return true;}
+bool Bvfs::can_access(struct stat *p) { return true;}
+bool Bvfs::need_to_check_permissions() { return false;}
+bool Bvfs::check_permissions(char *output_table) {return true;}
+int Bvfs::checkuid_cb(int fields, char **row) { return 0;}
+#endif
+
 #endif /* HAVE_SQLITE3 || HAVE_MYSQL || HAVE_POSTGRESQL */