Fix the same issue reported in psycopg/psycopg2#829.
# Copyright (C) 2020 The Psycopg Team
+import os
import logging
from weakref import ref
from functools import partial
"pgconn_ptr",
"notice_handler",
"_notice_receiver",
+ "_procpid",
"__weakref__",
)
)
impl.PQsetNoticeReceiver(pgconn_ptr, self._notice_receiver, None)
+ self._procpid = os.getpid()
+
def __del__(self) -> None:
- self.finish()
+ # Close the connection only if it was created in this process,
+ # not if this object is being GC'd after fork.
+ if os.getpid() == self._procpid:
+ self.finish()
@classmethod
def connect(cls, conninfo: bytes) -> "PGconn":
+from posix.fcntl cimport pid_t
from psycopg3.pq cimport libpq as impl
ctypedef char *(*conn_bytes_f) (const impl.PGconn *)
cdef class PGconn:
cdef impl.PGconn* pgconn_ptr
cdef object __weakref__
+ cdef public object notice_handler
+ cdef pid_t _procpid
@staticmethod
cdef PGconn _from_ptr(impl.PGconn *ptr)
- cdef public object notice_handler
-
cdef int _ensure_pgconn(self) except 0
cdef char *_call_bytes(self, conn_bytes_f func) except NULL
cdef int _call_int(self, conn_int_f func) except -1
# Copyright (C) 2020 The Psycopg Team
+from posix.unistd cimport getpid
from cpython.mem cimport PyMem_Malloc, PyMem_Free
import logging
def __cinit__(self):
self.pgconn_ptr = NULL
+ self._procpid = getpid()
def __dealloc__(self):
- self.finish()
+ # Close the connection only if it was created in this process,
+ # not if this object is being GC'd after fork.
+ if self._procpid == getpid():
+ self.finish()
@classmethod
def connect(cls, conninfo: bytes) -> PGconn:
Tests dealing with concurrency issues.
"""
+import os
+import sys
import time
import queue
import pytest
+import shutil
+import tempfile
import threading
+import subprocess as sp
import psycopg3
stop = True
assert notices.empty(), "%d notices raised" % notices.qsize()
+
+
+@pytest.mark.slow
+def test_multiprocess_close(dsn):
+ # Check the problem reported in psycopg2#829
+ # Subprocess gcs the copy of the fd after fork so it closes connection.
+ module = f"""\
+import time
+import psycopg3
+
+def thread():
+ conn = psycopg3.connect({repr(dsn)})
+ curs = conn.cursor()
+ for i in range(10):
+ curs.execute("select 1")
+ time.sleep(0.1)
+
+def process():
+ time.sleep(0.2)
+"""
+
+ script = """\
+import time
+import threading
+import multiprocessing
+import mptest
+
+t = threading.Thread(target=mptest.thread, name='mythread')
+t.start()
+time.sleep(0.2)
+multiprocessing.Process(target=mptest.process, name='myprocess').start()
+t.join()
+"""
+
+ dir = tempfile.mkdtemp()
+ try:
+ with open(os.path.join(dir, "mptest.py"), "w") as f:
+ f.write(module)
+ env = dict(os.environ)
+ env["PYTHONPATH"] = dir + os.pathsep + env.get("PYTHONPATH", "")
+ out = sp.check_output(
+ [sys.executable, "-c", script], stderr=sp.STDOUT, env=env
+ ).decode("utf8", "replace")
+ assert out == "", out.strip().splitlines()[-1]
+ finally:
+ shutil.rmtree(dir, ignore_errors=True)