]> git.ipfire.org Git - thirdparty/bacula.git/commitdiff
BEE Backport regress/scripts/blab.py
authorAlain Spineux <alain@baculasystems.com>
Fri, 4 Sep 2020 11:32:53 +0000 (13:32 +0200)
committerEric Bollengier <eric@baculasystems.com>
Thu, 24 Mar 2022 08:02:56 +0000 (09:02 +0100)
This commit is the result of the squash of the following main commits:

Author: Alain Spineux <alain@baculasystems.com>
Date:   Wed Oct 16 19:28:20 2019 +0200

    regress: add a Dedup resource and make async.sh rebuild the SD plugins

Author: Alain Spineux <alain@baculasystems.com>
Date:   Wed Feb 28 15:50:40 2018 +0100

    regress: tweak blab.py

Author: Alain Spineux <alain@baculasystems.com>
Date:   Thu Feb 1 16:53:37 2018 +0100

    regress: add lab.CheckConfig() to blab.py to check FORCE_XXXX

    - check if FORCE_XXXX are set and skip (exit 0) if not appropriate.
    - add lab.Die("your message")
    - add lab.Exit(exit_code, ["your message"])

Author: Alain Spineux <alain@baculasystems.com>
Date:   Mon Aug 14 18:55:49 2017 +0200

    regress: dedup-checkoptions-test & blab.py update for new vacuum logging

Author: Alain Spineux <alain@baculasystems.com>
Date:   Mon Aug 14 13:49:58 2017 +0200

    regress: tweak dedup

    - arun.ini update to include all dedup tests
    - blab.py handle new scrub/vacuum logging

Author: Alain Spineux <alain@baculasystems.com>
Date:   Tue Aug 8 16:34:57 2017 +0200

    regress: improve tests/dedup-scrub-test.py, blap.py and querry_dde.py

    - dedup-scrub-test.py handle fake index entries
    - query_dde.py handle forged addresses
    - blap.py more definitions

Author: Alain Spineux <alain@baculasystems.com>
Date:   Mon Jul 10 13:41:30 2017 +0200

    regress: blab.py add minimal support for scrubber

    - add EZThread.AsyncStop0
    - add Dedupengine.GetContainerPath()
    - add minimal support for scrubber

Author: Alain Spineux <alain@baculasystems.com>
Date:   Tue May 30 16:32:21 2017 +0200

    regress: export QUERY_DDE_ADVANCED variable in blab.py

Author: Alain Spineux <alain@baculasystems.com>
Date:   Fri Apr 28 14:19:52 2017 +0200

    regress: adjust all .py scripts to blab.py and vacuum-ng

Author: Alain Spineux <alain@baculasystems.com>
Date:   Fri Apr 28 14:13:59 2017 +0200

    regress: improve and adjust blab.py for vacuum-ng

    - adjust blab.py for vacuum-ng output
    - kill -USR1 generate a stacktrace of running python thread
    - all .py scripts support --no-cleanup, --debug and --verbose options
    - support for 'dedup-simple' and 'dedup-autochanger' profiles
    - new blab.StartTest() and EndTest()
    - blab.GetJob(<negative value>)
    - blab.Log(logging.CTRITICAL, ) force a exit

Author: Alain Spineux <alain@baculasystems.com>
Date:   Mon Apr 24 14:57:34 2017 +0200

    regress: blab.py based on copy-dedup-confs instead of copy-plugin-confs

    - use DiskChanger instead (STORAGE=DiskChanger)
    - new EZThread base class for thread
    - add --no-cleanup PART 1
    - GetJob(jobid) and jobid=-1 instead of  GetJob(pos)
    - handle number like "12,345,567" with ','

Author: Alain Spineux <alain@baculasystems.com>
Date:   Fri Apr 21 15:03:22 2017 +0200

    regress: blab.py add ListMedia()

Author: Alain Spineux <alain@baculasystems.com>
Date:   Thu Apr 6 21:56:38 2017 +0200

    regress: blab.py add ListJobs() and ZapContainers()

    - ListJobs() return list jobs
    - GetJob(pos) return one job (use -1)
    - ZapContainers() fill all containers full of zeroes

Author: Alain Spineux <alain@baculasystems.com>
Date:   Fri Mar 24 14:49:12 2017 +0100

    blab.py parse dedup usage and more vacuum

    - parse dedup usage
    - parse vacuum "need optimize"
    - add some function helper to DDE

Author: Alain Spineux <alain@baculasystems.com>
Date:   Fri Dec 2 16:10:57 2016 +0100

    blab.py add BconsoleScriptOut()

    - BconsoleScriptOut() decode stdout
    - Add GetVar()
    - handle "exit" in the Shell

Author: Alain Spineux <alain@baculasystems.com>
Date:   Thu Nov 10 09:26:06 2016 +0100

    regress: improve blab.py

    - does stdout & stderr passthrough instead of active forwardin
    - accurate detection of the end of the shell commands
    - delete /tmp/std{out,err}XXXX using atexit

Author: Alain Spineux <alain@baculasystems.com>
Date:   Fri Oct 21 16:11:47 2016 +0200

    regress: make regress python friendly

    - modules like bconsole.py and blab.py goes in regress/scrips
    - regress/tests/regress.py is a helper that load modules in scripts
      without boring with librady path
    - py-sample-test.py is the first sample
    - add new objects lab.{fd,sd,dir} that are manage the related Daemon
    - add lab.sd.dde of class DDE that provides some basic helper for the DDE
    - file FileReader can search a log  or trace file
    - lab.GetVolume(volname) return information about a volume

regress/scripts/blab.py [new file with mode: 0755]

diff --git a/regress/scripts/blab.py b/regress/scripts/blab.py
new file mode 100755 (executable)
index 0000000..8580e76
--- /dev/null
@@ -0,0 +1,872 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+#   Bacula(R) - The Network Backup Solution
+
+   Copyright (C) 2000-2020 Kern Sibbald
+
+   The original author of Bacula is Kern Sibbald, with contributions
+   from many others, a complete list can be found in the file AUTHORS.
+
+   You may use this file and others of this release according to the
+   license defined in the LICENSE file, which includes the Affero General
+   Public License, v3.0 ("AGPLv3") and some additional permissions and
+   terms pursuant to its AGPLv3 Section 7.
+
+   This notice must be preserved when any source code is
+   conveyed and/or propagated.
+
+   Bacula(R) is a registered trademark of Kern Sibbald.
+#
+# author: alain@baculasystems.com
+
+import os
+import sys
+import argparse
+import subprocess
+import threading
+import hashlib
+import time
+import queue
+import codecs
+import logging
+import warnings
+import stat
+import re
+import struct
+import atexit
+
+import bconsole
+
+if True:
+    import code, traceback, signal
+
+    def debug(sig, frame):
+        """Interrupt running process, and provide a python prompt for
+        interactive debugging."""
+        d={'_frame':frame}         # Allow access to frame object.
+        d.update(frame.f_globals)  # Unless shadowed by global
+        d.update(frame.f_locals)
+
+        i = code.InteractiveConsole(d)
+        message  = "Signal received : entering python shell.\nTraceback:\n"
+        message += ''.join(traceback.format_stack(frame))
+        i.interact(message)
+
+    def dumpstacks(signal, frame):
+        id2name = dict([(th.ident, th.name) for th in threading.enumerate()])
+        code = []
+        for threadId, stack in sys._current_frames().items():
+            if id2name.get(threadId,"").startswith('pydevd.'):
+                continue
+            code.append("\n# %d Thread: %s(%d)" % (os.getpid(), id2name.get(threadId,""), threadId))
+            for filename, lineno, name, line in traceback.extract_stack(stack):
+                code.append('File: "%s", line %d, in %s' % (filename, lineno, name))
+                if line:
+                    code.append("  %s" % (line.strip()))
+        print("\n".join(code), file=sys.stderr)
+
+    def listen():
+        signal.signal(signal.SIGUSR1, dumpstacks)  # Register handler
+
+    listen()
+
+def atexit_delete_file(paths):
+    for path in paths:
+        try:
+            os.unlink(path)
+        except FileNotFoundError:
+            pass
+
+class Obj:
+    pass
+
+class DDE:
+    BUCKETID_SHIFT=(64-16)
+    BUCKETID_MASK=(0x7FFF<<BUCKETID_SHIFT)
+    BUCKETIDX_MASK=(0xFFFF<<BUCKETID_SHIFT)
+
+    CONTAINER_HEADER_SIZE=4096
+
+    BLOCK_HEAD_COMPRESSED=(1<<31)
+    BLOCK_HEAD_NOT_TRIED_COMPRESS=(1<<30)
+    BLOCK_HEAD_SIZE_MASK=0x00FFFFFF
+    BLOCK_HEAD_SIZE=4
+
+    BUCKET_BAD_ADDR=0x0
+    BUCKET_RAW_DATA=0x1 # data is stored inside the ref
+    BUCKET_MISS_ADDR=0x2
+
+    daemon_head='(?P<daemon>[^:]+): (?P<source>[^:]+):(?P<source_lineno>\d+)\s+'
+    regex=Obj()
+    regex.vacuum_start=re.compile(daemon_head+'Vacuum: (?P<start>\d{2}-\w{3}-\d{4} \d{2}:\d{2}:\d{2}) Start forceoptimize=(?P<forceoptimize>\d+) holepunching=(?P<holepunching>\d+) checkindex=(?P<checkindex>\d+) checkvolume=(?P<checkvolume>\d+)')
+    regex.vacuum_end=re.compile(daemon_head+'Vacuum: (?P<end>\d{2}-\w{3}-\d{4} \d{2}:\d{2}:\d{2}) End\s+-+')
+    regex.vacuum_bnum_min=re.compile(daemon_head+'Vacuum: bnum_min=(?P<bnum_min>\d+) bnum_max=(?P<bnum_max>\d+) mlock_max=(?P<mlock_max>\d+) mlock_strategy=(?P<mlock_strategy>\d+)')
+    regex.vacuum_hole_size=re.compile(daemon_head+'Vacuum: hole_size=(?P<hole_size>\d+)')
+    regex.vacuum_hash_count=re.compile(daemon_head+'Vacuum: hash_count=(?P<hash_count>\d+)/(?P<bnum>\d+) chunk_count=(?P<chunk_count>\d+) connections=(?P<connections>\d+)')
+    regex.vacuum_preload=re.compile(daemon_head+'Vacuum: preload orphan count=(?P<preload_orphan>\d+) from (?P<preload_filename>\d+)')
+    regex.vacuum_volumes=re.compile(daemon_head+'Vacuum: (?P<end_volume>\d{2}-\w{3}-\d{4} \d{2}:\d{2}:\d{2}) Number of volumes handled (?P<volume>\d+)/(?P<tot_volume>\d+) suspect_ref=(?P<suspect_ref_all_vols>\d+)')
+    regex.vacuumbadref=re.compile(daemon_head+'VacuumBadRef (?P<type>\S+) FI=(?P<fi>\d+) SessId=(?P<sessid>\d+) SessTime=(?P<sesstime>\d+)( : ref\(\#(?P<hash>[0-9A-Fa-f]+) addr=(?P<addr>0x[0-9A-Fa-f]+) size=(?P<size>\d+)\) (?P<extra>.*)| : size=(?P<refsize>\d+)|)')
+    regex.vacuum_optimize=re.compile(daemon_head+'Vacuum: (?P<optimize>\d{2}-\w{3}-\d{4} \d{2}:\d{2}:\d{2}) optimize_index count=(?P<optimize_count>\d+) del=(?P<optimize_>\d+) add=(?P<optimize_add>\d+) optimize_err=(?P<optimize_err>\d+)')
+    regex.vacuum_need_optimize=re.compile(daemon_head+'Vacuum: need optimize, (?P<need_optimize>.*)')
+    regex.vacuum_ref_count=re.compile(daemon_head+'Vacuum: ref_count=(?P<ref_count>\d+) error=(?P<error>\d+) suspect_ref=(?P<suspect_ref>\d+)')
+    regex.vacuum_idxfix=re.compile(daemon_head+'Vacuum: idxfix=(?P<idxfix>\d+) 2miss=(?P<idxfix_2miss>\d+) orphan=(?P<idxfix_orphan>\d+) recoverable=(?P<recoverable>\d+)')
+    regex.vacuum_idxmiss=re.compile(daemon_head+'Vacuum: 2miss=(?P<idxmfix_2miss>\d+) orphan=(?P<idxfix_orphan>\d+)')
+    regex.vacuum_idxfix_err=re.compile(daemon_head+'Vacuum: idxupd_err=(?P<idxfix_err>\d+) chunk_read=(?P<chunk_read>\d+) chunk_read_err=(?P<chunk_read_err>\d+) chunkdb_err=(?P<chunkdb_err>\d+)')
+
+    vacuum_vars='error,suspect_ref,idxfix,idx_2miss,idxmiss_2miss,recoverable,orphan,suspect_ref,idxfix_err,chunk_read,chunk_read_err,chunk_write_err,chunkdb_err,idxmiss_2miss'.split(',')
+    orphan_struct=struct.Struct('!Q')
+
+    regex.dedup_usage=re.compile("""\
+Dedupengine status:\s+"(?P<dedupengine_name>.*)"
+\s+DDE:\s+hash_count=(?P<hash_count>\d+)\s+ref_count=(?P<ref_count>\d+)\s+ref_size=(?P<ref_size>\d+(\.\d*)?)\s(?P<ref_size_unit>[KMGTPEZY]?B)
+\s+ref_ratio=(?P<ref_ratio>\d+(\.\d*)?)\s+size_ratio=(?P<size_ratio>\d+(\.\d*)?)\s+dde_errors=(?P<dde_errors>\d+)
+\s+Config:\s+bnum=(?P<bnum>\d+)\s+bmin=(?P<bmin>\d+)\s+bmax=(?P<bmax>\d+)\s+mlock_strategy=(?P<mlock_strategy>\d+)\s+mlocked=(?P<mlocked_mb>\d+)MB\s+mlock_max=(?P<mlock_max_mb>\d+)MB(\\n\s+mlock_error="(?P<mlock_error>.*)")?
+\s+HolePunching:\s+hole_size=(?P<hole_size_kb>\d+)\s+KB
+\s+Containers:\s+chunk_allocated=(?P<chunk_allocated>\d+)\s+chunk_used=(?P<chunk_used>\d+)
+\s+disk_space_allocated=(?P<disk_space_allocated>\d+(\.\d*)?)\s+(?P<disk_space_allocated_unit>[KMGTPEZY]?B)\s+disk_space_used=(?P<disk_space_used>\d+(\.\d*)?)\s+(?P<disk_space_used_unit>[KMGTPEZY]?B)\s+containers_errors=(?P<containers_errors>\d+)
+\s+Vacuum:\s+last_run="(?P<vac_last_run>.*)"\s+duration=(?P<vac_duration>\d+)s\s+ref_count=(?P<vac_ref_count>\d+)\sref_size=(?P<vac_ref_size>\d+(\.\d*)?)\s+(?P<vac_ref_size_unit>[KMGTPEZY]?B)
+\s+vacuum_errors=(?P<vac_errors>\d+)\s+orphan_ref=(?P<vac_orphan_ref>\d+)\s+suspect_ref=(?P<vac_suspect_ref>\d+)\s+(progress=(?P<vac_progress>\d+)%%)?
+\s+Scrubber:\s+last_run="(?P<scrub_last_run>.*)".*
+\s+Stats:\s+read_chunk=(?P<read_chunk>\d+)\s+query_hash=(?P<query_hash>\d+)\s+new_hash=(?P<new_hash>\d+)\s+calc_hash=(?P<calc_hash>\d+)
+""", re.M)
+
+    def __init__(self, dedup_dir, dedup_index_dir):
+        self.dedup_dir=dedup_dir
+        self.dedup_index_dir=dedup_index_dir
+        self.archdir=self.dedup_index_dir
+        self.vacuum_orphan_lst_path=os.path.join(self.archdir, "orphanaddr.bin")
+
+    def GetDedupDir(self):
+        return self.dedup_dir
+
+    def GetMetaDir(self):
+        return self.dedup_index_dir
+
+    def GetFSMPath(self):
+        return os.path.join(self.dedup_index_dir, 'bee_dde.idx')
+
+    def GetIndexPath(self):
+        return os.path.join(self.dedup_index_dir, 'bee_dde.tch')
+
+    def SaveOrphan(self, orphans, appending=False):
+        f=open(self.vacuum_orphan_lst_path, 'ab' if appending else 'wb')
+        for orphan in orphans:
+            f.write(self.orphan_struct.pack(orphan))
+        f.close()
+
+    def ChunkAddr2ContainerId(addr):
+        return (addr & self.BUCKETID_MASK)>>self.BUCKETID_SHIFT
+
+    def ChunkAddr2Idx(addr):
+        return (addr & ~self.BUCKETIDX_MASK)
+
+    def CalcChunkAddr(self, containerid, containeridx):
+        return containerid<<self.BUCKETID_SHIFT | containeridx
+
+    def ZapContainers(self):
+        root_dir=self.GetDedupDir()
+        for filename in os.listdir(root_dir):
+            if filename.startswith('bee_dde') and filename.endswith('.blk'):
+                path=os.path.join(root_dir, filename)
+                filesize=os.path.getsize(path)
+                os.truncate(path, self.CONTAINER_HEADER_SIZE)
+                assert(os.path.getsize(path)==self.CONTAINER_HEADER_SIZE)
+                os.truncate(path, filesize)
+
+    def GetContainerPath(self, container_id):
+        root_dir=self.GetDedupDir()
+        filename='bee_dde%04d.blk' % (container_id,)
+        return os.path.join(root_dir, filename)
+
+    def ReadVacuum(self, tracefile, checkerr0=None, rewind=False):
+        vacuum_start=0
+        expected=dict()
+        values=dict()
+        start=tracefile.Tell()
+        found=tracefile.Search([ self.regex.vacuum_start, self.regex.vacuum_end,
+            self.regex.vacuum_bnum_min, self.regex.vacuum_hole_size, self.regex.vacuum_hash_count,
+            self.regex.vacuum_preload, self.regex.vacuum_volumes, self.regex.vacuum_ref_count,
+            self.regex.vacuum_idxfix, self.regex.vacuum_idxfix_err, self.regex.vacuum_optimize,
+            self.regex.vacuum_need_optimize, self.regex.vacuum_idxmiss, ])
+        val=dict()
+        err=dict()
+        if checkerr0!=None:
+            for k in self.vacuum_vars:
+                val[k]='0'
+            val.update(checkerr0)
+        for m in found:
+            values.update(m.groupdict())
+            if m.re==self.regex.vacuum_start:
+                vacuum_start+=1
+            for k in self.vacuum_vars:
+                try:
+                    v=m.group(k)
+                except IndexError:
+                    pass
+                else:
+                    if checkerr0!=None and val[k]!=None and v!=val[k]:
+                        err[k]=v
+        if rewind:
+            tracefile.Seek(start)
+        assert not err, "Vacuum error counter not to zero values : %r" % (err, )
+        assert vacuum_start==1 and 'start' in values and 'end' in values, 'count=%d start=%r end=%r' % (vacuum_start, 'start' in values, 'end' in values)
+        return values
+
+class FileReader:
+    def __init__(self, filename):
+        self.filename=filename
+        self.file=None
+        self.lineno=None
+
+    def Open(self):
+        if self.file==None:
+            try:
+                self.file=open(self.filename, 'rt')
+                self.lineno=0
+            except FileNotFoundError:
+                pass
+        return self.file
+
+    def Search(self, regex, pos=None):
+        """search the file for any line matching regex
+        if pos is None then continue searching from last search"""
+        found=[]
+        if self.Open():
+            if isinstance(regex, str):
+                regex=re.compile(regex)
+            if isinstance(regex, re._pattern_type):
+                regex=[regex, ]
+            assert isinstance(regex, list), "expect a list of regex"
+            if pos!=None:
+                self.lineno, p=pos
+                self.file.seek(p)
+            for line in self.file:
+                self.lineno+=1
+                line=line.rstrip('\n')
+                for r in regex:
+                    # print(line, r.pattern, r.match(line))
+                    match=r.match(line)
+                    if match:
+                        found.append(match)
+                        continue
+        return found
+
+    def Tell(self):
+        """return the current position in the file"""
+        return (self.lineno, self.file.tell()) if self.file else None
+
+    def Seek(self, pos):
+        if pos!=None and self.Open():
+            self.lineno, p=pos
+            self.file.seek(p)
+
+class Daemon:
+
+    short=None
+    def __init__(self, lab, name):
+        self.lab=lab
+        self.name=name
+        self.trace=FileReader(os.path.join(self.lab.vars.working, '%s.trace' % (self.name, )))
+
+    def Start(self):
+        self.lab.Shell('$bin/bacula-ctl-%s start' % (self.short, ))
+
+    def Stop(self):
+        self.lab.Shell('$bin/bacula-ctl-%s stop' % (self.short, ))
+
+    def Restart(self):
+        self.Stop()
+        self.Restart()
+
+class Storage(Daemon):
+    short='sd'
+    def __init__(self, lab, name):
+        super(Storage, self).__init__(lab, name)
+        self.dde=DDE(os.path.join(self.lab.vars.working, 'dde'), os.path.join(self.lab.vars.working, 'dde'))
+
+class Client(Daemon):
+    short='fd'
+    def __init__(self, lab, name):
+        super(Client, self).__init__(lab, name)
+
+class Director(Daemon):
+    short='dir'
+    def __init__(self, lab, name):
+        super(Director, self).__init__(lab, name)
+
+
+class Shell:
+
+    def __init__(self, shell='/bin/sh', verbose=True):
+        self.queue_out=queue.Queue()
+        self.stdout_verbose=False
+        self.proc=subprocess.Popen([ shell, ], stdin=subprocess.PIPE, stdout=None if verbose else subprocess.DEVNULL, stderr=None if verbose else subprocess.DEVNULL)
+        self.output=[ None, None, None ]
+        # ${tmp} is 'unknown' and not cleaned up yet, use '/tmp'
+        tmp='/tmp'
+        self.pipeout_name=os.path.join(tmp, 'stdout%d' % (os.getpid(), ))
+        self.pipeerr_name=os.path.join(tmp, 'stderr%d' % (os.getpid(), ))
+        atexit.register(atexit_delete_file, [self.pipeout_name, self.pipeerr_name])
+        for path in [self.pipeout_name, self.pipeerr_name]:
+            try:
+                os.unlink(path)
+            except FileNotFoundError:
+                pass
+            os.mkfifo(path)
+
+    def __del__(self):
+        for path in [self.pipeout_name, self.pipeerr_name]:
+            try:
+                os.unlink(path)
+            except FileNotFoundError:
+                pass
+
+    def Exec(self, cmd, verbose=False):
+        """run cmd in the shell and return exit code
+        cmd can be "unsafe" (see ExecOut())
+        guess the end of the process by running an echo into a PIPE
+        after the cmd.
+        if 'exit XX' is called then call sys.exit(XX)
+        """
+        cmd2='%s\n> %s echo $?\n' % (cmd.rstrip('\n'), self.pipeout_name)
+        self.proc.stdin.write(codecs.encode(cmd2))
+        self.proc.stdin.flush()
+
+        fd=os.open(self.pipeout_name, os.O_RDONLY|os.O_NONBLOCK)
+        buf=b''
+        while buf==b''and self.proc.poll()==None:
+            time.sleep(0.1)
+            buf=os.read(fd, 4096)
+        if buf==b'':
+            os.close(fd)
+            sys.exit(self.proc.returncode)
+        else:
+            data=b''
+            while buf:
+                data+=buf
+                buf=os.read(fd, 4096)
+            os.close(fd)
+        return int(data)
+
+    def ReadOut(self, pipe_name, out_idx):
+        fd=os.open(pipe_name, os.O_RDONLY)
+        data=b''
+        buf=os.read(fd, 4096)
+        while buf:
+            data+=buf
+            buf=os.read(fd, 4096)
+        os.close(fd)
+        self.output[out_idx]=data
+
+    def ExecOut(self, cmd):
+        """return [code, "stdout", "stderr"]
+        cmd must be a "safe" command that don't let any open background process
+        writing to stout or stderr. Use Exec() instead
+        """
+
+        # start 2 threads that read two fifos for stdout and stderr
+        self.output=[ None, None, None ]
+        thread_stdout=threading.Thread(target=self.ReadOut, args=(self.pipeout_name, 1))
+        thread_stdout.start()
+        thread_stderr=threading.Thread(target=self.ReadOut, args=(self.pipeerr_name, 2))
+        thread_stderr.start()
+        # redirect 
+        cmd2='(%s) >%s 2>%s\n' % (cmd.rstrip('\n'), self.pipeout_name, self.pipeerr_name)
+
+        self.proc.stdin.write(codecs.encode(cmd2))
+        self.proc.stdin.flush()
+        thread_stdout.join()
+        thread_stderr.join()
+
+        # get the error code
+        cmd2='> %s echo $?\n' % (self.pipeout_name, )
+        self.proc.stdin.write(codecs.encode(cmd2))
+        self.proc.stdin.flush()
+        fd=os.open(self.pipeout_name, os.O_RDONLY)
+        data=b''
+        buf=os.read(fd, 4096)
+        while buf:
+            data+=buf
+            buf=os.read(fd, 4096)
+        os.close(fd)
+        return int(data), codecs.decode(self.output[1].rstrip(b'\n')), codecs.decode(self.output[2])
+
+    def Close(self):
+        if self.proc:
+            self.proc.stdin.close()
+            self.proc.wait()
+            self.proc=None
+
+    def __del__(self):
+        self.Close()
+
+class Lab:
+    variables='cwd db_name db_user db_password working dumps bin FORCE_DEDUP' \
+              ' DEDUP_FS_OPTION DEDUP_FD_CACHE scripts conf rscripts tmp src' \
+              ' tmpsrc bperl CLIENT AUTOCHANGER_SCRIPT out'
+
+    DDE=DDE
+
+    def __init__(self, testname=None, profile='dedup-simple', profile_args=None, shell=False, cleanup=None, argv=None):
+        assert shell, "shell must be always enable while in mixed python/shell mode"
+        self.args = mainparser.parse_args(argv if argv else None)
+        self.cleanup=cleanup if cleanup!=None else self.args.cleanup
+
+        self.testname=testname if testname else os.path.basename(sys.argv[0])
+        self.profile=profile
+        self.profile_args=profile_args if profile_args else dict()
+        if shell==True:
+            shell='/bin/sh'
+        self.debug=not (os.getenv('REGRESS_DEBUG')!='1')
+        self.verbose=self.args.verbose or self.debug
+        if self.args.debug:
+            self.debug=True
+            self.verbose=True
+        self.shell=Shell(shell=shell, verbose=True)
+        atexit.register(self.AtExit)
+        self.shell.Exec('TestName="%s"' % (self.testname, ))
+        self.shell.Exec('BLAB_PY=1')
+        self.shell.Exec('. scripts/functions')
+        self.vars0=dict()
+        self.vars=Obj()
+        for var_name in self.variables.split():
+            _code, value, err=self.shell.ExecOut('echo ${%s}' % (var_name, ))
+            self.SetVar0(var_name, value)
+
+        if self.cleanup:
+            self.Cleanup()
+
+        self.SetupProfile()
+
+        self.ExportVar('QUERY_DDE_ADVANCED', '1') # enable advanced function for query_dde 
+        self.bconsole=bconsole.BConsole(regress=True)
+
+        self.dir=Director(self, '127.0.0.1-dir')
+        self.sd=Storage(self, '127.0.0.1-sd')
+        self.fd=Client(self, '127.0.0.1-fd')
+
+    def AtExit(self):
+        self.shell.Close()
+
+    def SetVar0(self, var_name, value):
+        self.vars0[var_name]=value
+        setattr(self.vars, var_name, value)
+
+    def SetVar(self, var_name, value):
+        self.SetVar0(var_name, value)
+        self.shell.Exec("%s='%s'" % (var_name, value))
+
+    def ExportVar(self, var_name, value):
+        self.SetVar0(var_name, value)
+        self.shell.Exec("export %s='%s'" % (var_name, value))
+
+    def GetVar(self, var_name, default=None):
+        return self.vars0.get(var_name, default)
+
+    def Shell(self, cmd):
+        return self.shell.Exec(cmd)
+
+    def ShellOut(self, cmd, codes=[ 0 ]):
+        c, out, err=self.shell.ExecOut(cmd)
+        if codes!=None and not c in codes:
+            raise RuntimeError('Unexpected return code %d for command "%s" out=%r err=%r' % (c, cmd, out, err))
+        return c, out, err
+
+    def BconsoleScript(self, script, expansion=True, verbose=None):
+        if expansion:
+            script=script.format(**self.vars0)
+        if verbose==None:
+            verbose=self.debug
+        returncode, out, err=self.bconsole.rawrun(script, verbose=verbose)
+        return returncode, out, err
+
+    def BconsoleScriptOut(self, script, expansion=True, verbose=None):
+        returncode, out, err=self.BconsoleScript(script, expansion, verbose)
+        return returncode, codecs.decode(out), codecs.decode(err)
+
+    def Cleanup(self):
+        return self.Shell('scripts/cleanup')
+
+    def CheckConfig(self, force, *args, **kwargs):
+        """check that FORCE_(ALIGNED|DEDUP|SDCALLS|CLOUD)"""
+        if force in ('FORCE_ALIGNED', 'FORCE_DEDUP', 'FORCE_SDCALLS', 'FORCE_CLOUD'):
+            if self.GetVar(force)!='yes':
+                self.Exit(0, " {}!=yes, skip test  {}".format(force, self.testname))
+            if force=='FORCE_DEDUP' and len(args)>0:
+                dedup_options=args[0]
+                if not isinstance(dedup_options, (list, tuple)):
+                    dedup_options=[dedup_options ]
+                dedup_option=self.GetVar('DEDUP_FS_OPTION')
+                if dedup_option not in dedup_options:
+                    self.Exit(0, "DEDUP_FS_OPTION don't match, skip test {}".format(self.testname))
+        else:
+            self.Die("Unknow config argument {}".format(force))
+
+    def StartTest(self):
+        if self.cleanup:
+            self.Shell('start_test')
+        else:
+            self.Shell('reset_test')
+        self.StartBacula()
+
+    def EndTest(self):
+        self.StopBacula()
+        self.Shell('end_test')
+        self.shell.Close()
+
+    def StartBacula(self):
+        open(os.path.join(self.vars.tmp, 'bconcmds'), 'w').write('quit\n')
+        self.Shell('run_bacula')
+
+    def StopBacula(self):
+        self.Shell('stop_bacula')
+
+    def MakePlugin(self):
+        warnings.warn("deprecated", DeprecationWarning)
+        self.MakePlugins('fd:test-dedup')
+
+    def MakePlugins(self, lst):
+        """expect a list of plugins like [ 'fd:test-dedup' ]""" 
+        if not isinstance(lst, list):
+            lst=[ lst ]
+        _code, pwd, _err=self.shell.ExecOut('pwd')
+        for pl in lst:
+            daemon, plugin=pl.split(':')
+            self.shell.Exec('cd ${cwd}/build/src/plugins/'+daemon)
+            self.shell.Exec('make')
+            self.shell.Exec('make install-'+plugin)
+        self.shell.Exec('cd '+pwd)
+
+    def GetVolume(self, volname):
+        volume=dict()
+        volume['name']=volname
+        volume['path']=os.path.join(self.vars.tmp, volname)
+        return volume
+
+    def s2usize(self, val, unit, comment=""):
+        units=dict(B=1, KB=1E3, MB=1E6, GB=1E9, TB=1E12, EB=1E15, ZB=1E18, YB=1E21)
+        val=val.translate({44:None})
+        try:
+            v=float(val)*units[unit]
+        except Exception as ex:
+            raise Exception('error conversing %r %r (%r)' % (val, unit, comment))
+        return v
+
+    def s2time(self, val, comment=""):
+        try:
+            v=val # TODO
+        except Exception as ex:
+            raise Exception('error conversing %r %r (%r)' % (val, unit, comment))
+        return v
+
+    def s2int(self, val):
+        return int(val.translate({44:None}))
+
+    def DedupUsage(self, storage=None):
+        if storage==None:
+            storage=self.GetVar('STORAGE')
+        returncode, out, err=self.BconsoleScriptOut('dedup usage storage=%s' % (storage,))
+        pos=out.find('Dedupengine status:')
+        self.Assert(pos!=-1, "dedup usage not found\n%s\n%s" % (out, err))
+        st=out[pos:]
+        match=DDE.regex.dedup_usage.match(st)
+        if not match:
+            """embeded debugging"""
+            regex=''
+            for line in DDE.regex.dedup_usage.pattern.split('\n'):
+                if regex:
+                    regex+='\n'+line
+                else:
+                    regex=line
+
+                self.Log(logging.ERROR, "line=%r", line)
+                if not re.match(regex, st, re.M):
+                    self.Log(logging.ERROR, "%s", st)
+                    self.Log(logging.ERROR, "dedup usage regex fail here:")
+                    self.Log(logging.ERROR, "%r", line)
+                    break
+
+        self.Assert(match, "dedup usage regex don't match:\n%s" % (st, ))
+        values=match.groupdict()
+        convert=dict(hash_count=int, ref_count=int, ref_size=self.s2usize,
+             ref_ratio=float, size_ratio=float, dde_errors=int,
+             bnum=int, bmin=int, bmax=int, mlock_strategy=int, mlocked_mb=int, mlock_max_mb=int,
+             hole_size_kb=int,
+             chunk_allocated=int, chunk_used=int,
+             disk_space_allocated=self.s2usize, disk_space_used=self.s2usize, containers_errors=int,
+             vac_last_run=self.s2time, vac_duration=int, vac_ref_count=int, vac_ref_size=self.s2usize,
+             vac_errors=int, vac_orphan_ref=int, vac_suspect_ref=int,
+             read_chunk=int, query_hash=int, new_hash=int, calc_hash=int)
+        for k in convert:
+            if convert[k]==self.s2usize:
+                values[k]=self.s2usize(values[k], values[k+'_unit'])
+            else:
+                if False and k in ('bmin', 'bmax'):
+                    self.Log(logging.INFO, '%s=%r -> %r', k, values[k], convert[k](values[k]))
+                values[k]=convert[k](values[k])
+
+        return values
+
+    def DedupSetMaximumContainerSize(self, mcs):
+        lab.Shell("""$bperl -e 'add_attribute("$conf/bacula-sd.conf", "MaximumContainerSize", "{}B", "Storage")'""".format(mcs))
+
+    def Assert(self, cond, message="Unexpected error"):
+        if not cond:
+            raise Exception(message)
+
+    def SetupProfile(self):
+        if self.profile not in ('dedup-simple', 'dedup-autochanger'):
+            self.Log(loggin.CRITICAL, "unknown profile: %s", self.profile)
+
+        if self.profile.startswith('dedup-'):
+            if 'MaximumContainerSize' in self.profile_args:
+                self.SetVar('DEDUP_MAXIMUM_CONTAINER_SIZE', '{}KB'.format(self.profile_args['MaximumContainerSize']//1024))
+            if self.cleanup:
+                self.SetupDedup()
+
+        if self.profile=='dedup-simple':
+            if self.cleanup:
+                self.Shell('scripts/copy-plugin-confs')
+            self.SetVar('STORAGE', 'File')
+        elif self.profile=='dedup-autochanger':
+            if self.cleanup:
+                self.Shell('scripts/copy-dedup-confs')
+            self.SetVar('STORAGE', 'DiskChanger')
+
+        if self.profile in ('dedup-simple', 'dedup-autochanger'):
+            self.SetVar('JobName', 'DedupPluginTest')
+            if self.cleanup:
+                self.Shell("""$bperl -e 'add_attribute("$conf/bacula-dir.conf", "Maximum Concurrent Jobs", "10", "Director")'""")
+                self.Shell("""$bperl -e 'add_attribute("$conf/bacula-dir.conf", "Maximum Concurrent Jobs", "10", "Client")'""")
+                self.Shell("""$bperl -e 'add_attribute("$conf/bacula-dir.conf", "Maximum Concurrent Jobs", "10", "Storage", "File")'""")
+                self.Shell("""$bperl -e 'add_attribute("$conf/bacula-dir.conf", "Maximum Concurrent Jobs", "10", "Job", "$JobName")'""")
+                self.Shell("""$bperl -e 'add_attribute("$conf/bacula-sd.conf", "Volume Poll Interval", "5s", "Device", "FileStorage")'""")
+                #self.Shell("""$bperl -e 'add_attribute("$conf/bacula-dir.conf", "Media Type", "File", "Storage", "File1")'""")
+                #self.Shell("""$bperl -e 'add_attribute("$conf/bacula-dir.conf", "Maximum Concurrent Jobs", "10", "Storage", "File1")'""")
+                #self.Shell("""$bperl -e 'add_attribute("$conf/bacula-sd.conf", "Media Type", "File", "Device", "FileStorage1")'""")
+
+        if self.profile.startswith('dedup-'):
+            self.ResetDedup()
+
+    def SetupDedup(self):
+        self.Shell('check_dedup_enable')
+        self.Shell('require_query_dde')
+        #self.MakePlugins([ 'fd:test-dedup', ])
+        # create 'select-cfg.sh' file to use in bconsole command like this :
+        # @exec "{tmp}/select-cfg.sh 0"
+        # run job=DedupPluginTest level=Full storage=File1 yes
+        # @exec "{tmp}/select-cfg.sh next"
+        f=open(os.path.join(self.vars.tmp, 'select-cfg.sh'), 'wt')
+        f.write("""\
+# The plugin unlink the file just after open()
+# echo "$$ `date` $@" >> /tmp/log
+while [ -f {tmp}/dedupstreams.conf ] ; do sleep 1; done
+for var in "$@" ; do
+   if [ "${{var}}" = "next" ] ; then
+      var=0
+      if [ -f {tmp}/dedupstreams.last ] ; then
+         var=`cat {tmp}/dedupstreams.last`
+         var=$(($var + 1))
+      fi
+   fi
+   echo {tmp}/stream${{var}}.dedup >> {tmp}/dedupstreams.conf
+   chmod a+x {tmp}/dedupstreams.conf # let the plugin does the unlink
+   echo ${{var}} > {tmp}/dedupstreams.last
+done
+# echo "$$ `date` done" >> /tmp/log
+""".format(tmp=self.vars.tmp))
+        os.fchmod(f.fileno(), stat.S_IRUSR|stat.S_IWUSR|stat.S_IXUSR|stat.S_IRGRP|stat.S_IXGRP|stat.S_IROTH|stat.S_IXOTH)
+        f.close()
+
+        # create a default 'stream0.dedup' file
+        f=open(os.path.join(self.vars.tmp, 'stream0.dedup'), 'wt')
+        f.write("""\
+global_size=102400M
+chunk_min_size=3K
+chunk_max_size=64K
+deviation=10
+seed=1234
+size=128M
+start=-1
+""")
+        f.close()
+
+    def ResetDedup(self):
+        # cleanup 'dedupstreams.conf'
+        try:
+            os.unlink(os.path.join(self.vars.tmp, 'dedupstreams.conf'))
+        except FileNotFoundError:
+            pass
+
+    def ListMedia(self, type=None, status=None):
+        """typ & status can be list"""
+        convert=dict(MediaId=self.s2int, VolumeName=str, MediaType=str, VolBytes=self.s2int, \
+                     VolStatus=str, )
+        returncode, out, err=self.BconsoleScriptOut('llist media\n', verbose=False)
+        media=None
+        medias=dict()
+        skipping_garbadge=True
+        for line in out.splitlines(False):
+            if skipping_garbadge:
+                skipping_garbadge=not line.startswith('Using Catalog')
+                continue
+            line=line.strip()
+            if line:
+                try:
+                    k, v=line.split(':', 1)
+                except:
+                    if line not in ('No results to list.', 'You have messages.'):
+                        print(out)
+                        print("=> {!r}".format(line), file=sys.stderr)
+
+                try:
+                    conv=convert[k]
+                except:
+                    continue
+                val=conv(v.strip())
+                if media:
+                    setattr(media, k, val)
+                if k=='MediaId':
+                    media=Obj()
+                    media.MediaId=val
+                    medias[val]=media
+                elif k=='MediaType':
+                    if type!=None and val not in type and media.MediaId in medias:
+                        del medias[media.MediaId]
+                        continue
+                elif k=='VolStatus':
+                    if status!=None and val not in status and media.MediaId in medias:
+                        del medias[media.MediaId]
+                        continue
+            else:
+                media=None
+        return medias
+
+    def ListJobs(self, type=None, status=None):
+        """typ & status can be a string"""
+        convert=dict(JobId=self.s2int, Name=str, Type=str, Level=str, JobStatus=str, \
+                     JobErrors=self.s2int,)
+        returncode, out, err=self.BconsoleScriptOut('llist jobs\n', verbose=False)
+        job=None
+        jobs=dict()
+        skipping_garbadge=True
+        for line in out.splitlines(False):
+            if skipping_garbadge:
+                skipping_garbadge=not line.startswith('Using Catalog')
+                continue
+            line=line.strip()
+            if line:
+                try:
+                    k, v=line.split(':', 1)
+                except:
+                    if line not in ('No results to list.', 'You have messages.'):
+                        print(out)
+                        print("=> {!r}".format(line), file=sys.stderr)
+
+                try:
+                    conv=convert[k]
+                except:
+                    continue
+                val=conv(v.strip())
+                if job:
+                    setattr(job, k, val)
+                if k=='JobId':
+                    job=Obj()
+                    job.JobId=val
+                    jobs[val]=job
+                elif k=='Type':
+                    if type!=None and val not in type and job.JobId in jobs:
+                        del jobs[job.JobId]
+                        continue
+                elif k=='JobStatus':
+                    if status!=None and val not in status and job.JobId in jobs:
+                        del jobs[job.JobId]
+                        continue
+            else:
+                job=None
+        return jobs
+
+    def GetJob(self, jobid, type=None):
+        """use -1 for last jobid"""
+        jobs=self.ListJobs(type=type)
+        if not jobs:
+            return None
+        if jobid<0:
+            if jobid==-1:
+                jobid=max(jobs.keys())
+            else:
+                s=sorted(list(jobs.keys()))
+                jobid=s[jobid]
+        return jobs[jobid]
+
+    def AssertNoJobError(self, exclude=None, type=None):
+        """exclude is a list of job to that can be in error"""
+        jobs=self.ListJobs(type=type)
+        errors=[]
+        for jobid in jobs:
+            job=jobs[jobid]
+            if exclude!=None and jobid in exclude:
+                continue
+            if job.JobStatus not in 'T' or job.JobErrors!=0:
+                self.Log("job %d status=%s errors=%d", jobid, job.JobStatus, job.JobErrors)
+                errors.append(str(job.JobId))
+        self.Assert(len(errors)==0, "%d jobs have errors: %s" % (len(errors), ','.join(errors)))
+
+    def Log(self, lvl, msg, *args, **kwargs):
+        assert not kwargs, "dont handle this"
+        if lvl>logging.WARNING or self.debug or (lvl>=logging.INFO and self.verbose):
+            print(msg % args, file=sys.stderr)
+            sys.stderr.flush()
+            if lvl>=logging.CRITICAL:
+                sys.exit(1)
+
+    def Exit(self, code, msg=None):
+        if msg:
+            self.Log(logging.INFO, msg)
+        sys.exit(code)
+
+    def Die(self, msg=None):
+        if msg:
+            self.Log(logging.CRITICAL, msg)
+        else:
+            sys.exit(1)
+
+
+class EZThread(threading.Thread):
+    cont=True  # Continue
+    stopping=False
+
+    def AsyncStop0(self):
+        pass
+
+    def AsyncStop(self):
+        if not self.stopping:
+            self.AsyncStop0()
+            self.stopping=True
+            self.cont=False
+
+    def Join(self):
+        self.AsyncStop()
+        self.join()
+
+
+mainparser=argparse.ArgumentParser(description='Bacula regression test')
+mainparser.add_argument('--debug', action='store_true', help="idem REGRESS_DEBUG=1")
+mainparser.add_argument('--verbose', action='store_true', help="be verbose")
+mainparser.add_argument('--cleanup', action='store_true', help="don't reset data", default=True)
+mainparser.add_argument('--no-cleanup', dest='cleanup', action='store_false', help="don't reset data")
+
+if __name__ == "__main__":
+
+    s=Shell()
+    out=s.ExecOut('echo hello')
+    print('--> %r' % (out,))
+    out=s.ExecOut('ls /dontexist')
+    print('--> %r' % (out,))
+    out=s.ExecOut('sleep 1')
+    print('--> %r' % (out,))
+    out=s.ExecOut('false')
+    print('--> %r' % (out,))
+    s.Exec('ls /tmp')
+    s.Close()