--- /dev/null
+.. change::
+ :tags: change, platform
+ :tickets: 5094
+
+ Removed all dialect code related to support for Jython and zxJDBC. Jython
+ has not been supported by SQLAlchemy for many years and it is not expected
+ that the current zxJDBC code is at all functional; for the moment it just
+ takes up space and adds confusion by showing up in documentation. At the
+ moment, it appears that Jython has achieved Python 2.7 support in its
+ releases but not Python 3. If Jython were to be supported again, the form
+ it should take is against the Python 3 version of Jython, and the various
+ zxJDBC stubs for various backends should be implemented as a third party
+ dialect.
+
.. automodule:: sqlalchemy.dialects.mysql.pyodbc
-zxjdbc
-------
-
-.. automodule:: sqlalchemy.dialects.mysql.zxjdbc
.. automodule:: sqlalchemy.dialects.oracle.cx_oracle
-zxjdbc
-------
-
-.. automodule:: sqlalchemy.dialects.oracle.zxjdbc
.. automodule:: sqlalchemy.dialects.postgresql.pygresql
-zxjdbc
-------
-
-.. automodule:: sqlalchemy.dialects.postgresql.zxjdbc
-
.. versionchanged:: 1.3
Within the Python 3 series, 3.4 is now the minimum Python 3 version supported.
-Platforms that don't currently have support include Jython and IronPython.
-Jython has been supported in the past and may be supported in future
-releases as well, depending on the state of Jython itself.
-
Supported Installation Methods
-------------------------------
+++ /dev/null
-# connectors/zxJDBC.py
-# Copyright (C) 2005-2020 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: http://www.opensource.org/licenses/mit-license.php
-
-import sys
-
-from . import Connector
-
-
-class ZxJDBCConnector(Connector):
- driver = "zxjdbc"
-
- supports_sane_rowcount = False
- supports_sane_multi_rowcount = False
-
- supports_unicode_binds = True
- supports_unicode_statements = sys.version > "2.5.0+"
- description_encoding = None
- default_paramstyle = "qmark"
-
- jdbc_db_name = None
- jdbc_driver_name = None
-
- @classmethod
- def dbapi(cls):
- from com.ziclix.python.sql import zxJDBC
-
- return zxJDBC
-
- def _driver_kwargs(self):
- """Return kw arg dict to be sent to connect()."""
- return {}
-
- def _create_jdbc_url(self, url):
- """Create a JDBC url from a :class:`~sqlalchemy.engine.url.URL`"""
- return "jdbc:%s://%s%s/%s" % (
- self.jdbc_db_name,
- url.host,
- url.port is not None and ":%s" % url.port or "",
- url.database,
- )
-
- def create_connect_args(self, url):
- opts = self._driver_kwargs()
- opts.update(url.query)
- return [
- [
- self._create_jdbc_url(url),
- url.username,
- url.password,
- self.jdbc_driver_name,
- ],
- opts,
- ]
-
- def is_disconnect(self, e, connection, cursor):
- if not isinstance(e, self.dbapi.ProgrammingError):
- return False
- e = str(e)
- return "connection is closed" in e or "cursor is closed" in e
-
- def _get_server_version_info(self, connection):
- # use connection.connection.dbversion, and parse appropriately
- # to get a tuple
- raise NotImplementedError()
from . import mxodbc # noqa
from . import pymssql # noqa
from . import pyodbc # noqa
-from . import zxjdbc # noqa
from .base import BIGINT
from .base import BINARY
from .base import BIT
+++ /dev/null
-# mssql/zxjdbc.py
-# Copyright (C) 2005-2020 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: http://www.opensource.org/licenses/mit-license.php
-
-"""
-.. dialect:: mssql+zxjdbc
- :name: zxJDBC for Jython
- :dbapi: zxjdbc
- :connectstring: mssql+zxjdbc://user:pass@host:port/dbname[?key=value&key=value...]
- :driverurl: http://jtds.sourceforge.net/
-
- .. note:: Jython is not supported by current versions of SQLAlchemy. The
- zxjdbc dialect should be considered as experimental.
-
-""" # noqa
-from .base import MSDialect
-from .base import MSExecutionContext
-from ... import engine
-from ...connectors.zxJDBC import ZxJDBCConnector
-
-
-class MSExecutionContext_zxjdbc(MSExecutionContext):
-
- _embedded_scope_identity = False
-
- def pre_exec(self):
- super(MSExecutionContext_zxjdbc, self).pre_exec()
- # scope_identity after the fact returns null in jTDS so we must
- # embed it
- if self._select_lastrowid and self.dialect.use_scope_identity:
- self._embedded_scope_identity = True
- self.statement += "; SELECT scope_identity()"
-
- def post_exec(self):
- if self._embedded_scope_identity:
- while True:
- try:
- row = self.cursor.fetchall()[0]
- break
- except self.dialect.dbapi.Error:
- self.cursor.nextset()
- self._lastrowid = int(row[0])
-
- if (
- self.isinsert or self.isupdate or self.isdelete
- ) and self.compiled.returning:
- self._result_proxy = engine.FullyBufferedResultProxy(self)
-
- if self._enable_identity_insert:
- table = self.dialect.identifier_preparer.format_table(
- self.compiled.statement.table
- )
- self.cursor.execute("SET IDENTITY_INSERT %s OFF" % table)
-
-
-class MSDialect_zxjdbc(ZxJDBCConnector, MSDialect):
- jdbc_db_name = "jtds:sqlserver"
- jdbc_driver_name = "net.sourceforge.jtds.jdbc.Driver"
-
- execution_ctx_cls = MSExecutionContext_zxjdbc
-
- def _get_server_version_info(self, connection):
- return tuple(
- int(x) for x in connection.connection.dbversion.split(".")
- )
-
-
-dialect = MSDialect_zxjdbc
from . import oursql # noqa
from . import pymysql # noqa
from . import pyodbc # noqa
-from . import zxjdbc # noqa
from .base import BIGINT
from .base import BINARY
from .base import BIT
+++ /dev/null
-# mysql/zxjdbc.py
-# Copyright (C) 2005-2020 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: http://www.opensource.org/licenses/mit-license.php
-
-r"""
-
-.. dialect:: mysql+zxjdbc
- :name: zxjdbc for Jython
- :dbapi: zxjdbc
- :connectstring: mysql+zxjdbc://<user>:<password>@<hostname>[:<port>]/<database>
- :driverurl: http://dev.mysql.com/downloads/connector/j/
-
- .. note:: Jython is not supported by current versions of SQLAlchemy. The
- zxjdbc dialect should be considered as experimental.
-
-Character Sets
---------------
-
-SQLAlchemy zxjdbc dialects pass unicode straight through to the
-zxjdbc/JDBC layer. To allow multiple character sets to be sent from the
-MySQL Connector/J JDBC driver, by default SQLAlchemy sets its
-``characterEncoding`` connection property to ``UTF-8``. It may be
-overridden via a ``create_engine`` URL parameter.
-
-""" # noqa
-import re
-
-from .base import BIT
-from .base import MySQLDialect
-from .base import MySQLExecutionContext
-from ... import types as sqltypes
-from ... import util
-from ...connectors.zxJDBC import ZxJDBCConnector
-
-
-class _ZxJDBCBit(BIT):
- def result_processor(self, dialect, coltype):
- """Converts boolean or byte arrays from MySQL Connector/J to longs."""
-
- def process(value):
- if value is None:
- return value
- if isinstance(value, bool):
- return int(value)
- v = 0
- for i in value:
- v = v << 8 | (i & 0xFF)
- value = v
- return value
-
- return process
-
-
-class MySQLExecutionContext_zxjdbc(MySQLExecutionContext):
- def get_lastrowid(self):
- cursor = self.create_cursor()
- cursor.execute("SELECT LAST_INSERT_ID()")
- lastrowid = cursor.fetchone()[0]
- cursor.close()
- return lastrowid
-
-
-class MySQLDialect_zxjdbc(ZxJDBCConnector, MySQLDialect):
- jdbc_db_name = "mysql"
- jdbc_driver_name = "com.mysql.jdbc.Driver"
-
- execution_ctx_cls = MySQLExecutionContext_zxjdbc
-
- colspecs = util.update_copy(
- MySQLDialect.colspecs, {sqltypes.Time: sqltypes.Time, BIT: _ZxJDBCBit}
- )
-
- def _detect_charset(self, connection):
- """Sniff out the character set in use for connection results."""
- # Prefer 'character_set_results' for the current connection over the
- # value in the driver. SET NAMES or individual variable SETs will
- # change the charset without updating the driver's view of the world.
- #
- # If it's decided that issuing that sort of SQL leaves you SOL, then
- # this can prefer the driver value.
- rs = connection.execute("SHOW VARIABLES LIKE 'character_set%%'")
- opts = {row[0]: row[1] for row in self._compat_fetchall(rs)}
- for key in ("character_set_connection", "character_set"):
- if opts.get(key, None):
- return opts[key]
-
- util.warn(
- "Could not detect the connection character set. "
- "Assuming latin1."
- )
- return "latin1"
-
- def _driver_kwargs(self):
- """return kw arg dict to be sent to connect()."""
- return dict(characterEncoding="UTF-8", yearIsDateType="false")
-
- def _extract_error_code(self, exception):
- # e.g.: DBAPIError: (Error) Table 'test.u2' doesn't exist
- # [SQLCode: 1146], [SQLState: 42S02] 'DESCRIBE `u2`' ()
- m = re.compile(r"\[SQLCode\: (\d+)\]").search(str(exception.args))
- c = m.group(1)
- if c:
- return int(c)
-
- def _get_server_version_info(self, connection):
- dbapi_con = connection.connection
- version = []
- r = re.compile(r"[.\-]")
- for n in r.split(dbapi_con.dbversion):
- try:
- version.append(int(n))
- except ValueError:
- version.append(n)
- return tuple(version)
-
-
-dialect = MySQLDialect_zxjdbc
from . import base # noqa
from . import cx_oracle # noqa
-from . import zxjdbc # noqa
from .base import BFILE
from .base import BINARY_DOUBLE
from .base import BINARY_FLOAT
Note that only DAY TO SECOND intervals are currently supported.
This is due to a lack of support for YEAR TO MONTH intervals
- within available DBAPIs (cx_oracle and zxjdbc).
+ within available DBAPIs.
:param day_precision: the day precision value. this is the number of
digits to store for the day field. Defaults to "2"
+++ /dev/null
-# oracle/zxjdbc.py
-# Copyright (C) 2005-2020 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: http://www.opensource.org/licenses/mit-license.php
-
-"""
-.. dialect:: oracle+zxjdbc
- :name: zxJDBC for Jython
- :dbapi: zxjdbc
- :connectstring: oracle+zxjdbc://user:pass@host/dbname
- :driverurl: http://www.oracle.com/technetwork/database/features/jdbc/index-091264.html
-
- .. note:: Jython is not supported by current versions of SQLAlchemy. The
- zxjdbc dialect should be considered as experimental.
-
-""" # noqa
-import collections
-import decimal
-import re
-
-from .base import OracleCompiler
-from .base import OracleDialect
-from .base import OracleExecutionContext
-from ... import sql
-from ... import types as sqltypes
-from ... import util
-from ...connectors.zxJDBC import ZxJDBCConnector
-from ...engine import result as _result
-from ...sql import expression
-
-
-SQLException = zxJDBC = None
-
-
-class _ZxJDBCDate(sqltypes.Date):
- def result_processor(self, dialect, coltype):
- def process(value):
- if value is None:
- return None
- else:
- return value.date()
-
- return process
-
-
-class _ZxJDBCNumeric(sqltypes.Numeric):
- def result_processor(self, dialect, coltype):
- # XXX: does the dialect return Decimal or not???
- # if it does (in all cases), we could use a None processor as well as
- # the to_float generic processor
- if self.asdecimal:
-
- def process(value):
- if isinstance(value, decimal.Decimal):
- return value
- else:
- return decimal.Decimal(str(value))
-
- else:
-
- def process(value):
- if isinstance(value, decimal.Decimal):
- return float(value)
- else:
- return value
-
- return process
-
-
-class OracleCompiler_zxjdbc(OracleCompiler):
- def returning_clause(self, stmt, returning_cols):
- self.returning_cols = list(
- expression._select_iterables(returning_cols)
- )
-
- # within_columns_clause=False so that labels (foo AS bar) don't render
- columns = [
- self.process(c, within_columns_clause=False)
- for c in self.returning_cols
- ]
-
- if not hasattr(self, "returning_parameters"):
- self.returning_parameters = []
-
- binds = []
- for i, col in enumerate(self.returning_cols):
- dbtype = col.type.dialect_impl(self.dialect).get_dbapi_type(
- self.dialect.dbapi
- )
- self.returning_parameters.append((i + 1, dbtype))
-
- bindparam = sql.bindparam(
- "ret_%d" % i, value=ReturningParam(dbtype)
- )
- self.binds[bindparam.key] = bindparam
- binds.append(
- self.bindparam_string(self._truncate_bindparam(bindparam))
- )
-
- return "RETURNING " + ", ".join(columns) + " INTO " + ", ".join(binds)
-
-
-class OracleExecutionContext_zxjdbc(OracleExecutionContext):
- def pre_exec(self):
- if hasattr(self.compiled, "returning_parameters"):
- # prepare a zxJDBC statement so we can grab its underlying
- # OraclePreparedStatement's getReturnResultSet later
- self.statement = self.cursor.prepare(self.statement)
-
- def get_result_proxy(self):
- if hasattr(self.compiled, "returning_parameters"):
- rrs = None
- try:
- try:
- rrs = self.statement.__statement__.getReturnResultSet()
- next(rrs)
- except SQLException as sqle:
- msg = "%s [SQLCode: %d]" % (
- sqle.getMessage(),
- sqle.getErrorCode(),
- )
- if sqle.getSQLState() is not None:
- msg += " [SQLState: %s]" % sqle.getSQLState()
- raise zxJDBC.Error(msg)
- else:
- row = tuple(
- self.cursor.datahandler.getPyObject(rrs, index, dbtype)
- for index, dbtype in self.compiled.returning_parameters
- )
- return ReturningResultProxy(self, row)
- finally:
- if rrs is not None:
- try:
- rrs.close()
- except SQLException:
- pass
- self.statement.close()
-
- return _result.ResultProxy(self)
-
- def create_cursor(self):
- cursor = self._dbapi_connection.cursor()
- cursor.datahandler = self.dialect.DataHandler(cursor.datahandler)
- return cursor
-
-
-class ReturningResultProxy(_result.FullyBufferedResultProxy):
-
- """ResultProxy backed by the RETURNING ResultSet results."""
-
- def __init__(self, context, returning_row):
- self._returning_row = returning_row
- super(ReturningResultProxy, self).__init__(context)
-
- def _cursor_description(self):
- ret = []
- for c in self.context.compiled.returning_cols:
- if hasattr(c, "name"):
- ret.append((c.name, c.type))
- else:
- ret.append((c.anon_label, c.type))
- return ret
-
- def _buffer_rows(self):
- return collections.deque([self._returning_row])
-
-
-class ReturningParam(object):
-
- """A bindparam value representing a RETURNING parameter.
-
- Specially handled by OracleReturningDataHandler.
- """
-
- def __init__(self, type_):
- self.type = type_
-
- def __eq__(self, other):
- if isinstance(other, ReturningParam):
- return self.type == other.type
- return NotImplemented
-
- def __ne__(self, other):
- if isinstance(other, ReturningParam):
- return self.type != other.type
- return NotImplemented
-
- def __repr__(self):
- kls = self.__class__
- return "<%s.%s object at 0x%x type=%s>" % (
- kls.__module__,
- kls.__name__,
- id(self),
- self.type,
- )
-
-
-class OracleDialect_zxjdbc(ZxJDBCConnector, OracleDialect):
- jdbc_db_name = "oracle"
- jdbc_driver_name = "oracle.jdbc.OracleDriver"
-
- statement_compiler = OracleCompiler_zxjdbc
- execution_ctx_cls = OracleExecutionContext_zxjdbc
-
- colspecs = util.update_copy(
- OracleDialect.colspecs,
- {sqltypes.Date: _ZxJDBCDate, sqltypes.Numeric: _ZxJDBCNumeric},
- )
-
- def __init__(self, *args, **kwargs):
- super(OracleDialect_zxjdbc, self).__init__(*args, **kwargs)
- global SQLException, zxJDBC
- from java.sql import SQLException
- from com.ziclix.python.sql import zxJDBC
- from com.ziclix.python.sql.handler import OracleDataHandler
-
- class OracleReturningDataHandler(OracleDataHandler):
- """zxJDBC DataHandler that specially handles ReturningParam."""
-
- def setJDBCObject(self, statement, index, object_, dbtype=None):
- if type(object_) is ReturningParam:
- statement.registerReturnParameter(index, object_.type)
- elif dbtype is None:
- OracleDataHandler.setJDBCObject(
- self, statement, index, object_
- )
- else:
- OracleDataHandler.setJDBCObject(
- self, statement, index, object_, dbtype
- )
-
- self.DataHandler = OracleReturningDataHandler
-
- def initialize(self, connection):
- super(OracleDialect_zxjdbc, self).initialize(connection)
- self.implicit_returning = connection.connection.driverversion >= "10.2"
-
- def _create_jdbc_url(self, url):
- return "jdbc:oracle:thin:@%s:%s:%s" % (
- url.host,
- url.port or 1521,
- url.database,
- )
-
- def _get_server_version_info(self, connection):
- version = re.search(
- r"Release ([\d\.]+)", connection.connection.dbversion
- ).group(1)
- return tuple(int(x) for x in version.split("."))
-
-
-dialect = OracleDialect_zxjdbc
from . import psycopg2cffi # noqa
from . import pygresql # noqa
from . import pypostgresql # noqa
-from . import zxjdbc # noqa
from .array import All
from .array import Any
from .array import ARRAY
"""PostgreSQL INTERVAL type.
- The INTERVAL type may not be supported on all DBAPIs.
- It is known to work on psycopg2 and not pg8000 or zxjdbc.
-
"""
__visit_name__ = "INTERVAL"
+++ /dev/null
-# postgresql/zxjdbc.py
-# Copyright (C) 2005-2020 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: http://www.opensource.org/licenses/mit-license.php
-
-"""
-.. dialect:: postgresql+zxjdbc
- :name: zxJDBC for Jython
- :dbapi: zxjdbc
- :connectstring: postgresql+zxjdbc://scott:tiger@localhost/db
- :driverurl: http://jdbc.postgresql.org/
-
-
-"""
-from .base import PGDialect
-from .base import PGExecutionContext
-from ...connectors.zxJDBC import ZxJDBCConnector
-
-
-class PGExecutionContext_zxjdbc(PGExecutionContext):
- def create_cursor(self):
- cursor = self._dbapi_connection.cursor()
- cursor.datahandler = self.dialect.DataHandler(cursor.datahandler)
- return cursor
-
-
-class PGDialect_zxjdbc(ZxJDBCConnector, PGDialect):
- jdbc_db_name = "postgresql"
- jdbc_driver_name = "org.postgresql.Driver"
-
- execution_ctx_cls = PGExecutionContext_zxjdbc
-
- supports_native_decimal = True
-
- def __init__(self, *args, **kwargs):
- super(PGDialect_zxjdbc, self).__init__(*args, **kwargs)
- from com.ziclix.python.sql.handler import PostgresqlDataHandler
-
- self.DataHandler = PostgresqlDataHandler
-
- def _get_server_version_info(self, connection):
- parts = connection.connection.dbversion.split(".")
- return tuple(int(x) for x in parts)
-
-
-dialect = PGDialect_zxjdbc
return results == {True}
def _check_unicode_description(self, connection):
- # all DBAPIs on Py2K return cursor.description as encoded,
- # until pypy2.1beta2 with sqlite, so let's just check it -
- # it's likely others will start doing this too in Py2k.
+ # all DBAPIs on Py2K return cursor.description as encoded
if util.py2k and not self.supports_unicode_statements:
cast_to = util.binary_type
"""
with mapperlib._CONFIGURE_MUTEX:
while _mapper_registry:
- try:
- # can't even reliably call list(weakdict) in jython
- mapper, b = _mapper_registry.popitem()
- mapper.dispose()
- except KeyError:
- pass
+ mapper, b = _mapper_registry.popitem()
+ mapper.dispose()
joinedload = strategy_options.joinedload._unbound_fn
from ..util import pickle
-if util.jython:
- import array
-
-
class _LookupExpressionAdapter(object):
"""Mixin expression adaptations based on lookup tables.
if util.py2k:
def result_processor(self, dialect, coltype):
- if util.jython:
-
- def process(value):
- if value is not None:
- if isinstance(value, array.array):
- return value.tostring()
- return str(value)
- else:
- return None
-
- else:
- process = processors.to_str
- return process
+ return processors.to_str
else:
# this can cause a deadlock with pg8000 - pg8000 acquires
# prepared statement lock inside of rollback() - if async gc
# is collecting in finalize_fairy, deadlock.
- # not sure if this should be if pypy/jython only.
+ # not sure if this should be for non-cpython only.
# note that firebird/fdb definitely needs this though
for conn, rec in list(self.conns):
if rec.connection is None:
import collections
import contextlib
import os
+import platform
import pstats
import sys
from . import config
from .util import gc_collect
-from ..util import jython
-from ..util import pypy
+from ..util import cpython
from ..util import win32
# keep it at 2.7, 3.1, 3.2, etc. for now.
py_version = ".".join([str(v) for v in sys.version_info[0:2]])
- platform_tokens = [py_version]
+ if not cpython:
+ platform_tokens = [platform.python_implementation(), py_version]
+ else:
+ platform_tokens = [py_version]
platform_tokens.append(dbapi_key)
- if jython:
- platform_tokens.append("jython")
- if pypy:
- platform_tokens.append("pypy")
+
if win32:
platform_tokens.append("win")
platform_tokens.append(
from sqlalchemy.util import pickle
return exclusions.only_if(
- lambda: not util.pypy
+ lambda: util.cpython
and pickle.__name__ == "cPickle"
or sys.version_info >= (3, 2),
"Needs cPickle+cPython or newer Python 3 pickle",
pk = config.db.scalar(select([self.tables.autoinc_pk.c.id]))
eq_(r.inserted_primary_key, [pk])
- # failed on pypy1.9 but seems to be OK on pypy 2.1
- # @exclusions.fails_if(lambda: util.pypy,
- # "lastrowid not maintained after "
- # "connection close")
@requirements.dbapi_lastrowid
def test_native_lastrowid_autoinc(self):
r = config.db.execute(
import gc
import random
import sys
-import time
import types
from . import mock
from ..util import decorator
from ..util import defaultdict
+from ..util import has_refcount_gc
from ..util import inspect_getfullargspec
-from ..util import jython
from ..util import py2k
-from ..util import pypy
-if jython:
+if not has_refcount_gc:
- def jython_gc_collect(*args):
- """aggressive gc.collect for tests."""
- gc.collect()
- time.sleep(0.1)
- gc.collect()
- gc.collect()
- return 0
-
- # "lazy" gc, for VM's that don't GC on refcount == 0
- gc_collect = lazy_gc = jython_gc_collect
-elif pypy:
-
- def pypy_gc_collect(*args):
+ def non_refcount_gc_collect(*args):
gc.collect()
gc.collect()
- gc_collect = lazy_gc = pypy_gc_collect
+ gc_collect = lazy_gc = non_refcount_gc_collect
else:
# assume CPython - straight gc.collect, lazy_gc() is a pass
gc_collect = gc.collect
from .compat import cpython # noqa
from .compat import decode_backslashreplace # noqa
from .compat import dottedgetter # noqa
+from .compat import has_refcount_gc # noqa
from .compat import inspect_getfullargspec # noqa
from .compat import int_types # noqa
from .compat import iterbytes # noqa
from .compat import itertools_filter # noqa
from .compat import itertools_filterfalse # noqa
-from .compat import jython # noqa
from .compat import namedtuple # noqa
from .compat import nested # noqa
from .compat import next # noqa
from .compat import py33 # noqa
from .compat import py36 # noqa
from .compat import py3k # noqa
-from .compat import pypy # noqa
from .compat import quote_plus # noqa
from .compat import raise_from_cause # noqa
from .compat import reduce # noqa
import contextlib
import inspect
import operator
+import platform
import sys
py3k = sys.version_info >= (3, 0)
py2k = sys.version_info < (3, 0)
py265 = sys.version_info >= (2, 6, 5)
-jython = sys.platform.startswith("java")
-pypy = hasattr(sys, "pypy_version_info")
+
+
+cpython = platform.python_implementation() == "CPython"
win32 = sys.platform.startswith("win")
-cpython = not pypy and not jython # TODO: something better for this ?
+
+has_refcount_gc = bool(cpython)
contextmanager = contextlib.contextmanager
dottedgetter = operator.attrgetter
def teardown(self):
# the tests leave some fake connections
# around which don't necessarily
- # get gc'ed as quickly as we'd like,
- # on backends like pypy, python3.2
+ # get gc'ed as quickly as we'd like all the time,
+ # particularly for non-refcount gc
pool_module._refs.clear()
def setup(self):
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import eq_
from sqlalchemy.testing import expect_warnings
-from sqlalchemy.testing import fails_if
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import in_
from sqlalchemy.testing import is_
),
)
- @fails_if(lambda: util.pypy, "pypy returns plain *arg, **kw")
+ @testing.requires.cpython
def test_callable_argspec_py_builtin(self):
import datetime
assert_raises(TypeError, get_callable_argspec, datetime.datetime.now)
- @fails_if(lambda: util.pypy, "pypy returns plain *arg, **kw")
+ @testing.requires.cpython
def test_callable_argspec_obj_init(self):
assert_raises(TypeError, get_callable_argspec, object)
compat.FullArgSpec(["x", "y"], None, None, None, [], None, {}),
)
- @fails_if(lambda: util.pypy, "pypy returns plain *arg, **kw")
+ @testing.requires.cpython
def test_callable_argspec_partial(self):
from functools import partial
grouped=False,
)
- @testing.fails_if(
- lambda: util.pypy,
- "pypy doesn't report Obj.__init__ as object.__init__",
- )
+ @testing.requires.cpython
def test_init_grouped(self):
object_spec = {
"args": "(self)",
self._test_init(None, object_spec, wrapper_spec, custom_spec)
self._test_init(True, object_spec, wrapper_spec, custom_spec)
- @testing.fails_if(
- lambda: util.pypy,
- "pypy doesn't report Obj.__init__ as object.__init__",
- )
+ @testing.requires.cpython
def test_init_bare(self):
object_spec = {
"args": "self",
)
eq_(conn.execute(s3).fetchall(), [(5, rowid)])
- @testing.fails_on(
- "+zxjdbc", "Not yet known how to pass values of the " "INTERVAL type"
- )
@testing.provide_metadata
def test_interval(self):
metadata = self.metadata
@testing.requires.psycopg2_compatibility
def test_psycopg2_non_standard_err(self):
- # under pypy the name here is psycopg2cffi
+ # note that psycopg2 is sometimes called psycopg2cffi
+ # depending on platform
psycopg2 = testing.db.dialect.dbapi
TransactionRollbackError = __import__(
"%s.extensions" % psycopg2.__name__
"commit prepared 'gilberte'",
)
- @testing.fails_on(
- "+zxjdbc",
- "Can't infer the SQL type to use for an instance "
- "of org.python.core.PyObjectDerived.",
- )
def test_extract(self):
fivedaysago = testing.db.scalar(
select([func.now()])
finally:
testing.db.execute("drop table speedy_users")
- @testing.fails_on("+zxjdbc", "psycopg2/pg8000 specific assertion")
@testing.requires.psycopg2_or_pg8000_compatibility
def test_numeric_raise(self):
stmt = text("select cast('hi' as char) as hi").columns(hi=Numeric)
metadata.drop_all()
@testing.fails_on("postgresql+pg8000", "uses positional")
- @testing.fails_on("postgresql+zxjdbc", "uses qmark")
def test_expression_pyformat(self):
self.assert_compile(
matchtable.c.title.match("somstr"),
@testing.fails_on("postgresql+psycopg2", "uses pyformat")
@testing.fails_on("postgresql+pypostgresql", "uses pyformat")
@testing.fails_on("postgresql+pygresql", "uses pyformat")
- @testing.fails_on("postgresql+zxjdbc", "uses qmark")
@testing.fails_on("postgresql+psycopg2cffi", "uses pyformat")
def test_expression_positional(self):
self.assert_compile(
{"data": 9},
)
- @testing.fails_on(
- "postgresql+zxjdbc",
- "XXX: postgresql+zxjdbc currently returns a Decimal result for Float",
- )
def test_float_coercion(self):
data_table = self.tables.data_table
).scalar()
eq_(round_decimal(ret, 9), result)
- @testing.fails_on(
- "postgresql+zxjdbc", "zxjdbc has no support for PG arrays"
- )
@testing.provide_metadata
def test_arrays_pg(self):
metadata = self.metadata
row = t1.select().execute().first()
eq_(row, ([5], [5], [6], [decimal.Decimal("6.4")]))
- @testing.fails_on(
- "postgresql+zxjdbc", "zxjdbc has no support for PG arrays"
- )
@testing.provide_metadata
def test_arrays_base(self):
metadata = self.metadata
__only_on__ = "postgresql > 8.3"
- @testing.fails_on(
- "postgresql+zxjdbc",
- 'zxjdbc fails on ENUM: column "XXX" is of type '
- "XXX but expression is of type character varying",
- )
@testing.provide_metadata
def test_create_table(self):
metadata = self.metadata
exc.CompileError, etype.compile, dialect=postgresql.dialect()
)
- @testing.fails_on(
- "postgresql+zxjdbc",
- 'zxjdbc fails on ENUM: column "XXX" is of type '
- "XXX but expression is of type character varying",
- )
@testing.provide_metadata
def test_unicode_labels(self):
metadata = self.metadata
def teardown_class(cls):
metadata.drop_all()
- @testing.fails_on(
- "postgresql+zxjdbc",
- "XXX: postgresql+zxjdbc doesn't give a tzinfo back",
- )
def test_with_timezone(self):
# get a date with a tzinfo
__only_on__ = "postgresql"
__backend__ = True
- __unsupported_on__ = "postgresql+pg8000", "postgresql+zxjdbc"
+ __unsupported_on__ = ("postgresql+pg8000",)
ARRAY = postgresql.ARRAY
eq_(result, "%")
@testing.fails_on_everything_except(
- "firebird", "sqlite", "+pyodbc", "+mxodbc", "+zxjdbc", "mysql+oursql"
+ "firebird", "sqlite", "+pyodbc", "+mxodbc", "mysql+oursql"
)
def test_raw_qmark(self):
def go(conn):
"mysql+mysqlconnector",
"postgresql",
)
- @testing.fails_on("postgresql+zxjdbc", "sprintf not supported")
def test_raw_sprintf(self):
def go(conn):
conn.execute(
connection.close()
@testing.requires.savepoints
- @testing.crashes(
- "oracle+zxjdbc",
- "Errors out and causes subsequent tests to " "deadlock",
- )
def test_nested_subtransaction_commit(self):
connection = testing.db.connect()
transaction = connection.begin()
)
# This proves SA can handle a class with non-string dict keys
- if not util.pypy and not util.jython:
+ if util.cpython:
locals()[42] = 99 # Don't remove this line!
def __init__(self, **kwargs):
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
from sqlalchemy.testing.util import gc_collect
-from sqlalchemy.util.compat import pypy
from . import _fixtures
from .inheritance import _poly_fixtures
from .test_options import PathTest as OptionsPathTest
def test_prune_events(self):
self._test_prune(self._event_fixture)
- @testing.fails_if(lambda: pypy, "pypy has a real GC")
- @testing.fails_on("+zxjdbc", "http://www.sqlalchemy.org/trac/ticket/1473")
+ @testing.requires.cpython
def _test_prune(self, fixture):
s, prune = fixture()
"pg8000 parses the SQL itself before passing on "
"to PG, doesn't parse this",
)
- @testing.fails_on(
- "postgresql+zxjdbc",
- "zxjdbc parses the SQL itself before passing on "
- "to PG, doesn't parse this",
- )
@testing.fails_on("firebird", "unknown")
def test_values_with_boolean_selects(self):
"""Tests a values clause that works with select boolean
assert instrumentation.manager_of_class(A) is None
assert not hasattr(A, "x")
- # I prefer 'is' here but on pypy
- # it seems only == works
assert A.__init__ == object.__init__
def test_compileonattr_rel_backref_a(self):
"""target database must *not* support ON UPDATE..CASCADE behavior in
foreign keys."""
- return fails_on_everything_except(
- "sqlite", "oracle", "+zxjdbc"
- ) + skip_if("mssql")
+ return fails_on_everything_except("sqlite", "oracle") + skip_if(
+ "mssql"
+ )
@property
def recursive_fk_cascade(self):
no_support(
"sybase", "two-phase xact not supported by drivers/SQLA"
),
- no_support(
- "postgresql+zxjdbc",
- "FIXME: JDBC driver confuses the transaction state, "
- "may need separate XA implementation",
- ),
no_support(
"mysql",
"recent MySQL communiity editions have too many issues "
[
lambda config: against(config, "postgresql")
and not against(config, "+pg8000")
- and not against(config, "+zxjdbc")
]
)
"""target dialect supports representation of Python
datetime.datetime() with microsecond objects."""
- return skip_if(
- ["mssql", "mysql", "firebird", "+zxjdbc", "oracle", "sybase"]
- )
+ return skip_if(["mssql", "mysql", "firebird", "oracle", "sybase"])
@property
def timestamp_microseconds(self):
"""target dialect supports representation of Python
datetime.time() with microsecond objects."""
- return skip_if(
- ["mssql", "mysql", "firebird", "+zxjdbc", "oracle", "sybase"]
- )
+ return skip_if(["mssql", "mysql", "firebird", "oracle", "sybase"])
@property
def precision_numerics_general(self):
@testing.fails_on(
"firebird", "fb/kintersbasdb can't handle the bind params"
)
- @testing.fails_on("oracle+zxjdbc", "JDBC driver bug")
def test_anon_expressions(self):
result = (
table.insert()