From 9dd15963d36edd1989e108a817acb8ee5cfd2401 Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Mon, 26 Jun 2006 19:51:01 +0000 Subject: [PATCH] migrated Queue.Queue to its own module here, to assure RLock compatibility --- CHANGES | 7 +- lib/sqlalchemy/pool.py | 15 +--- lib/sqlalchemy/queue.py | 182 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 191 insertions(+), 13 deletions(-) create mode 100644 lib/sqlalchemy/queue.py diff --git a/CHANGES b/CHANGES index a20782551f..3a6ee1abad 100644 --- a/CHANGES +++ b/CHANGES @@ -20,7 +20,12 @@ properly saving - fixed bug when specifying explicit module to mysql dialect - when QueuePool times out it raises a TimeoutError instead of erroneously making another connection -- attempting to fix a rare hang that can occur with Queue.Queue +- Queue.Queue usage in pool has been replaced with a locally +modified version (works in py2.3/2.4!) that uses a threading.RLock +for a mutex. this is to fix a reported case where a ConnectionFairy's +__del__() method got called within the Queue's get() method, which +then returns its connection to the Queue via the the put() method, +causing a reentrant hang unless threading.RLock is used. 0.2.3 - overhaul to mapper compilation to be deferred. this allows mappers diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py index 420e412471..0a5b5ca2a7 100644 --- a/lib/sqlalchemy/pool.py +++ b/lib/sqlalchemy/pool.py @@ -10,15 +10,14 @@ on a thread local basis. Also provides a DBAPI2 transparency layer so that pool be managed automatically, based on module type and connect arguments, simply by calling regular DBAPI connect() methods.""" -import Queue, weakref, string, cPickle -import util, exceptions +import weakref, string, cPickle +from sqlalchemy import util, exceptions +import sqlalchemy.queue as Queue try: import thread - import threading except: import dummythread as thread - import dummythreading as threading proxies = {} @@ -210,14 +209,6 @@ class QueuePool(Pool): self._creator = creator self._pool = Queue.Queue(pool_size) - # modify the pool's mutex to be an RLock. this is because a rare condition can - # occur where a ConnectionFairy's __del__ method gets called within the get() method - # of the Queue (and then tries to do a put() within the get()), causing a re-entrant hang. - # the RLock allows the mutex to be reentrant within that case. - self._pool.mutex = threading.RLock() - self._pool.not_empty = threading.Condition(self._pool.mutex) - self._pool.not_full = threading.Condition(self._pool.mutex) - self._overflow = 0 - pool_size self._max_overflow = max_overflow self._timeout = timeout diff --git a/lib/sqlalchemy/queue.py b/lib/sqlalchemy/queue.py new file mode 100644 index 0000000000..49bb4badf3 --- /dev/null +++ b/lib/sqlalchemy/queue.py @@ -0,0 +1,182 @@ +"""an adaptation of Py2.3/2.4's Queue module which supports reentrant behavior, +using RLock instead of Lock for its mutex object. +this is to support the connection pool's usage of __del__ to return connections +to the underlying Queue, which can apparently in extremely rare cases be invoked +within the get() method of the Queue itself, producing a put() inside the get() +and therefore a reentrant condition.""" + +from time import time as _time + +try: + # py2.4 deque class + from collections import deque +except: + # roll our own... + class deque(list): + def popleft(self): + return self.pop(0) + +__all__ = ['Empty', 'Full', 'Queue'] + +class Empty(Exception): + "Exception raised by Queue.get(block=0)/get_nowait()." + pass + +class Full(Exception): + "Exception raised by Queue.put(block=0)/put_nowait()." + pass + +class Queue: + def __init__(self, maxsize=0): + """Initialize a queue object with a given maximum size. + + If maxsize is <= 0, the queue size is infinite. + """ + try: + import threading + except ImportError: + import dummy_threading as threading + self._init(maxsize) + # mutex must be held whenever the queue is mutating. All methods + # that acquire mutex must release it before returning. mutex + # is shared between the two conditions, so acquiring and + # releasing the conditions also acquires and releases mutex. + self.mutex = threading.RLock() + # Notify not_empty whenever an item is added to the queue; a + # thread waiting to get is notified then. + self.not_empty = threading.Condition(self.mutex) + # Notify not_full whenever an item is removed from the queue; + # a thread waiting to put is notified then. + self.not_full = threading.Condition(self.mutex) + + def qsize(self): + """Return the approximate size of the queue (not reliable!).""" + self.mutex.acquire() + n = self._qsize() + self.mutex.release() + return n + + def empty(self): + """Return True if the queue is empty, False otherwise (not reliable!).""" + self.mutex.acquire() + n = self._empty() + self.mutex.release() + return n + + def full(self): + """Return True if the queue is full, False otherwise (not reliable!).""" + self.mutex.acquire() + n = self._full() + self.mutex.release() + return n + + def put(self, item, block=True, timeout=None): + """Put an item into the queue. + + If optional args 'block' is true and 'timeout' is None (the default), + block if necessary until a free slot is available. If 'timeout' is + a positive number, it blocks at most 'timeout' seconds and raises + the Full exception if no free slot was available within that time. + Otherwise ('block' is false), put an item on the queue if a free slot + is immediately available, else raise the Full exception ('timeout' + is ignored in that case). + """ + self.not_full.acquire() + try: + if not block: + if self._full(): + raise Full + elif timeout is None: + while self._full(): + self.not_full.wait() + else: + if timeout < 0: + raise ValueError("'timeout' must be a positive number") + endtime = _time() + timeout + while self._full(): + remaining = endtime - _time() + if remaining <= 0.0: + raise Full + self.not_full.wait(remaining) + self._put(item) + self.not_empty.notify() + finally: + self.not_full.release() + + def put_nowait(self, item): + """Put an item into the queue without blocking. + + Only enqueue the item if a free slot is immediately available. + Otherwise raise the Full exception. + """ + return self.put(item, False) + + def get(self, block=True, timeout=None): + """Remove and return an item from the queue. + + If optional args 'block' is true and 'timeout' is None (the default), + block if necessary until an item is available. If 'timeout' is + a positive number, it blocks at most 'timeout' seconds and raises + the Empty exception if no item was available within that time. + Otherwise ('block' is false), return an item if one is immediately + available, else raise the Empty exception ('timeout' is ignored + in that case). + """ + self.not_empty.acquire() + try: + if not block: + if self._empty(): + raise Empty + elif timeout is None: + while self._empty(): + self.not_empty.wait() + else: + if timeout < 0: + raise ValueError("'timeout' must be a positive number") + endtime = _time() + timeout + while self._empty(): + remaining = endtime - _time() + if remaining <= 0.0: + raise Empty + self.not_empty.wait(remaining) + item = self._get() + self.not_full.notify() + return item + finally: + self.not_empty.release() + + def get_nowait(self): + """Remove and return an item from the queue without blocking. + + Only get an item if one is immediately available. Otherwise + raise the Empty exception. + """ + return self.get(False) + + # Override these methods to implement other queue organizations + # (e.g. stack or priority queue). + # These will only be called with appropriate locks held + + # Initialize the queue representation + def _init(self, maxsize): + self.maxsize = maxsize + self.queue = deque() + + def _qsize(self): + return len(self.queue) + + # Check whether the queue is empty + def _empty(self): + return not self.queue + + # Check whether the queue is full + def _full(self): + return self.maxsize > 0 and len(self.queue) == self.maxsize + + # Put a new item in the queue + def _put(self, item): + self.queue.append(item) + + # Get an item from the queue + def _get(self): + return self.queue.popleft() -- 2.47.3