import io
import os
import pickle
+import struct
import subprocess
import sys
import unittest
return py_executable_map.get(py_version, None)
-@support.requires_resource('cpu')
+def read_exact(f, n):
+ buf = b''
+ while len(buf) < n:
+ chunk = f.read(n - len(buf))
+ if not chunk:
+ raise EOFError
+ buf += chunk
+ return buf
+
+
class AbstractCompatTests(pickletester.AbstractPickleTests):
py_version = None
+ worker = None
@classmethod
def setUpClass(cls):
if not have_python_version(cls.py_version):
py_version_str = ".".join(map(str, cls.py_version))
raise unittest.SkipTest(f'Python {py_version_str} not available')
+ cls.addClassCleanup(cls.finish_worker)
# Override the default pickle protocol to match what xpickle worker
# will be running.
highest_protocol = highest_proto_for_py_version(cls.py_version)
cls.enterClassContext(support.swap_attr(pickle, 'HIGHEST_PROTOCOL',
highest_protocol))
- @staticmethod
- def send_to_worker(python, data):
+ @classmethod
+ def start_worker(cls, python):
+ target = os.path.join(os.path.dirname(__file__), 'xpickle_worker.py')
+ worker = subprocess.Popen([*python, target],
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ # For windows bpo-17023.
+ shell=is_windows)
+ cls.worker = worker
+ return worker
+
+ @classmethod
+ def finish_worker(cls):
+ worker = cls.worker
+ if worker is None:
+ return
+ cls.worker = None
+ worker.stdin.close()
+ worker.stdout.close()
+ worker.stderr.close()
+ worker.terminate()
+ worker.wait()
+
+ @classmethod
+ def send_to_worker(cls, python, data):
"""Bounce a pickled object through another version of Python.
This will send data to a child process where it will
be unpickled, then repickled and sent back to the parent process.
Returns:
The pickled data received from the child process.
"""
- target = os.path.join(os.path.dirname(__file__), 'xpickle_worker.py')
- worker = subprocess.Popen([*python, target],
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- # For windows bpo-17023.
- shell=is_windows)
- stdout, stderr = worker.communicate(data)
- if worker.returncode == 0:
- return stdout
- # if the worker fails, it will write the exception to stdout
+ worker = cls.worker
+ if worker is None:
+ worker = cls.start_worker(python)
+
try:
- exception = pickle.loads(stdout)
- except (pickle.UnpicklingError, EOFError):
+ worker.stdin.write(struct.pack('!i', len(data)) + data)
+ worker.stdin.flush()
+
+ size, = struct.unpack('!i', read_exact(worker.stdout, 4))
+ if size > 0:
+ return read_exact(worker.stdout, size)
+ # if the worker fails, it will write the exception to stdout
+ if size < 0:
+ stdout = read_exact(worker.stdout, -size)
+ try:
+ exception = pickle.loads(stdout)
+ except (pickle.UnpicklingError, EOFError):
+ pass
+ else:
+ if isinstance(exception, Exception):
+ # To allow for tests which test for errors.
+ raise exception
+ _, stderr = worker.communicate()
raise RuntimeError(stderr)
- else:
- if support.verbose > 1:
- print()
- print(f'{data = }')
- print(f'{stdout = }')
- print(f'{stderr = }')
- if isinstance(exception, Exception):
- # To allow for tests which test for errors.
- raise exception
- else:
- raise RuntimeError(stderr)
-
+ except:
+ cls.finish_worker()
+ raise
def dumps(self, arg, proto=0, **kwargs):
# Skip tests that require buffer_callback arguments since
self.skipTest('Test does not support "buffer_callback" argument.')
f = io.BytesIO()
p = self.pickler(f, proto, **kwargs)
- p.dump((proto, arg))
- f.seek(0)
- data = bytes(f.read())
+ p.dump(arg)
+ data = struct.pack('!i', proto) + f.getvalue()
python = py_executable_map[self.py_version]
return self.send_to_worker(python, data)
# pickles in a different Python version.
import os
import pickle
+import struct
import sys
sources = f.read()
exec(sources, vars(test_module))
+def read_exact(f, n):
+ buf = b''
+ while len(buf) < n:
+ chunk = f.read(n - len(buf))
+ if not chunk:
+ raise EOFError
+ buf += chunk
+ return buf
in_stream = getattr(sys.stdin, 'buffer', sys.stdin)
out_stream = getattr(sys.stdout, 'buffer', sys.stdout)
try:
- message = pickle.load(in_stream)
- protocol, obj = message
- pickle.dump(obj, out_stream, protocol)
-except Exception as e:
+ while True:
+ size, = struct.unpack('!i', read_exact(in_stream, 4))
+ if not size:
+ break
+ data = read_exact(in_stream, size)
+ protocol, = struct.unpack('!i', data[:4])
+ obj = pickle.loads(data[4:])
+ data = pickle.dumps(obj, protocol)
+ out_stream.write(struct.pack('!i', len(data)) + data)
+ out_stream.flush()
+except Exception as exc:
# dump the exception to stdout and write to stderr, then exit
- pickle.dump(e, out_stream)
- sys.stderr.write(repr(e))
+ try:
+ data = pickle.dumps(exc)
+ out_stream.write(struct.pack('!i', -len(data)) + data)
+ out_stream.flush()
+ except Exception:
+ out_stream.write(struct.pack('!i', 0))
+ out_stream.flush()
+ sys.stderr.write(repr(exc))
+ sys.stderr.flush()
sys.exit(1)