$ pytest --dbs
Available --db options (use --dburi to override)
default sqlite:///:memory:
- firebird firebird://sysdba:masterkey@localhost//Users/classic/foo.fdb
mariadb mariadb://scott:tiger@192.168.0.199:3307/test
mssql mssql+pyodbc://scott:tiger^5HHH@mssql2017:1433/test?driver=ODBC+Driver+13+for+SQL+Server
mssql_pymssql mssql+pymssql://scott:tiger@ms_2008
--- /dev/null
+.. change::
+ :tags: removed, engine
+ :tickets: 7258
+
+ Removed legacy and deprecated package ``sqlalchemy.databases``.
+ Please use ``sqlalchemy.dialects`` instead.
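In practice the migration is a one-line import change; a minimal sketch, using
the ``postgresql`` module as an example::

    # previously (deprecated since 1.4, now removed):
    # from sqlalchemy.databases import postgresql

    from sqlalchemy.dialects import postgresql

The legacy package simply re-exported the ``base`` modules of
``sqlalchemy.dialects``, as the deleted ``databases/__init__.py`` further
below shows.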
--- /dev/null
+.. change::
+ :tags: mssql, removed
+ :tickets: 7258
+
+ Removed support for the mxodbc driver due to lack of testing support. ODBC
+ users may use the pyodbc dialect which is fully supported.
+
+.. change::
+ :tags: mysql, removed
+ :tickets: 7258
+
+ Removed support for the OurSQL driver for MySQL and MariaDB, as this
+ driver does not seem to be maintained.
+
+.. change::
+ :tags: postgresql, removed
+ :tickets: 7258
+
+    Removed support for multiple deprecated drivers:
+
+    - pypostgresql for PostgreSQL.
+    - pygresql for PostgreSQL. This is available as an
+      external driver at https://github.com/PyGreSQL
+
+ Please switch to one of the supported drivers or to the external
+ version of the same driver.
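At the URL level these removals amount to a driver-name swap; a sketch with
placeholder credentials, mirroring the ``--dbs`` listing at the top::

    from sqlalchemy import create_engine

    # pyodbc replaces the removed mxODBC driver for SQL Server
    mssql_engine = create_engine(
        "mssql+pyodbc://scott:tiger@mssql2017:1433/test"
        "?driver=ODBC+Driver+13+for+SQL+Server"
    )

    # psycopg2 is one of the supported replacements for pypostgresql / pygresql
    pg_engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")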
--- /dev/null
+.. change::
+ :tags: firebird, removed
+ :tickets: 7258
+
+ Removed the "firebird" internal dialect that was deprecated in previous
+ SQLAlchemy versions. Third party dialect support is available.
+
+ .. seealso::
+
+ :ref:`external_toplevel`
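Once installed, the third-party package registers itself for the same URL
scheme via an entry point, so existing URLs keep working; a sketch assuming
the ``sqlalchemy-firebird`` package named in the 1.4 deprecation warning::

    from sqlalchemy import create_engine

    # pip install sqlalchemy-firebird
    engine = create_engine(
        "firebird://sysdba:masterkey@localhost//path/to/foo.fdb"
    )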
--- /dev/null
+.. change::
+ :tags: sybase, removed
+ :tickets: 7258
+
+ Removed the "sybase" internal dialect that was deprecated in previous
+ SQLAlchemy versions. Third party dialect support is available.
+
+ .. seealso::
+
+ :ref:`external_toplevel`
\ No newline at end of file
"sqlite",
"mssql",
"oracle",
- "firebird",
]
# tags to sort on inside of sections
changelog_inner_tag_sort = [
+++ /dev/null
-.. _firebird_toplevel:
-
-Firebird
-========
-
-.. automodule:: sqlalchemy.dialects.firebird.base
-
-fdb
----
-
-.. automodule:: sqlalchemy.dialects.firebird.fdb
-
-kinterbasdb
------------
-
-.. automodule:: sqlalchemy.dialects.firebird.kinterbasdb
Pull requests with associated issues may be accepted to continue supporting
older versions; these are reviewed on a case-by-case basis.
-
-Deprecated, no longer supported dialects
-----------------------------------------
-
-The following dialects have implementations within SQLAlchemy, but they are not
-part of continuous integration testing nor are they actively developed.
-These dialects are deprecated and will be removed in future major releases.
-
-.. toctree::
- :maxdepth: 1
- :glob:
-
- firebird
- sybase
-
-Note that both of these dialects now have third-party implementations that
-are maintained separately. See the following list.
-
.. _external_toplevel:
External Dialects
------
.. automodule:: sqlalchemy.dialects.mssql.pyodbc
-mxODBC
-------
-.. automodule:: sqlalchemy.dialects.mssql.mxodbc
-
pymssql
-------
.. automodule:: sqlalchemy.dialects.mssql.pymssql
.. automodule:: sqlalchemy.dialects.mysql.cymysql
-OurSQL
-------
-
-.. automodule:: sqlalchemy.dialects.mysql.oursql
-
pyodbc
------
------------
.. automodule:: sqlalchemy.dialects.postgresql.psycopg2cffi
-
-py-postgresql
--------------
-
-.. automodule:: sqlalchemy.dialects.postgresql.pypostgresql
-
-.. _dialect-postgresql-pygresql:
-
-pygresql
---------
-
-.. automodule:: sqlalchemy.dialects.postgresql.pygresql
-
+++ /dev/null
-.. _sybase_toplevel:
-
-Sybase
-======
-
-.. automodule:: sqlalchemy.dialects.sybase.base
-
-python-sybase
--------------
-
-.. automodule:: sqlalchemy.dialects.sybase.pysybase
-
-pyodbc
-------
-
-.. automodule:: sqlalchemy.dialects.sybase.pyodbc
-
-mxodbc
-------
-
-.. automodule:: sqlalchemy.dialects.sybase.mxodbc
-
+++ /dev/null
-# connectors/mxodbc.py
-# Copyright (C) 2005-2021 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-
-"""
-Provide a SQLAlchemy connector for the eGenix mxODBC commercial
-Python adapter for ODBC. This is not a free product, but eGenix
-provides SQLAlchemy with a license for use in continuous integration
-testing.
-
-This has been tested for use with mxODBC 3.1.2 on SQL Server 2005
-and 2008, using the SQL Server Native driver. However, it is
-possible for this to be used on other database platforms.
-
-For more info on mxODBC, see https://www.egenix.com/
-
-.. deprecated:: 1.4 The mxODBC DBAPI is deprecated and will be removed
- in a future version. Please use one of the supported DBAPIs to
- connect to mssql.
-
-"""
-
-import re
-import sys
-import warnings
-
-from . import Connector
-from ..util import warn_deprecated
-
-
-class MxODBCConnector(Connector):
- driver = "mxodbc"
-
- supports_sane_multi_rowcount = False
- supports_unicode_statements = True
- supports_unicode_binds = True
-
- supports_native_decimal = True
-
- @classmethod
- def dbapi(cls):
- # this classmethod will normally be replaced by an instance
- # attribute of the same name, so this is normally only called once.
- cls._load_mx_exceptions()
- platform = sys.platform
- if platform == "win32":
- from mx.ODBC import Windows as Module
- # this can be the string "linux2", and possibly others
- elif "linux" in platform:
- from mx.ODBC import unixODBC as Module
- elif platform == "darwin":
- from mx.ODBC import iODBC as Module
- else:
- raise ImportError("Unrecognized platform for mxODBC import")
-
- warn_deprecated(
-            "The mxODBC DBAPI is deprecated and will be removed"
-            " in a future version. Please use one of the supported DBAPIs"
-            " to connect to mssql.",
- version="1.4",
- )
- return Module
-
- @classmethod
- def _load_mx_exceptions(cls):
- """Import mxODBC exception classes into the module namespace,
- as if they had been imported normally. This is done here
- to avoid requiring all SQLAlchemy users to install mxODBC.
- """
- global InterfaceError, ProgrammingError
- from mx.ODBC import InterfaceError
- from mx.ODBC import ProgrammingError
-
- def on_connect(self):
- def connect(conn):
- conn.stringformat = self.dbapi.MIXED_STRINGFORMAT
- conn.datetimeformat = self.dbapi.PYDATETIME_DATETIMEFORMAT
- conn.decimalformat = self.dbapi.DECIMAL_DECIMALFORMAT
- conn.errorhandler = self._error_handler()
-
- return connect
-
- def _error_handler(self):
- """Return a handler that adjusts mxODBC's raised Warnings to
- emit Python standard warnings.
- """
- from mx.ODBC.Error import Warning as MxOdbcWarning
-
- def error_handler(connection, cursor, errorclass, errorvalue):
- if issubclass(errorclass, MxOdbcWarning):
- errorclass.__bases__ = (Warning,)
- warnings.warn(
- message=str(errorvalue), category=errorclass, stacklevel=2
- )
- else:
- raise errorclass(errorvalue)
-
- return error_handler
-
- def create_connect_args(self, url):
- r"""Return a tuple of \*args, \**kwargs for creating a connection.
-
- The mxODBC 3.x connection constructor looks like this:
-
- connect(dsn, user='', password='',
- clear_auto_commit=1, errorhandler=None)
-
- This method translates the values in the provided URI
- into args and kwargs needed to instantiate an mxODBC Connection.
-
- The arg 'errorhandler' is not used by SQLAlchemy and will
- not be populated.
-
- """
- opts = url.translate_connect_args(username="user")
- opts.update(url.query)
- args = opts.pop("host")
- opts.pop("port", None)
- opts.pop("database", None)
- return (args,), opts
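For illustration, tracing the method above with a DSN-style URL (all values
are placeholders)::

    from sqlalchemy.engine import make_url

    url = make_url("mssql+mxodbc://scott:tiger@mydsn")
    # translate_connect_args(username="user") renames username to "user";
    # the host becomes the positional DSN argument, port/database are dropped:
    #     (("mydsn",), {"user": "scott", "password": "tiger"})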
-
- def is_disconnect(self, e, connection, cursor):
- # TODO: eGenix recommends checking connection.closed here
- # Does that detect dropped connections ?
- if isinstance(e, self.dbapi.ProgrammingError):
- return "connection already closed" in str(e)
- elif isinstance(e, self.dbapi.Error):
- return "[08S01]" in str(e)
- else:
- return False
-
- def _get_server_version_info(self, connection):
- # eGenix suggests using conn.dbms_version instead
- # of what we're doing here
- dbapi_con = connection.connection
- version = []
- r = re.compile(r"[.\-]")
- # 18 == pyodbc.SQL_DBMS_VER
- for n in r.split(dbapi_con.getinfo(18)[1]):
- try:
- version.append(int(n))
- except ValueError:
- version.append(n)
- return tuple(version)
-
- def _get_direct(self, context):
- if context:
- native_odbc_execute = context.execution_options.get(
- "native_odbc_execute", "auto"
- )
-            # default to direct=True in all cases; this is more generally
-            # compatible, especially with SQL Server
- return False if native_odbc_execute is True else True
- else:
- return True
-
- def do_executemany(self, cursor, statement, parameters, context=None):
- cursor.executemany(
- statement, parameters, direct=self._get_direct(context)
- )
-
- def do_execute(self, cursor, statement, parameters, context=None):
- cursor.execute(statement, parameters, direct=self._get_direct(context))
+++ /dev/null
-# databases/__init__.py
-# Copyright (C) 2005-2021 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-
-"""Include imports from the sqlalchemy.dialects package for backwards
-compatibility with pre 0.6 versions.
-
-"""
-from ..dialects.firebird import base as firebird
-from ..dialects.mssql import base as mssql
-from ..dialects.mysql import base as mysql
-from ..dialects.oracle import base as oracle
-from ..dialects.postgresql import base as postgresql
-from ..dialects.sqlite import base as sqlite
-from ..dialects.sybase import base as sybase
-from ..util import warn_deprecated_20
-
-postgres = postgresql
-
-
-__all__ = (
- "firebird",
- "mssql",
- "mysql",
- "postgresql",
- "sqlite",
- "oracle",
- "sybase",
-)
-
-
-warn_deprecated_20(
-    "The `databases` package is deprecated and will be removed in v2.0 "
- "of sqlalchemy. Use the `dialects` package instead."
-)
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php
-__all__ = (
- "firebird",
- "mssql",
- "mysql",
- "oracle",
- "postgresql",
- "sqlite",
- "sybase",
-)
+__all__ = ("mssql", "mysql", "oracle", "postgresql", "sqlite")
from .. import util
driver = "base"
try:
- if dialect == "firebird":
- try:
- module = __import__("sqlalchemy_firebird")
- except ImportError:
- module = __import__("sqlalchemy.dialects.firebird").dialects
- module = getattr(module, dialect)
- elif dialect == "sybase":
- try:
- module = __import__("sqlalchemy_sybase")
- except ImportError:
- module = __import__("sqlalchemy.dialects.sybase").dialects
- module = getattr(module, dialect)
- elif dialect == "mariadb":
+ if dialect == "mariadb":
# it's "OK" for us to hardcode here since _auto_fn is already
# hardcoded. if mysql / mariadb etc were third party dialects
# they would just publish all the entrypoints, which would actually
+++ /dev/null
-# firebird/__init__.py
-# Copyright (C) 2005-2021 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-
-from sqlalchemy.dialects.firebird.base import BIGINT
-from sqlalchemy.dialects.firebird.base import BLOB
-from sqlalchemy.dialects.firebird.base import CHAR
-from sqlalchemy.dialects.firebird.base import DATE
-from sqlalchemy.dialects.firebird.base import FLOAT
-from sqlalchemy.dialects.firebird.base import NUMERIC
-from sqlalchemy.dialects.firebird.base import SMALLINT
-from sqlalchemy.dialects.firebird.base import TEXT
-from sqlalchemy.dialects.firebird.base import TIME
-from sqlalchemy.dialects.firebird.base import TIMESTAMP
-from sqlalchemy.dialects.firebird.base import VARCHAR
-from . import base # noqa
-from . import fdb # noqa
-from . import kinterbasdb # noqa
-
-
-base.dialect = dialect = fdb.dialect
-
-__all__ = (
- "SMALLINT",
- "BIGINT",
-    "FLOAT",
-    "DATE",
-    "TIME",
-    "TEXT",
-    "NUMERIC",
- "TIMESTAMP",
- "VARCHAR",
- "CHAR",
- "BLOB",
- "dialect",
-)
+++ /dev/null
-# firebird/base.py
-# Copyright (C) 2005-2021 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-
-r"""
-
-.. dialect:: firebird
- :name: Firebird
-
-.. note::
-
- The Firebird dialect within SQLAlchemy **is not currently supported**.
- It is not tested within continuous integration and is likely to have
- many issues and caveats not currently handled. Consider using the
- `external dialect <https://github.com/pauldex/sqlalchemy-firebird>`_
- instead.
-
-.. deprecated:: 1.4 The internal Firebird dialect is deprecated and will be
- removed in a future version. Use the external dialect.
-
-Firebird Dialects
------------------
-
-Firebird offers two distinct dialects_ (not to be confused with a
-SQLAlchemy ``Dialect``):
-
-dialect 1
- This is the old syntax and behaviour, inherited from Interbase pre-6.0.
-
-dialect 3
- This is the newer and supported syntax, introduced in Interbase 6.0.
-
-The SQLAlchemy Firebird dialect detects these versions and
-adjusts its representation of SQL accordingly. However,
-support for dialect 1 is not well tested and probably has
-incompatibilities.
-
-Locking Behavior
-----------------
-
-Firebird locks tables aggressively. For this reason, a DROP TABLE may
-hang until other transactions are released. SQLAlchemy does its best
-to release transactions as quickly as possible. The most common cause
-of hanging transactions is a non-fully consumed result set, i.e.::
-
- result = engine.execute(text("select * from table"))
- row = result.fetchone()
- return
-
-Where above, the ``CursorResult`` has not been fully consumed. The
-connection will be returned to the pool and the transactional state
-rolled back once the Python garbage collector reclaims the objects
-which hold onto the connection, which often occurs asynchronously.
-The above use case can be alleviated by calling ``first()`` on the
-``CursorResult`` which will fetch the first row and immediately close
-all remaining cursor/connection resources.
-
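A short sketch of the recommended pattern, with a placeholder URL and table
name::

    from sqlalchemy import create_engine, text

    engine = create_engine("firebird://sysdba:masterkey@localhost//path/db.fdb")

    with engine.connect() as conn:
        # first() fetches one row and closes the cursor immediately,
        # so no lingering transaction blocks a later DROP TABLE
        row = conn.execute(text("select * from some_table")).first()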
-RETURNING support
------------------
-
-Firebird 2.0 supports returning a result set from inserts, and 2.1
-extends that to deletes and updates. This is generically exposed by
-the SQLAlchemy ``returning()`` method, such as::
-
- # INSERT..RETURNING
- result = table.insert().returning(table.c.col1, table.c.col2).\
- values(name='foo')
- print(result.fetchall())
-
- # UPDATE..RETURNING
- raises = empl.update().returning(empl.c.id, empl.c.salary).\
- where(empl.c.sales>100).\
- values(dict(salary=empl.c.salary * 1.1))
- print(raises.fetchall())
-
-
-.. _dialects: https://mc-computing.com/Databases/Firebird/SQL_Dialect.html
-"""
-
-import datetime
-
-from sqlalchemy import exc
-from sqlalchemy import sql
-from sqlalchemy import types as sqltypes
-from sqlalchemy import util
-from sqlalchemy.engine import default
-from sqlalchemy.engine import reflection
-from sqlalchemy.sql import compiler
-from sqlalchemy.sql import expression
-from sqlalchemy.types import BIGINT
-from sqlalchemy.types import BLOB
-from sqlalchemy.types import DATE
-from sqlalchemy.types import FLOAT
-from sqlalchemy.types import INTEGER
-from sqlalchemy.types import Integer
-from sqlalchemy.types import NUMERIC
-from sqlalchemy.types import SMALLINT
-from sqlalchemy.types import TEXT
-from sqlalchemy.types import TIME
-from sqlalchemy.types import TIMESTAMP
-
-
-RESERVED_WORDS = set(
- [
- "active",
- "add",
- "admin",
- "after",
- "all",
- "alter",
- "and",
- "any",
- "as",
- "asc",
- "ascending",
- "at",
- "auto",
- "avg",
- "before",
- "begin",
- "between",
- "bigint",
- "bit_length",
- "blob",
- "both",
- "by",
- "case",
- "cast",
- "char",
- "character",
- "character_length",
- "char_length",
- "check",
- "close",
- "collate",
- "column",
- "commit",
- "committed",
- "computed",
- "conditional",
- "connect",
- "constraint",
- "containing",
- "count",
- "create",
- "cross",
- "cstring",
- "current",
- "current_connection",
- "current_date",
- "current_role",
- "current_time",
- "current_timestamp",
- "current_transaction",
- "current_user",
- "cursor",
- "database",
- "date",
- "day",
- "dec",
- "decimal",
- "declare",
- "default",
- "delete",
- "desc",
- "descending",
- "disconnect",
- "distinct",
- "do",
- "domain",
- "double",
- "drop",
- "else",
- "end",
- "entry_point",
- "escape",
- "exception",
- "execute",
- "exists",
- "exit",
- "external",
- "extract",
- "fetch",
- "file",
- "filter",
- "float",
- "for",
- "foreign",
- "from",
- "full",
- "function",
- "gdscode",
- "generator",
- "gen_id",
- "global",
- "grant",
- "group",
- "having",
- "hour",
- "if",
- "in",
- "inactive",
- "index",
- "inner",
- "input_type",
- "insensitive",
- "insert",
- "int",
- "integer",
- "into",
- "is",
- "isolation",
- "join",
- "key",
- "leading",
- "left",
- "length",
- "level",
- "like",
- "long",
- "lower",
- "manual",
- "max",
- "maximum_segment",
- "merge",
- "min",
- "minute",
- "module_name",
- "month",
- "names",
- "national",
- "natural",
- "nchar",
- "no",
- "not",
- "null",
- "numeric",
- "octet_length",
- "of",
- "on",
- "only",
- "open",
- "option",
- "or",
- "order",
- "outer",
- "output_type",
- "overflow",
- "page",
- "pages",
- "page_size",
- "parameter",
- "password",
- "plan",
- "position",
- "post_event",
- "precision",
- "primary",
- "privileges",
- "procedure",
- "protected",
- "rdb$db_key",
- "read",
- "real",
- "record_version",
- "recreate",
- "recursive",
- "references",
- "release",
- "reserv",
- "reserving",
- "retain",
- "returning_values",
- "returns",
- "revoke",
- "right",
- "rollback",
- "rows",
- "row_count",
- "savepoint",
- "schema",
- "second",
- "segment",
- "select",
- "sensitive",
- "set",
- "shadow",
- "shared",
- "singular",
- "size",
- "smallint",
- "snapshot",
- "some",
- "sort",
- "sqlcode",
- "stability",
- "start",
- "starting",
- "starts",
- "statistics",
- "sub_type",
- "sum",
- "suspend",
- "table",
- "then",
- "time",
- "timestamp",
- "to",
- "trailing",
- "transaction",
- "trigger",
- "trim",
- "uncommitted",
- "union",
- "unique",
- "update",
- "upper",
- "user",
- "using",
- "value",
- "values",
- "varchar",
- "variable",
- "varying",
- "view",
- "wait",
- "when",
- "where",
- "while",
- "with",
- "work",
- "write",
- "year",
- ]
-)
-
-
-class _StringType(sqltypes.String):
- """Base for Firebird string types."""
-
- def __init__(self, charset=None, **kw):
- self.charset = charset
- super(_StringType, self).__init__(**kw)
-
-
-class VARCHAR(_StringType, sqltypes.VARCHAR):
- """Firebird VARCHAR type"""
-
- __visit_name__ = "VARCHAR"
-
- def __init__(self, length=None, **kwargs):
- super(VARCHAR, self).__init__(length=length, **kwargs)
-
-
-class CHAR(_StringType, sqltypes.CHAR):
- """Firebird CHAR type"""
-
- __visit_name__ = "CHAR"
-
- def __init__(self, length=None, **kwargs):
- super(CHAR, self).__init__(length=length, **kwargs)
-
-
-class _FBDateTime(sqltypes.DateTime):
- def bind_processor(self, dialect):
- def process(value):
- if type(value) == datetime.date:
- return datetime.datetime(value.year, value.month, value.day)
- else:
- return value
-
- return process
-
-
-colspecs = {sqltypes.DateTime: _FBDateTime}
-
-ischema_names = {
- "SHORT": SMALLINT,
- "LONG": INTEGER,
- "QUAD": FLOAT,
- "FLOAT": FLOAT,
- "DATE": DATE,
- "TIME": TIME,
- "TEXT": TEXT,
- "INT64": BIGINT,
- "DOUBLE": FLOAT,
- "TIMESTAMP": TIMESTAMP,
- "VARYING": VARCHAR,
- "CSTRING": CHAR,
- "BLOB": BLOB,
-}
-
-
-# TODO: date conversion types (should be implemented as _FBDateTime,
-# _FBDate, etc. as bind/result functionality is required)
-
-
-class FBTypeCompiler(compiler.GenericTypeCompiler):
- def visit_boolean(self, type_, **kw):
- return self.visit_SMALLINT(type_, **kw)
-
- def visit_datetime(self, type_, **kw):
- return self.visit_TIMESTAMP(type_, **kw)
-
- def visit_TEXT(self, type_, **kw):
- return "BLOB SUB_TYPE 1"
-
- def visit_BLOB(self, type_, **kw):
- return "BLOB SUB_TYPE 0"
-
- def _extend_string(self, type_, basic):
- charset = getattr(type_, "charset", None)
- if charset is None:
- return basic
- else:
- return "%s CHARACTER SET %s" % (basic, charset)
-
- def visit_CHAR(self, type_, **kw):
- basic = super(FBTypeCompiler, self).visit_CHAR(type_, **kw)
- return self._extend_string(type_, basic)
-
- def visit_VARCHAR(self, type_, **kw):
- if not type_.length:
- raise exc.CompileError(
- "VARCHAR requires a length on dialect %s" % self.dialect.name
- )
- basic = super(FBTypeCompiler, self).visit_VARCHAR(type_, **kw)
- return self._extend_string(type_, basic)
-
-
-class FBCompiler(sql.compiler.SQLCompiler):
- """Firebird specific idiosyncrasies"""
-
- ansi_bind_rules = True
-
-    # def visit_contains_op_binary(self, binary, operator, **kw):
-    # can't use CONTAINING because it's case insensitive.
-
-    # def visit_not_contains_op_binary(self, binary, operator, **kw):
-    # can't use NOT CONTAINING because it's case insensitive.
-
- def visit_now_func(self, fn, **kw):
- return "CURRENT_TIMESTAMP"
-
- def visit_startswith_op_binary(self, binary, operator, **kw):
- return "%s STARTING WITH %s" % (
- binary.left._compiler_dispatch(self, **kw),
- binary.right._compiler_dispatch(self, **kw),
- )
-
- def visit_not_startswith_op_binary(self, binary, operator, **kw):
- return "%s NOT STARTING WITH %s" % (
- binary.left._compiler_dispatch(self, **kw),
- binary.right._compiler_dispatch(self, **kw),
- )
-
- def visit_mod_binary(self, binary, operator, **kw):
- return "mod(%s, %s)" % (
- self.process(binary.left, **kw),
- self.process(binary.right, **kw),
- )
-
- def visit_alias(self, alias, asfrom=False, **kwargs):
- if self.dialect._version_two:
- return super(FBCompiler, self).visit_alias(
- alias, asfrom=asfrom, **kwargs
- )
- else:
- # Override to not use the AS keyword which FB 1.5 does not like
- if asfrom:
- alias_name = (
- isinstance(alias.name, expression._truncated_label)
- and self._truncated_identifier("alias", alias.name)
- or alias.name
- )
-
- return (
- self.process(alias.element, asfrom=asfrom, **kwargs)
- + " "
- + self.preparer.format_alias(alias, alias_name)
- )
- else:
- return self.process(alias.element, **kwargs)
-
- def visit_substring_func(self, func, **kw):
- s = self.process(func.clauses.clauses[0])
- start = self.process(func.clauses.clauses[1])
- if len(func.clauses.clauses) > 2:
- length = self.process(func.clauses.clauses[2])
- return "SUBSTRING(%s FROM %s FOR %s)" % (s, start, length)
- else:
- return "SUBSTRING(%s FROM %s)" % (s, start)
-
- def visit_length_func(self, function, **kw):
- if self.dialect._version_two:
- return "char_length" + self.function_argspec(function)
- else:
- return "strlen" + self.function_argspec(function)
-
- visit_char_length_func = visit_length_func
-
- def function_argspec(self, func, **kw):
- # TODO: this probably will need to be
- # narrowed to a fixed list, some no-arg functions
- # may require parens - see similar example in the oracle
- # dialect
- if func.clauses is not None and len(func.clauses):
- return self.process(func.clause_expr, **kw)
- else:
- return ""
-
- def default_from(self):
- return " FROM rdb$database"
-
- def visit_sequence(self, seq, **kw):
- return "gen_id(%s, 1)" % self.preparer.format_sequence(seq)
-
- def get_select_precolumns(self, select, **kw):
-        """Called when building a ``SELECT`` statement, at the position just
-        before the column list; Firebird puts the limit and offset right
-        after the ``SELECT`` keyword.
-        """
-
- result = ""
- if select._limit_clause is not None:
- result += "FIRST %s " % self.process(select._limit_clause, **kw)
- if select._offset_clause is not None:
- result += "SKIP %s " % self.process(select._offset_clause, **kw)
- result += super(FBCompiler, self).get_select_precolumns(select, **kw)
- return result
-
- def limit_clause(self, select, **kw):
- """Already taken care of in the `get_select_precolumns` method."""
-
- return ""
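Taken together, the two methods above moved LIMIT/OFFSET in front of the
column list; a sketch of the rendering (table and values are placeholders)::

    from sqlalchemy import column, select, table

    t = table("t", column("x"))

    # under this (now removed) dialect, with literal binds this compiled to:
    #     SELECT FIRST 5 SKIP 10 t.x FROM t
    stmt = select(t.c.x).limit(5).offset(10)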
-
- def returning_clause(self, stmt, returning_cols):
- columns = [
- self._label_returning_column(stmt, c)
- for c in expression._select_iterables(returning_cols)
- ]
-
- return "RETURNING " + ", ".join(columns)
-
-
-class FBDDLCompiler(sql.compiler.DDLCompiler):
- """Firebird syntactic idiosyncrasies"""
-
- def visit_create_sequence(self, create):
- """Generate a ``CREATE GENERATOR`` statement for the sequence."""
-
- # no syntax for these
- # https://www.firebirdsql.org/manual/generatorguide-sqlsyntax.html
- if create.element.start is not None:
- raise NotImplementedError(
- "Firebird SEQUENCE doesn't support START WITH"
- )
- if create.element.increment is not None:
- raise NotImplementedError(
- "Firebird SEQUENCE doesn't support INCREMENT BY"
- )
-
- if self.dialect._version_two:
- return "CREATE SEQUENCE %s" % self.preparer.format_sequence(
- create.element
- )
- else:
- return "CREATE GENERATOR %s" % self.preparer.format_sequence(
- create.element
- )
-
- def visit_drop_sequence(self, drop):
- """Generate a ``DROP GENERATOR`` statement for the sequence."""
-
- if self.dialect._version_two:
- return "DROP SEQUENCE %s" % self.preparer.format_sequence(
- drop.element
- )
- else:
- return "DROP GENERATOR %s" % self.preparer.format_sequence(
- drop.element
- )
-
- def visit_computed_column(self, generated):
- if generated.persisted is not None:
- raise exc.CompileError(
- "Firebird computed columns do not support a persistence "
- "method setting; set the 'persisted' flag to None for "
- "Firebird support."
- )
- return "GENERATED ALWAYS AS (%s)" % self.sql_compiler.process(
- generated.sqltext, include_table=False, literal_binds=True
- )
-
-
-class FBIdentifierPreparer(sql.compiler.IdentifierPreparer):
- """Install Firebird specific reserved words."""
-
- reserved_words = RESERVED_WORDS
- illegal_initial_characters = compiler.ILLEGAL_INITIAL_CHARACTERS.union(
- ["_"]
- )
-
- def __init__(self, dialect):
- super(FBIdentifierPreparer, self).__init__(dialect, omit_schema=True)
-
-
-class FBExecutionContext(default.DefaultExecutionContext):
- def fire_sequence(self, seq, type_):
- """Get the next value from the sequence using ``gen_id()``."""
-
- return self._execute_scalar(
- "SELECT gen_id(%s, 1) FROM rdb$database"
- % self.identifier_preparer.format_sequence(seq),
- type_,
- )
-
-
-class FBDialect(default.DefaultDialect):
- """Firebird dialect"""
-
- name = "firebird"
- supports_statement_cache = True
-
- max_identifier_length = 31
-
- supports_sequences = True
- sequences_optional = False
- supports_default_values = True
- postfetch_lastrowid = False
-
- supports_native_boolean = False
-
- requires_name_normalize = True
- supports_empty_insert = False
-
- statement_compiler = FBCompiler
- ddl_compiler = FBDDLCompiler
- preparer = FBIdentifierPreparer
- type_compiler = FBTypeCompiler
- execution_ctx_cls = FBExecutionContext
-
- colspecs = colspecs
- ischema_names = ischema_names
-
- construct_arguments = []
-
-    # defaults to dialect ver. 3;
-    # will be autodetected upon
-    # first connect
- _version_two = True
-
- def __init__(self, *args, **kwargs):
- util.warn_deprecated(
- "The firebird dialect is deprecated and will be removed "
- "in a future version. This dialect is superseded by the external "
- "dialect https://github.com/pauldex/sqlalchemy-firebird.",
- version="1.4",
- )
- super(FBDialect, self).__init__(*args, **kwargs)
-
- def initialize(self, connection):
- super(FBDialect, self).initialize(connection)
- self._version_two = (
- "firebird" in self.server_version_info
- and self.server_version_info >= (2,)
- ) or (
- "interbase" in self.server_version_info
- and self.server_version_info >= (6,)
- )
-
- if not self._version_two:
- # TODO: whatever other pre < 2.0 stuff goes here
- self.ischema_names = ischema_names.copy()
- self.ischema_names["TIMESTAMP"] = sqltypes.DATE
- self.colspecs = {sqltypes.DateTime: sqltypes.DATE}
-
- self.implicit_returning = self._version_two and self.__dict__.get(
- "implicit_returning", True
- )
-
- def has_table(self, connection, table_name, schema=None):
- """Return ``True`` if the given table exists, ignoring
- the `schema`."""
- self._ensure_has_table_connection(connection)
-
- tblqry = """
- SELECT 1 AS has_table FROM rdb$database
- WHERE EXISTS (SELECT rdb$relation_name
- FROM rdb$relations
- WHERE rdb$relation_name=?)
- """
- c = connection.exec_driver_sql(
- tblqry, [self.denormalize_name(table_name)]
- )
- return c.first() is not None
-
- def has_sequence(self, connection, sequence_name, schema=None):
- """Return ``True`` if the given sequence (generator) exists."""
-
- genqry = """
- SELECT 1 AS has_sequence FROM rdb$database
- WHERE EXISTS (SELECT rdb$generator_name
- FROM rdb$generators
- WHERE rdb$generator_name=?)
- """
- c = connection.exec_driver_sql(
- genqry, [self.denormalize_name(sequence_name)]
- )
- return c.first() is not None
-
- @reflection.cache
- def get_table_names(self, connection, schema=None, **kw):
- # there are two queries commonly mentioned for this.
- # this one, using view_blr, is at the Firebird FAQ among other places:
- # https://www.firebirdfaq.org/faq174/
- s = """
- select rdb$relation_name
- from rdb$relations
- where rdb$view_blr is null
- and (rdb$system_flag is null or rdb$system_flag = 0);
- """
-
- # the other query is this one. It's not clear if there's really
- # any difference between these two. This link:
- # https://www.alberton.info/firebird_sql_meta_info.html#.Ur3vXfZGni8
- # states them as interchangeable. Some discussion at [ticket:2898]
- # SELECT DISTINCT rdb$relation_name
- # FROM rdb$relation_fields
- # WHERE rdb$system_flag=0 AND rdb$view_context IS NULL
-
- return [
- self.normalize_name(row[0])
- for row in connection.exec_driver_sql(s)
- ]
-
- @reflection.cache
- def get_view_names(self, connection, schema=None, **kw):
- # see https://www.firebirdfaq.org/faq174/
- s = """
- select rdb$relation_name
- from rdb$relations
- where rdb$view_blr is not null
- and (rdb$system_flag is null or rdb$system_flag = 0);
- """
- return [
- self.normalize_name(row[0])
- for row in connection.exec_driver_sql(s)
- ]
-
- @reflection.cache
- def get_view_definition(self, connection, view_name, schema=None, **kw):
- qry = """
- SELECT rdb$view_source AS view_source
- FROM rdb$relations
- WHERE rdb$relation_name=?
- """
- rp = connection.exec_driver_sql(
- qry, [self.denormalize_name(view_name)]
- )
- row = rp.first()
- if row:
- return row["view_source"]
- else:
- return None
-
- @reflection.cache
- def get_pk_constraint(self, connection, table_name, schema=None, **kw):
- # Query to extract the PK/FK constrained fields of the given table
- keyqry = """
- SELECT se.rdb$field_name AS fname
- FROM rdb$relation_constraints rc
- JOIN rdb$index_segments se ON rc.rdb$index_name=se.rdb$index_name
- WHERE rc.rdb$constraint_type=? AND rc.rdb$relation_name=?
- """
- tablename = self.denormalize_name(table_name)
- # get primary key fields
- c = connection.exec_driver_sql(keyqry, ["PRIMARY KEY", tablename])
- pkfields = [self.normalize_name(r["fname"]) for r in c.fetchall()]
- return {"constrained_columns": pkfields, "name": None}
-
- @reflection.cache
- def get_column_sequence(
- self, connection, table_name, column_name, schema=None, **kw
- ):
- tablename = self.denormalize_name(table_name)
- colname = self.denormalize_name(column_name)
- # Heuristic-query to determine the generator associated to a PK field
- genqry = """
- SELECT trigdep.rdb$depended_on_name AS fgenerator
- FROM rdb$dependencies tabdep
- JOIN rdb$dependencies trigdep
- ON tabdep.rdb$dependent_name=trigdep.rdb$dependent_name
- AND trigdep.rdb$depended_on_type=14
- AND trigdep.rdb$dependent_type=2
- JOIN rdb$triggers trig ON
- trig.rdb$trigger_name=tabdep.rdb$dependent_name
- WHERE tabdep.rdb$depended_on_name=?
- AND tabdep.rdb$depended_on_type=0
- AND trig.rdb$trigger_type=1
- AND tabdep.rdb$field_name=?
- AND (SELECT count(*)
- FROM rdb$dependencies trigdep2
- WHERE trigdep2.rdb$dependent_name = trigdep.rdb$dependent_name) = 2
- """
- genr = connection.exec_driver_sql(genqry, [tablename, colname]).first()
- if genr is not None:
- return dict(name=self.normalize_name(genr["fgenerator"]))
-
- @reflection.cache
- def get_columns(self, connection, table_name, schema=None, **kw):
- # Query to extract the details of all the fields of the given table
- tblqry = """
- SELECT r.rdb$field_name AS fname,
- r.rdb$null_flag AS null_flag,
- t.rdb$type_name AS ftype,
- f.rdb$field_sub_type AS stype,
- f.rdb$field_length/
- COALESCE(cs.rdb$bytes_per_character,1) AS flen,
- f.rdb$field_precision AS fprec,
- f.rdb$field_scale AS fscale,
- COALESCE(r.rdb$default_source,
- f.rdb$default_source) AS fdefault
- FROM rdb$relation_fields r
- JOIN rdb$fields f ON r.rdb$field_source=f.rdb$field_name
- JOIN rdb$types t
- ON t.rdb$type=f.rdb$field_type AND
- t.rdb$field_name='RDB$FIELD_TYPE'
- LEFT JOIN rdb$character_sets cs ON
- f.rdb$character_set_id=cs.rdb$character_set_id
- WHERE f.rdb$system_flag=0 AND r.rdb$relation_name=?
- ORDER BY r.rdb$field_position
- """
- # get the PK, used to determine the eventual associated sequence
- pk_constraint = self.get_pk_constraint(connection, table_name)
- pkey_cols = pk_constraint["constrained_columns"]
-
- tablename = self.denormalize_name(table_name)
- # get all of the fields for this table
- c = connection.exec_driver_sql(tblqry, [tablename])
- cols = []
- while True:
- row = c.fetchone()
- if row is None:
- break
- name = self.normalize_name(row["fname"])
- orig_colname = row["fname"]
-
- # get the data type
- colspec = row["ftype"].rstrip()
- coltype = self.ischema_names.get(colspec)
- if coltype is None:
- util.warn(
- "Did not recognize type '%s' of column '%s'"
- % (colspec, name)
- )
- coltype = sqltypes.NULLTYPE
- elif issubclass(coltype, Integer) and row["fprec"] != 0:
- coltype = NUMERIC(
- precision=row["fprec"], scale=row["fscale"] * -1
- )
- elif colspec in ("VARYING", "CSTRING"):
- coltype = coltype(row["flen"])
- elif colspec == "TEXT":
- coltype = TEXT(row["flen"])
- elif colspec == "BLOB":
- if row["stype"] == 1:
- coltype = TEXT()
- else:
- coltype = BLOB()
- else:
- coltype = coltype()
-
- # does it have a default value?
- defvalue = None
- if row["fdefault"] is not None:
- # the value comes down as "DEFAULT 'value'": there may be
- # more than one whitespace around the "DEFAULT" keyword
- # and it may also be lower case
- # (see also https://tracker.firebirdsql.org/browse/CORE-356)
- defexpr = row["fdefault"].lstrip()
- assert defexpr[:8].rstrip().upper() == "DEFAULT", (
- "Unrecognized default value: %s" % defexpr
- )
- defvalue = defexpr[8:].strip()
- if defvalue == "NULL":
- # Redundant
- defvalue = None
- col_d = {
- "name": name,
- "type": coltype,
- "nullable": not bool(row["null_flag"]),
- "default": defvalue,
- "autoincrement": "auto",
- }
-
- if orig_colname.lower() == orig_colname:
- col_d["quote"] = True
-
- # if the PK is a single field, try to see if its linked to
- # a sequence thru a trigger
- if len(pkey_cols) == 1 and name == pkey_cols[0]:
- seq_d = self.get_column_sequence(connection, tablename, name)
- if seq_d is not None:
- col_d["sequence"] = seq_d
-
- cols.append(col_d)
- return cols
-
- @reflection.cache
- def get_foreign_keys(self, connection, table_name, schema=None, **kw):
- # Query to extract the details of each UK/FK of the given table
- fkqry = """
- SELECT rc.rdb$constraint_name AS cname,
- cse.rdb$field_name AS fname,
- ix2.rdb$relation_name AS targetrname,
- se.rdb$field_name AS targetfname
- FROM rdb$relation_constraints rc
- JOIN rdb$indices ix1 ON ix1.rdb$index_name=rc.rdb$index_name
- JOIN rdb$indices ix2 ON ix2.rdb$index_name=ix1.rdb$foreign_key
- JOIN rdb$index_segments cse ON
- cse.rdb$index_name=ix1.rdb$index_name
- JOIN rdb$index_segments se
- ON se.rdb$index_name=ix2.rdb$index_name
- AND se.rdb$field_position=cse.rdb$field_position
- WHERE rc.rdb$constraint_type=? AND rc.rdb$relation_name=?
- ORDER BY se.rdb$index_name, se.rdb$field_position
- """
- tablename = self.denormalize_name(table_name)
-
- c = connection.exec_driver_sql(fkqry, ["FOREIGN KEY", tablename])
- fks = util.defaultdict(
- lambda: {
- "name": None,
- "constrained_columns": [],
- "referred_schema": None,
- "referred_table": None,
- "referred_columns": [],
- }
- )
-
- for row in c:
- cname = self.normalize_name(row["cname"])
- fk = fks[cname]
- if not fk["name"]:
- fk["name"] = cname
- fk["referred_table"] = self.normalize_name(row["targetrname"])
- fk["constrained_columns"].append(self.normalize_name(row["fname"]))
- fk["referred_columns"].append(
- self.normalize_name(row["targetfname"])
- )
- return list(fks.values())
-
- @reflection.cache
- def get_indexes(self, connection, table_name, schema=None, **kw):
- qry = """
- SELECT ix.rdb$index_name AS index_name,
- ix.rdb$unique_flag AS unique_flag,
- ic.rdb$field_name AS field_name
- FROM rdb$indices ix
- JOIN rdb$index_segments ic
- ON ix.rdb$index_name=ic.rdb$index_name
- LEFT OUTER JOIN rdb$relation_constraints
- ON rdb$relation_constraints.rdb$index_name =
- ic.rdb$index_name
- WHERE ix.rdb$relation_name=? AND ix.rdb$foreign_key IS NULL
- AND rdb$relation_constraints.rdb$constraint_type IS NULL
- ORDER BY index_name, ic.rdb$field_position
- """
- c = connection.exec_driver_sql(
- qry, [self.denormalize_name(table_name)]
- )
-
- indexes = util.defaultdict(dict)
- for row in c:
- indexrec = indexes[row["index_name"]]
- if "name" not in indexrec:
- indexrec["name"] = self.normalize_name(row["index_name"])
- indexrec["column_names"] = []
- indexrec["unique"] = bool(row["unique_flag"])
-
- indexrec["column_names"].append(
- self.normalize_name(row["field_name"])
- )
-
- return list(indexes.values())
+++ /dev/null
-# firebird/fdb.py
-# Copyright (C) 2005-2021 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-
-"""
-.. dialect:: firebird+fdb
- :name: fdb
-    :dbapi: fdb
- :connectstring: firebird+fdb://user:password@host:port/path/to/db[?key=value&key=value...]
- :url: https://pypi.org/project/fdb/
-
- fdb is a kinterbasdb compatible DBAPI for Firebird.
-
- .. versionchanged:: 0.9 - The fdb dialect is now the default dialect
- under the ``firebird://`` URL space, as ``fdb`` is now the official
- Python driver for Firebird.
-
-Arguments
-----------
-
-The ``fdb`` dialect is based on the
-:mod:`sqlalchemy.dialects.firebird.kinterbasdb` dialect, however does not
-accept every argument that Kinterbasdb does.
-
-* ``enable_rowcount`` - True by default, setting this to False disables
- the usage of "cursor.rowcount" with the
- Kinterbasdb dialect, which SQLAlchemy ordinarily calls upon automatically
- after any UPDATE or DELETE statement. When disabled, SQLAlchemy's
- CursorResult will return -1 for result.rowcount. The rationale here is
- that Kinterbasdb requires a second round trip to the database when
- .rowcount is called - since SQLA's resultproxy automatically closes
- the cursor after a non-result-returning statement, rowcount must be
- called, if at all, before the result object is returned. Additionally,
- cursor.rowcount may not return correct results with older versions
- of Firebird, and setting this flag to False will also cause the
- SQLAlchemy ORM to ignore its usage. The behavior can also be controlled on a
- per-execution basis using the ``enable_rowcount`` option with
- :meth:`_engine.Connection.execution_options`::
-
- conn = engine.connect().execution_options(enable_rowcount=True)
- r = conn.execute(stmt)
- print(r.rowcount)
-
-* ``retaining`` - False by default. Setting this to True will pass the
- ``retaining=True`` keyword argument to the ``.commit()`` and ``.rollback()``
- methods of the DBAPI connection, which can improve performance in some
- situations, but apparently with significant caveats.
- Please read the fdb and/or kinterbasdb DBAPI documentation in order to
- understand the implications of this flag.
-
- .. versionchanged:: 0.9.0 - the ``retaining`` flag defaults to ``False``.
- In 0.8 it defaulted to ``True``.
-
- .. seealso::
-
- https://pythonhosted.org/fdb/usage-guide.html#retaining-transactions
- - information on the "retaining" flag.
-
-""" # noqa
-
-from .kinterbasdb import FBDialect_kinterbasdb
-from ... import util
-
-
-class FBDialect_fdb(FBDialect_kinterbasdb):
- supports_statement_cache = True
-
- def __init__(self, enable_rowcount=True, retaining=False, **kwargs):
- super(FBDialect_fdb, self).__init__(
- enable_rowcount=enable_rowcount, retaining=retaining, **kwargs
- )
-
- @classmethod
- def dbapi(cls):
- return __import__("fdb")
-
- def create_connect_args(self, url):
- opts = url.translate_connect_args(username="user")
- if opts.get("port"):
- opts["host"] = "%s/%s" % (opts["host"], opts["port"])
- del opts["port"]
- opts.update(url.query)
-
- util.coerce_kw_type(opts, "type_conv", int)
-
- return ([], opts)
-
- def _get_server_version_info(self, connection):
- """Get the version of the Firebird server used by a connection.
-
- Returns a tuple of (`major`, `minor`, `build`), three integers
- representing the version of the attached server.
- """
-
-        # This is the simpler approach (the other uses the services API),
-        # which for backward compatibility reasons returns a string like
- # LI-V6.3.3.12981 Firebird 2.0
- # where the first version is a fake one resembling the old
- # Interbase signature.
-
- isc_info_firebird_version = 103
- fbconn = connection.connection
-
- version = fbconn.db_info(isc_info_firebird_version)
-
- return self._parse_version_info(version)
-
-
-dialect = FBDialect_fdb
+++ /dev/null
-# firebird/kinterbasdb.py
-# Copyright (C) 2005-2021 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-
-"""
-.. dialect:: firebird+kinterbasdb
- :name: kinterbasdb
- :dbapi: kinterbasdb
- :connectstring: firebird+kinterbasdb://user:password@host:port/path/to/db[?key=value&key=value...]
- :url: https://firebirdsql.org/index.php?op=devel&sub=python
-
-Arguments
-----------
-
-The Kinterbasdb backend accepts the ``enable_rowcount`` and ``retaining``
-arguments accepted by the :mod:`sqlalchemy.dialects.firebird.fdb` dialect.
-In addition, it also accepts the following:
-
-* ``type_conv`` - select the kind of mapping done on the types: by default
- SQLAlchemy uses 200 with Unicode, datetime and decimal support. See
- the linked documents below for further information.
-
-* ``concurrency_level`` - set the backend policy with regards to threading
- issues: by default SQLAlchemy uses policy 1. See the linked documents
- below for further information.
-
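Both arguments were also accepted on the URL query string, from which the
dialect coerced them to integers (see ``create_connect_args`` below);
placeholder credentials::

    from sqlalchemy import create_engine

    engine = create_engine(
        "firebird+kinterbasdb://user:password@localhost//path/db.fdb"
        "?type_conv=200&concurrency_level=1"
    )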
-.. seealso::
-
- https://sourceforge.net/projects/kinterbasdb
-
- https://kinterbasdb.sourceforge.net/dist_docs/usage.html#adv_param_conv_dynamic_type_translation
-
- https://kinterbasdb.sourceforge.net/dist_docs/usage.html#special_issue_concurrency
-
-""" # noqa
-
-import decimal
-from re import match
-
-from .base import FBDialect
-from .base import FBExecutionContext
-from ... import types as sqltypes
-from ... import util
-
-
-class _kinterbasdb_numeric(object):
- def bind_processor(self, dialect):
- def process(value):
- if isinstance(value, decimal.Decimal):
- return str(value)
- else:
- return value
-
- return process
-
-
-class _FBNumeric_kinterbasdb(_kinterbasdb_numeric, sqltypes.Numeric):
- pass
-
-
-class _FBFloat_kinterbasdb(_kinterbasdb_numeric, sqltypes.Float):
- pass
-
-
-class FBExecutionContext_kinterbasdb(FBExecutionContext):
- @property
- def rowcount(self):
- if self.execution_options.get(
- "enable_rowcount", self.dialect.enable_rowcount
- ):
- return self.cursor.rowcount
- else:
- return -1
-
-
-class FBDialect_kinterbasdb(FBDialect):
- driver = "kinterbasdb"
- supports_statement_cache = True
- supports_sane_rowcount = False
- supports_sane_multi_rowcount = False
- execution_ctx_cls = FBExecutionContext_kinterbasdb
-
- supports_native_decimal = True
-
- colspecs = util.update_copy(
- FBDialect.colspecs,
- {
- sqltypes.Numeric: _FBNumeric_kinterbasdb,
- sqltypes.Float: _FBFloat_kinterbasdb,
- },
- )
-
- def __init__(
- self,
- type_conv=200,
- concurrency_level=1,
- enable_rowcount=True,
- retaining=False,
- **kwargs
- ):
- super(FBDialect_kinterbasdb, self).__init__(**kwargs)
- self.enable_rowcount = enable_rowcount
- self.type_conv = type_conv
- self.concurrency_level = concurrency_level
- self.retaining = retaining
- if enable_rowcount:
- self.supports_sane_rowcount = True
-
- @classmethod
- def dbapi(cls):
- return __import__("kinterbasdb")
-
- def do_execute(self, cursor, statement, parameters, context=None):
-        # kinterbasdb does not accept None, but wants an empty list
- # when there are no arguments.
- cursor.execute(statement, parameters or [])
-
- def do_rollback(self, dbapi_connection):
- dbapi_connection.rollback(self.retaining)
-
- def do_commit(self, dbapi_connection):
- dbapi_connection.commit(self.retaining)
-
- def create_connect_args(self, url):
- opts = url.translate_connect_args(username="user")
- if opts.get("port"):
- opts["host"] = "%s/%s" % (opts["host"], opts["port"])
- del opts["port"]
- opts.update(url.query)
-
- util.coerce_kw_type(opts, "type_conv", int)
-
- type_conv = opts.pop("type_conv", self.type_conv)
- concurrency_level = opts.pop(
- "concurrency_level", self.concurrency_level
- )
-
- if self.dbapi is not None:
- initialized = getattr(self.dbapi, "initialized", None)
- if initialized is None:
- # CVS rev 1.96 changed the name of the attribute:
- # https://kinterbasdb.cvs.sourceforge.net/viewvc/kinterbasdb/
- # Kinterbasdb-3.0/__init__.py?r1=1.95&r2=1.96
- initialized = getattr(self.dbapi, "_initialized", False)
- if not initialized:
- self.dbapi.init(
- type_conv=type_conv, concurrency_level=concurrency_level
- )
- return ([], opts)
-
- def _get_server_version_info(self, connection):
- """Get the version of the Firebird server used by a connection.
-
- Returns a tuple of (`major`, `minor`, `build`), three integers
- representing the version of the attached server.
- """
-
-        # This is the simpler approach (the other uses the services API),
-        # which for backward compatibility reasons returns a string like
- # LI-V6.3.3.12981 Firebird 2.0
- # where the first version is a fake one resembling the old
- # Interbase signature.
-
- fbconn = connection.connection
- version = fbconn.server_version
-
- return self._parse_version_info(version)
-
- def _parse_version_info(self, version):
- m = match(
- r"\w+-V(\d+)\.(\d+)\.(\d+)\.(\d+)( \w+ (\d+)\.(\d+))?", version
- )
- if not m:
- raise AssertionError(
- "Could not determine version from string '%s'" % version
- )
-
-        if m.group(5) is not None:
- return tuple([int(x) for x in m.group(6, 7, 4)] + ["firebird"])
- else:
- return tuple([int(x) for x in m.group(1, 2, 3)] + ["interbase"])
-
- def is_disconnect(self, e, connection, cursor):
- if isinstance(
- e, (self.dbapi.OperationalError, self.dbapi.ProgrammingError)
- ):
- msg = str(e)
- return (
- "Error writing data to the connection" in msg
- or "Unable to complete network request to host" in msg
- or "Invalid connection state" in msg
- or "Invalid cursor state" in msg
- or "connection shutdown" in msg
- )
- else:
- return False
-
-
-dialect = FBDialect_kinterbasdb
# the MIT License: https://www.opensource.org/licenses/mit-license.php
from . import base # noqa
-from . import mxodbc # noqa
from . import pymssql # noqa
from . import pyodbc # noqa
from .base import BIGINT
+++ /dev/null
-# mssql/mxodbc.py
-# Copyright (C) 2005-2021 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-
-"""
-.. dialect:: mssql+mxodbc
- :name: mxODBC
- :dbapi: mxodbc
- :connectstring: mssql+mxodbc://<username>:<password>@<dsnname>
- :url: https://www.egenix.com/
-
-.. deprecated:: 1.4 The mxODBC DBAPI is deprecated and will be removed
- in a future version. Please use one of the supported DBAPIs to
- connect to mssql.
-
-Execution Modes
----------------
-
-mxODBC features two styles of statement execution, using the
-``cursor.execute()`` and ``cursor.executedirect()`` methods (the second being
-an extension to the DBAPI specification). The former makes use of a particular
-API call specific to the SQL Server Native Client ODBC driver known as
-SQLDescribeParam, while the latter does not.
-
-mxODBC apparently only makes repeated use of a single prepared statement
-when SQLDescribeParam is used. The advantage to prepared statement reuse is
-one of performance. The disadvantage is that SQLDescribeParam has a limited
-set of scenarios in which bind parameters are understood, including that they
-cannot be placed within the argument lists of function calls, anywhere outside
-the FROM, or even within subqueries within the FROM clause - making the usage
-of bind parameters within SELECT statements impossible for all but the most
-simplistic statements.
-
-For this reason, the mxODBC dialect uses the "native" mode by default only for
-INSERT, UPDATE, and DELETE statements, and uses the escaped string mode for
-all other statements.
-
-This behavior can be controlled via
-:meth:`~sqlalchemy.sql.expression.Executable.execution_options` using the
-``native_odbc_execute`` flag with a value of ``True`` or ``False``, where a
-value of ``True`` will unconditionally use native bind parameters and a value
-of ``False`` will unconditionally use string-escaped parameters.
-
-"""
-
-
-from .base import _MSDate
-from .base import _MSDateTime
-from .base import _MSTime
-from .base import MSDialect
-from .base import VARBINARY
-from .pyodbc import _MSNumeric_pyodbc
-from .pyodbc import MSExecutionContext_pyodbc
-from ... import types as sqltypes
-from ...connectors.mxodbc import MxODBCConnector
-
-
-class _MSNumeric_mxodbc(_MSNumeric_pyodbc):
- """Include pyodbc's numeric processor."""
-
-
-class _MSDate_mxodbc(_MSDate):
- def bind_processor(self, dialect):
- def process(value):
- if value is not None:
- return "%s-%s-%s" % (value.year, value.month, value.day)
- else:
- return None
-
- return process
-
-
-class _MSTime_mxodbc(_MSTime):
- def bind_processor(self, dialect):
- def process(value):
- if value is not None:
- return "%s:%s:%s" % (value.hour, value.minute, value.second)
- else:
- return None
-
- return process
-
-
-class _VARBINARY_mxodbc(VARBINARY):
-
- """
- mxODBC Support for VARBINARY column types.
-
- This handles the special case for null VARBINARY values,
- which maps None values to the mx.ODBC.Manager.BinaryNull symbol.
- """
-
- def bind_processor(self, dialect):
- if dialect.dbapi is None:
- return None
-
- DBAPIBinary = dialect.dbapi.Binary
-
- def process(value):
- if value is not None:
- return DBAPIBinary(value)
- else:
- # should pull from mx.ODBC.Manager.BinaryNull
- return dialect.dbapi.BinaryNull
-
- return process
-
-
-class MSExecutionContext_mxodbc(MSExecutionContext_pyodbc):
- """
- The pyodbc execution context is useful for enabling
- SELECT SCOPE_IDENTITY in cases where OUTPUT clause
- does not work (tables with insert triggers).
- """
-
- # todo - investigate whether the pyodbc execution context
- # is really only being used in cases where OUTPUT
- # won't work.
-
-
-class MSDialect_mxodbc(MxODBCConnector, MSDialect):
-
- # this is only needed if "native ODBC" mode is used,
- # which is now disabled by default.
- # statement_compiler = MSSQLStrictCompiler
- supports_statement_cache = True
-
- execution_ctx_cls = MSExecutionContext_mxodbc
-
- # flag used by _MSNumeric_mxodbc
- _need_decimal_fix = True
-
- colspecs = {
- sqltypes.Numeric: _MSNumeric_mxodbc,
- sqltypes.DateTime: _MSDateTime,
- sqltypes.Date: _MSDate_mxodbc,
- sqltypes.Time: _MSTime_mxodbc,
- VARBINARY: _VARBINARY_mxodbc,
- sqltypes.LargeBinary: _VARBINARY_mxodbc,
- }
-
- def __init__(self, description_encoding=None, **params):
- super(MSDialect_mxodbc, self).__init__(**params)
- self.description_encoding = description_encoding
-
-
-dialect = MSDialect_mxodbc
from . import mariadbconnector # noqa
from . import mysqlconnector # noqa
from . import mysqldb # noqa
-from . import oursql # noqa
from . import pymysql # noqa
from . import pyodbc # noqa
from .base import BIGINT
# artificial limit if one wasn't provided
# https://dev.mysql.com/doc/refman/5.0/en/select.html
if limit_clause is None:
+ # TODO: remove ??
# hardwire the upper limit. Currently
- # needed by OurSQL with Python 3
- # (https://bugs.launchpad.net/oursql/+bug/686232),
- # but also is consistent with the usage of the upper
+            # needed to be consistent with the usage of the upper
# bound as part of MySQL's "syntax" for OFFSET with
- # no LIMIT
+ # no LIMIT.
return " \n LIMIT %s, %s" % (
self.process(offset_clause, **kw),
"18446744073709551615",
+++ /dev/null
-# mysql/oursql.py
-# Copyright (C) 2005-2021 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-
-"""
-
-.. dialect:: mysql+oursql
- :name: OurSQL
- :dbapi: oursql
- :connectstring: mysql+oursql://<user>:<password>@<host>[:<port>]/<dbname>
- :url: https://packages.python.org/oursql/
-
-.. note::
-
- The OurSQL MySQL dialect is legacy and is no longer supported upstream,
- and is **not tested as part of SQLAlchemy's continuous integration**.
- The recommended MySQL dialects are mysqlclient and PyMySQL.
-
-.. deprecated:: 1.4 The OurSQL DBAPI is deprecated and will be removed
- in a future version. Please use one of the supported DBAPIs to
- connect to mysql.
-
-Unicode
--------
-
-Please see :ref:`mysql_unicode` for current recommendations on unicode
-handling.
-
-
-"""
-
-
-from .base import BIT
-from .base import MySQLDialect
-from .base import MySQLExecutionContext
-from ... import types as sqltypes
-from ... import util
-
-
-class _oursqlBIT(BIT):
- def result_processor(self, dialect, coltype):
- """oursql already converts mysql bits, so."""
-
- return None
-
-
-class MySQLExecutionContext_oursql(MySQLExecutionContext):
- @property
- def plain_query(self):
- return self.execution_options.get("_oursql_plain_query", False)
-
-
-class MySQLDialect_oursql(MySQLDialect):
- driver = "oursql"
- supports_statement_cache = True
-
- if util.py2k:
- supports_unicode_binds = True
- supports_unicode_statements = True
-
- supports_native_decimal = True
-
- supports_sane_rowcount = True
- supports_sane_multi_rowcount = True
- execution_ctx_cls = MySQLExecutionContext_oursql
-
- colspecs = util.update_copy(
- MySQLDialect.colspecs, {sqltypes.Time: sqltypes.Time, BIT: _oursqlBIT}
- )
-
- @classmethod
- def dbapi(cls):
- util.warn_deprecated(
- "The OurSQL DBAPI is deprecated and will be removed "
- "in a future version. Please use one of the supported DBAPIs to "
- "connect to mysql.",
- version="1.4",
- )
- return __import__("oursql")
-
- def do_execute(self, cursor, statement, parameters, context=None):
- """Provide an implementation of
- *cursor.execute(statement, parameters)*."""
-
- if context and context.plain_query:
- cursor.execute(statement, plain_query=True)
- else:
- cursor.execute(statement, parameters)
-
- def do_begin(self, connection):
- connection.cursor().execute("BEGIN", plain_query=True)
-
- def _xa_query(self, connection, query, xid):
- if util.py2k:
- arg = connection.connection._escape_string(xid)
- else:
- charset = self._connection_charset
- arg = connection.connection._escape_string(
- xid.encode(charset)
- ).decode(charset)
- arg = "'%s'" % arg
- connection.execution_options(_oursql_plain_query=True).exec_driver_sql(
- query % arg
- )
-
- # Because mysql is bad, these methods have to be
- # reimplemented to use _PlainQuery. Basically, some queries
- # refuse to return any data if they're run through
- # the parameterized query API, or refuse to be parameterized
- # in the first place.
- def do_begin_twophase(self, connection, xid):
- self._xa_query(connection, "XA BEGIN %s", xid)
-
- def do_prepare_twophase(self, connection, xid):
- self._xa_query(connection, "XA END %s", xid)
- self._xa_query(connection, "XA PREPARE %s", xid)
-
- def do_rollback_twophase(
- self, connection, xid, is_prepared=True, recover=False
- ):
- if not is_prepared:
- self._xa_query(connection, "XA END %s", xid)
- self._xa_query(connection, "XA ROLLBACK %s", xid)
-
- def do_commit_twophase(
- self, connection, xid, is_prepared=True, recover=False
- ):
- if not is_prepared:
- self.do_prepare_twophase(connection, xid)
- self._xa_query(connection, "XA COMMIT %s", xid)
-
- # Q: why didn't we need all these "plain_query" overrides earlier ?
- # am i on a newer/older version of OurSQL ?
- def has_table(self, connection, table_name, schema=None):
- return MySQLDialect.has_table(
- self,
- connection.connect().execution_options(_oursql_plain_query=True),
- table_name,
- schema,
- )
-
- def get_table_options(self, connection, table_name, schema=None, **kw):
- return MySQLDialect.get_table_options(
- self,
- connection.connect().execution_options(_oursql_plain_query=True),
- table_name,
- schema=schema,
- **kw
- )
-
- def get_columns(self, connection, table_name, schema=None, **kw):
- return MySQLDialect.get_columns(
- self,
- connection.connect().execution_options(_oursql_plain_query=True),
- table_name,
- schema=schema,
- **kw
- )
-
- def get_view_names(self, connection, schema=None, **kw):
- return MySQLDialect.get_view_names(
- self,
- connection.connect().execution_options(_oursql_plain_query=True),
- schema=schema,
- **kw
- )
-
- def get_table_names(self, connection, schema=None, **kw):
- return MySQLDialect.get_table_names(
- self,
- connection.connect().execution_options(_oursql_plain_query=True),
- schema,
- )
-
- def get_schema_names(self, connection, **kw):
- return MySQLDialect.get_schema_names(
- self,
- connection.connect().execution_options(_oursql_plain_query=True),
- **kw
- )
-
- def initialize(self, connection):
- return MySQLDialect.initialize(
- self, connection.execution_options(_oursql_plain_query=True)
- )
-
- def _show_create_table(
- self, connection, table, charset=None, full_name=None
- ):
- return MySQLDialect._show_create_table(
- self,
- connection.connect(close_with_result=True).execution_options(
- _oursql_plain_query=True
- ),
- table,
- charset,
- full_name,
- )
-
- def is_disconnect(self, e, connection, cursor):
- if isinstance(e, self.dbapi.ProgrammingError):
- return (
- e.errno is None
- and "cursor" not in e.args[1]
- and e.args[1].endswith("closed")
- )
- else:
- return e.errno in (2006, 2013, 2014, 2045, 2055)
-
- def create_connect_args(self, url):
- opts = url.translate_connect_args(
- database="db", username="user", password="passwd"
- )
- opts.update(url.query)
-
- util.coerce_kw_type(opts, "port", int)
- util.coerce_kw_type(opts, "compress", bool)
- util.coerce_kw_type(opts, "autoping", bool)
- util.coerce_kw_type(opts, "raise_on_warnings", bool)
-
- util.coerce_kw_type(opts, "default_charset", bool)
- if opts.pop("default_charset", False):
- opts["charset"] = None
- else:
- util.coerce_kw_type(opts, "charset", str)
- opts["use_unicode"] = opts.get("use_unicode", True)
- util.coerce_kw_type(opts, "use_unicode", bool)
-
- # FOUND_ROWS must be set in CLIENT_FLAGS to enable
- # supports_sane_rowcount.
- opts.setdefault("found_rows", True)
-
- ssl = {}
- for key in [
- "ssl_ca",
- "ssl_key",
- "ssl_cert",
- "ssl_capath",
- "ssl_cipher",
- ]:
- if key in opts:
- ssl[key[4:]] = opts[key]
- util.coerce_kw_type(ssl, key[4:], str)
- del opts[key]
- if ssl:
- opts["ssl"] = ssl
-
- return [[], opts]
-
- def _extract_error_code(self, exception):
- return exception.errno
-
- def _detect_charset(self, connection):
- """Sniff out the character set in use for connection results."""
-
- return connection.connection.charset
-
- def _compat_fetchall(self, rp, charset=None):
- """oursql isn't super-broken like MySQLdb, yaaay."""
- return rp.fetchall()
-
- def _compat_fetchone(self, rp, charset=None):
- """oursql isn't super-broken like MySQLdb, yaaay."""
- return rp.fetchone()
-
- def _compat_first(self, rp, charset=None):
- return rp.first()
-
-
-dialect = MySQLDialect_oursql
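
With OurSQL removed, connections should use one of the supported MySQL
drivers instead. A minimal sketch using PyMySQL, assuming a reachable MySQL
server; the credentials, host and database name are illustrative only::

    from sqlalchemy import create_engine, text

    # hypothetical credentials/host/database, for illustration only
    engine = create_engine("mysql+pymysql://scott:tiger@localhost/test")

    with engine.connect() as conn:
        # round-trip a trivial statement to confirm the driver works
        print(conn.execute(text("SELECT VERSION()")).scalar())
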
from . import pg8000 # noqa
from . import psycopg2 # noqa
from . import psycopg2cffi # noqa
-from . import pygresql # noqa
-from . import pypostgresql # noqa
from .array import All
from .array import Any
from .array import ARRAY
+++ /dev/null
-# postgresql/pygresql.py
-# Copyright (C) 2005-2021 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-"""
-.. dialect:: postgresql+pygresql
- :name: pygresql
- :dbapi: pgdb
- :connectstring: postgresql+pygresql://user:password@host:port/dbname[?key=value&key=value...]
- :url: https://www.pygresql.org/
-
-.. note::
-
- The pygresql dialect is **not tested as part of SQLAlchemy's continuous
- integration** and may have unresolved issues. The recommended PostgreSQL
- dialect is psycopg2.
-
-.. deprecated:: 1.4 The pygresql DBAPI is deprecated and will be removed
- in a future version. Please use one of the supported DBAPIs to
- connect to PostgreSQL.
-
-""" # noqa
-
-import decimal
-import re
-
-from .base import _DECIMAL_TYPES
-from .base import _FLOAT_TYPES
-from .base import _INT_TYPES
-from .base import PGCompiler
-from .base import PGDialect
-from .base import PGIdentifierPreparer
-from .base import UUID
-from .hstore import HSTORE
-from .json import JSON
-from .json import JSONB
-from ... import exc
-from ... import processors
-from ... import util
-from ...sql.elements import Null
-from ...types import JSON as Json
-from ...types import Numeric
-
-
-class _PGNumeric(Numeric):
- def bind_processor(self, dialect):
- return None
-
- def result_processor(self, dialect, coltype):
- if not isinstance(coltype, int):
- coltype = coltype.oid
- if self.asdecimal:
- if coltype in _FLOAT_TYPES:
- return processors.to_decimal_processor_factory(
- decimal.Decimal, self._effective_decimal_return_scale
- )
- elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES:
- # PyGreSQL returns Decimal natively for 1700 (numeric)
- return None
- else:
- raise exc.InvalidRequestError(
- "Unknown PG numeric type: %d" % coltype
- )
- else:
- if coltype in _FLOAT_TYPES:
- # PyGreSQL returns float natively for 701 (float8)
- return None
- elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES:
- return processors.to_float
- else:
- raise exc.InvalidRequestError(
- "Unknown PG numeric type: %d" % coltype
- )
-
-
-class _PGHStore(HSTORE):
- def bind_processor(self, dialect):
- if not dialect.has_native_hstore:
- return super(_PGHStore, self).bind_processor(dialect)
- hstore = dialect.dbapi.Hstore
-
- def process(value):
- if isinstance(value, dict):
- return hstore(value)
- return value
-
- return process
-
- def result_processor(self, dialect, coltype):
- if not dialect.has_native_hstore:
- return super(_PGHStore, self).result_processor(dialect, coltype)
-
-
-class _PGJSON(JSON):
- def bind_processor(self, dialect):
- if not dialect.has_native_json:
- return super(_PGJSON, self).bind_processor(dialect)
- json = dialect.dbapi.Json
-
- def process(value):
- if value is self.NULL:
- value = None
- elif isinstance(value, Null) or (
- value is None and self.none_as_null
- ):
- return None
- if value is None or isinstance(value, (dict, list)):
- return json(value)
- return value
-
- return process
-
- def result_processor(self, dialect, coltype):
- if not dialect.has_native_json:
- return super(_PGJSON, self).result_processor(dialect, coltype)
-
-
-class _PGJSONB(JSONB):
- def bind_processor(self, dialect):
- if not dialect.has_native_json:
- return super(_PGJSONB, self).bind_processor(dialect)
- json = dialect.dbapi.Json
-
- def process(value):
- if value is self.NULL:
- value = None
- elif isinstance(value, Null) or (
- value is None and self.none_as_null
- ):
- return None
- if value is None or isinstance(value, (dict, list)):
- return json(value)
- return value
-
- return process
-
- def result_processor(self, dialect, coltype):
- if not dialect.has_native_json:
- return super(_PGJSONB, self).result_processor(dialect, coltype)
-
-
-class _PGUUID(UUID):
- def bind_processor(self, dialect):
- if not dialect.has_native_uuid:
- return super(_PGUUID, self).bind_processor(dialect)
- uuid = dialect.dbapi.Uuid
-
- def process(value):
- if value is None:
- return None
- if isinstance(value, (str, bytes)):
- if len(value) == 16:
- return uuid(bytes=value)
- return uuid(value)
- if isinstance(value, int):
- return uuid(int=value)
- return value
-
- return process
-
- def result_processor(self, dialect, coltype):
- if not dialect.has_native_uuid:
- return super(_PGUUID, self).result_processor(dialect, coltype)
- if not self.as_uuid:
-
- def process(value):
- if value is not None:
- return str(value)
-
- return process
-
-
-class _PGCompiler(PGCompiler):
- def visit_mod_binary(self, binary, operator, **kw):
- return (
- self.process(binary.left, **kw)
- + " %% "
- + self.process(binary.right, **kw)
- )
-
- def post_process_text(self, text):
- return text.replace("%", "%%")
-
-
-class _PGIdentifierPreparer(PGIdentifierPreparer):
- def _escape_identifier(self, value):
- value = value.replace(self.escape_quote, self.escape_to_quote)
- return value.replace("%", "%%")
-
-
-class PGDialect_pygresql(PGDialect):
-
- driver = "pygresql"
- supports_statement_cache = True
-
- statement_compiler = _PGCompiler
- preparer = _PGIdentifierPreparer
-
- @classmethod
- def dbapi(cls):
- import pgdb
-
- util.warn_deprecated(
- "The pygresql DBAPI is deprecated and will be removed "
- "in a future version. Please use one of the supported DBAPIs to "
- "connect to PostgreSQL.",
- version="1.4",
- )
-
- return pgdb
-
- colspecs = util.update_copy(
- PGDialect.colspecs,
- {
- Numeric: _PGNumeric,
- HSTORE: _PGHStore,
- Json: _PGJSON,
- JSON: _PGJSON,
- JSONB: _PGJSONB,
- UUID: _PGUUID,
- },
- )
-
- def __init__(self, **kwargs):
- super(PGDialect_pygresql, self).__init__(**kwargs)
- try:
- version = self.dbapi.version
- m = re.match(r"(\d+)\.(\d+)", version)
- version = (int(m.group(1)), int(m.group(2)))
- except (AttributeError, ValueError, TypeError):
- version = (0, 0)
- self.dbapi_version = version
- if version < (5, 0):
- has_native_hstore = has_native_json = has_native_uuid = False
- if version != (0, 0):
- util.warn(
- "PyGreSQL is only fully supported by SQLAlchemy"
- " since version 5.0."
- )
- else:
- self.supports_unicode_statements = True
- self.supports_unicode_binds = True
- has_native_hstore = has_native_json = has_native_uuid = True
- self.has_native_hstore = has_native_hstore
- self.has_native_json = has_native_json
- self.has_native_uuid = has_native_uuid
-
- def create_connect_args(self, url):
- opts = url.translate_connect_args(username="user")
- if "port" in opts:
- opts["host"] = "%s:%s" % (
- opts.get("host", "").rsplit(":", 1)[0],
- opts.pop("port"),
- )
- opts.update(url.query)
- return [], opts
-
- def is_disconnect(self, e, connection, cursor):
- if isinstance(e, self.dbapi.Error):
- if not connection:
- return False
- try:
- connection = connection.connection
- except AttributeError:
- pass
- else:
- if not connection:
- return False
- try:
- return connection.closed
- except AttributeError: # PyGreSQL < 5.0
- return connection._cnx is None
- return False
-
-
-dialect = PGDialect_pygresql
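
A side note on the compiler overrides just removed: pgdb uses the "pyformat"
paramstyle, where ``%`` introduces a bind parameter, so the dialect doubled
any literal percent sign before the statement reached the driver. A minimal
sketch of that escaping rule in isolation (the helper name is hypothetical)::

    def escape_percents(sql):
        # under the pyformat paramstyle a literal "%" must be doubled,
        # otherwise the driver would read it as a parameter marker
        return sql.replace("%", "%%")

    assert escape_percents("SELECT 'a%b'") == "SELECT 'a%%b'"
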
+++ /dev/null
-# postgresql/pypostgresql.py
-# Copyright (C) 2005-2021 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-"""
-.. dialect:: postgresql+pypostgresql
- :name: py-postgresql
- :dbapi: pypostgresql
- :connectstring: postgresql+pypostgresql://user:password@host:port/dbname[?key=value&key=value...]
- :url: https://python.projects.pgfoundry.org/
-
-.. note::
-
- The pypostgresql dialect is **not tested as part of SQLAlchemy's continuous
- integration** and may have unresolved issues. The recommended PostgreSQL
- driver is psycopg2.
-
-.. deprecated:: 1.4 The py-postgresql DBAPI is deprecated and will be removed
- in a future version. This DBAPI is superseded by the external
- version available at external-dialect_. Please use the external version or
- one of the supported DBAPIs to connect to PostgreSQL.
-
-.. TODO update link
-.. _external-dialect: https://github.com/PyGreSQL
-
-""" # noqa
-
-from .base import PGDialect
-from .base import PGExecutionContext
-from ... import processors
-from ... import types as sqltypes
-from ... import util
-
-
-class PGNumeric(sqltypes.Numeric):
- def bind_processor(self, dialect):
- return processors.to_str
-
- def result_processor(self, dialect, coltype):
- if self.asdecimal:
- return None
- else:
- return processors.to_float
-
-
-class PGExecutionContext_pypostgresql(PGExecutionContext):
- pass
-
-
-class PGDialect_pypostgresql(PGDialect):
- driver = "pypostgresql"
-
- supports_statement_cache = True
- supports_unicode_statements = True
- supports_unicode_binds = True
- description_encoding = None
- default_paramstyle = "pyformat"
-
- # requires trunk version to support sane rowcounts
- # TODO: use dbapi version information to set this flag appropriately
- supports_sane_rowcount = True
- supports_sane_multi_rowcount = False
-
- execution_ctx_cls = PGExecutionContext_pypostgresql
- colspecs = util.update_copy(
- PGDialect.colspecs,
- {
- sqltypes.Numeric: PGNumeric,
- # prevents PGNumeric from being used
- sqltypes.Float: sqltypes.Float,
- },
- )
-
- @classmethod
- def dbapi(cls):
- from postgresql.driver import dbapi20
-
- # TODO update link
- util.warn_deprecated(
- "The py-postgresql DBAPI is deprecated and will be removed "
- "in a future version. This DBAPI is superseded by the external"
- "version available at https://github.com/PyGreSQL. Please "
- "use one of the supported DBAPIs to connect to PostgreSQL.",
- version="1.4",
- )
-
- return dbapi20
-
- _DBAPI_ERROR_NAMES = [
- "Error",
- "InterfaceError",
- "DatabaseError",
- "DataError",
- "OperationalError",
- "IntegrityError",
- "InternalError",
- "ProgrammingError",
- "NotSupportedError",
- ]
-
- @util.memoized_property
- def dbapi_exception_translation_map(self):
- if self.dbapi is None:
- return {}
-
- return dict(
- (getattr(self.dbapi, name).__name__, name)
- for name in self._DBAPI_ERROR_NAMES
- )
-
- def create_connect_args(self, url):
- opts = url.translate_connect_args(username="user")
- if "port" in opts:
- opts["port"] = int(opts["port"])
- else:
- opts["port"] = 5432
- opts.update(url.query)
- return ([], opts)
-
- def is_disconnect(self, e, connection, cursor):
- return "connection is closed" in str(e)
-
-
-dialect = PGDialect_pypostgresql
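
Although these built-in dialects are gone, externally packaged dialects can
take their place. Besides setuptools entry points, a dialect can be
registered programmatically; a minimal sketch, where the package, module and
class names are hypothetical::

    from sqlalchemy.dialects import registry

    # after this call, create_engine("postgresql+somedriver://...")
    # can locate the external dialect class
    registry.register(
        "postgresql.somedriver", "somepackage.dialect", "PGDialect_somedriver"
    )
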
+++ /dev/null
-# sybase/__init__.py
-# Copyright (C) 2005-2021 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-
-from . import base # noqa
-from . import pyodbc # noqa
-from . import pysybase # noqa
-from .base import BIGINT
-from .base import BINARY
-from .base import BIT
-from .base import CHAR
-from .base import DATE
-from .base import DATETIME
-from .base import FLOAT
-from .base import IMAGE
-from .base import INT
-from .base import INTEGER
-from .base import MONEY
-from .base import NCHAR
-from .base import NUMERIC
-from .base import NVARCHAR
-from .base import SMALLINT
-from .base import SMALLMONEY
-from .base import TEXT
-from .base import TIME
-from .base import TINYINT
-from .base import UNICHAR
-from .base import UNITEXT
-from .base import UNIVARCHAR
-from .base import VARBINARY
-from .base import VARCHAR
-
-
-# default dialect
-base.dialect = dialect = pyodbc.dialect
-
-
-__all__ = (
- "CHAR",
- "VARCHAR",
- "TIME",
- "NCHAR",
- "NVARCHAR",
- "TEXT",
- "DATE",
- "DATETIME",
- "FLOAT",
- "NUMERIC",
- "BIGINT",
- "INT",
- "INTEGER",
- "SMALLINT",
- "BINARY",
- "VARBINARY",
- "UNITEXT",
- "UNICHAR",
- "UNIVARCHAR",
- "IMAGE",
- "BIT",
- "MONEY",
- "SMALLMONEY",
- "TINYINT",
- "dialect",
-)
+++ /dev/null
-# sybase/base.py
-# Copyright (C) 2010-2021 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-# get_select_precolumns(), limit_clause() implementation
-# copyright (C) 2007 Fisch Asset Management
-# AG https://www.fam.ch, with coding by Alexander Houben
-# alexander.houben@thor-solutions.ch
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-
-"""
-
-.. dialect:: sybase
- :name: Sybase
-
-.. note::
-
- The Sybase dialect within SQLAlchemy **is not currently supported**.
- It is not tested within continuous integration and is likely to have
- many issues and caveats not currently handled. Consider using the
- `external dialect <https://github.com/gordthompson/sqlalchemy-sybase>`_
- instead.
-
-.. deprecated:: 1.4 The internal Sybase dialect is deprecated and will be
- removed in a future version. Use the external dialect.
-
-"""
-
-import re
-
-from sqlalchemy import exc
-from sqlalchemy import schema as sa_schema
-from sqlalchemy import types as sqltypes
-from sqlalchemy import util
-from sqlalchemy.engine import default
-from sqlalchemy.engine import reflection
-from sqlalchemy.sql import compiler
-from sqlalchemy.sql import text
-from sqlalchemy.types import BIGINT
-from sqlalchemy.types import BINARY
-from sqlalchemy.types import CHAR
-from sqlalchemy.types import DATE
-from sqlalchemy.types import DATETIME
-from sqlalchemy.types import DECIMAL
-from sqlalchemy.types import FLOAT
-from sqlalchemy.types import INT # noqa
-from sqlalchemy.types import INTEGER
-from sqlalchemy.types import NCHAR
-from sqlalchemy.types import NUMERIC
-from sqlalchemy.types import NVARCHAR
-from sqlalchemy.types import REAL
-from sqlalchemy.types import SMALLINT
-from sqlalchemy.types import TEXT
-from sqlalchemy.types import TIME
-from sqlalchemy.types import TIMESTAMP
-from sqlalchemy.types import Unicode
-from sqlalchemy.types import VARBINARY
-from sqlalchemy.types import VARCHAR
-
-
-RESERVED_WORDS = set(
- [
- "add",
- "all",
- "alter",
- "and",
- "any",
- "as",
- "asc",
- "backup",
- "begin",
- "between",
- "bigint",
- "binary",
- "bit",
- "bottom",
- "break",
- "by",
- "call",
- "capability",
- "cascade",
- "case",
- "cast",
- "char",
- "char_convert",
- "character",
- "check",
- "checkpoint",
- "close",
- "comment",
- "commit",
- "connect",
- "constraint",
- "contains",
- "continue",
- "convert",
- "create",
- "cross",
- "cube",
- "current",
- "current_timestamp",
- "current_user",
- "cursor",
- "date",
- "dbspace",
- "deallocate",
- "dec",
- "decimal",
- "declare",
- "default",
- "delete",
- "deleting",
- "desc",
- "distinct",
- "do",
- "double",
- "drop",
- "dynamic",
- "else",
- "elseif",
- "encrypted",
- "end",
- "endif",
- "escape",
- "except",
- "exception",
- "exec",
- "execute",
- "existing",
- "exists",
- "externlogin",
- "fetch",
- "first",
- "float",
- "for",
- "force",
- "foreign",
- "forward",
- "from",
- "full",
- "goto",
- "grant",
- "group",
- "having",
- "holdlock",
- "identified",
- "if",
- "in",
- "index",
- "index_lparen",
- "inner",
- "inout",
- "insensitive",
- "insert",
- "inserting",
- "install",
- "instead",
- "int",
- "integer",
- "integrated",
- "intersect",
- "into",
- "iq",
- "is",
- "isolation",
- "join",
- "key",
- "lateral",
- "left",
- "like",
- "lock",
- "login",
- "long",
- "match",
- "membership",
- "message",
- "mode",
- "modify",
- "natural",
- "new",
- "no",
- "noholdlock",
- "not",
- "notify",
- "null",
- "numeric",
- "of",
- "off",
- "on",
- "open",
- "option",
- "options",
- "or",
- "order",
- "others",
- "out",
- "outer",
- "over",
- "passthrough",
- "precision",
- "prepare",
- "primary",
- "print",
- "privileges",
- "proc",
- "procedure",
- "publication",
- "raiserror",
- "readtext",
- "real",
- "reference",
- "references",
- "release",
- "remote",
- "remove",
- "rename",
- "reorganize",
- "resource",
- "restore",
- "restrict",
- "return",
- "revoke",
- "right",
- "rollback",
- "rollup",
- "save",
- "savepoint",
- "scroll",
- "select",
- "sensitive",
- "session",
- "set",
- "setuser",
- "share",
- "smallint",
- "some",
- "sqlcode",
- "sqlstate",
- "start",
- "stop",
- "subtrans",
- "subtransaction",
- "synchronize",
- "syntax_error",
- "table",
- "temporary",
- "then",
- "time",
- "timestamp",
- "tinyint",
- "to",
- "top",
- "tran",
- "trigger",
- "truncate",
- "tsequal",
- "unbounded",
- "union",
- "unique",
- "unknown",
- "unsigned",
- "update",
- "updating",
- "user",
- "using",
- "validate",
- "values",
- "varbinary",
- "varchar",
- "variable",
- "varying",
- "view",
- "wait",
- "waitfor",
- "when",
- "where",
- "while",
- "window",
- "with",
- "with_cube",
- "with_lparen",
- "with_rollup",
- "within",
- "work",
- "writetext",
- ]
-)
-
-
-class _SybaseUnitypeMixin(object):
- """these types appear to return a buffer object."""
-
- def result_processor(self, dialect, coltype):
- def process(value):
- if value is not None:
- return str(value) # decode("ucs-2")
- else:
- return None
-
- return process
-
-
-class UNICHAR(_SybaseUnitypeMixin, sqltypes.Unicode):
- __visit_name__ = "UNICHAR"
-
-
-class UNIVARCHAR(_SybaseUnitypeMixin, sqltypes.Unicode):
- __visit_name__ = "UNIVARCHAR"
-
-
-class UNITEXT(_SybaseUnitypeMixin, sqltypes.UnicodeText):
- __visit_name__ = "UNITEXT"
-
-
-class TINYINT(sqltypes.Integer):
- __visit_name__ = "TINYINT"
-
-
-class BIT(sqltypes.TypeEngine):
- __visit_name__ = "BIT"
-
-
-class MONEY(sqltypes.TypeEngine):
- __visit_name__ = "MONEY"
-
-
-class SMALLMONEY(sqltypes.TypeEngine):
- __visit_name__ = "SMALLMONEY"
-
-
-class UNIQUEIDENTIFIER(sqltypes.TypeEngine):
- __visit_name__ = "UNIQUEIDENTIFIER"
-
-
-class IMAGE(sqltypes.LargeBinary):
- __visit_name__ = "IMAGE"
-
-
-class SybaseTypeCompiler(compiler.GenericTypeCompiler):
- def visit_large_binary(self, type_, **kw):
- return self.visit_IMAGE(type_)
-
- def visit_boolean(self, type_, **kw):
- return self.visit_BIT(type_)
-
- def visit_unicode(self, type_, **kw):
- return self.visit_NVARCHAR(type_)
-
- def visit_UNICHAR(self, type_, **kw):
- return "UNICHAR(%d)" % type_.length
-
- def visit_UNIVARCHAR(self, type_, **kw):
- return "UNIVARCHAR(%d)" % type_.length
-
- def visit_UNITEXT(self, type_, **kw):
- return "UNITEXT"
-
- def visit_TINYINT(self, type_, **kw):
- return "TINYINT"
-
- def visit_IMAGE(self, type_, **kw):
- return "IMAGE"
-
- def visit_BIT(self, type_, **kw):
- return "BIT"
-
- def visit_MONEY(self, type_, **kw):
- return "MONEY"
-
- def visit_SMALLMONEY(self, type_, **kw):
- return "SMALLMONEY"
-
- def visit_UNIQUEIDENTIFIER(self, type_, **kw):
- return "UNIQUEIDENTIFIER"
-
-
-ischema_names = {
- "bigint": BIGINT,
- "int": INTEGER,
- "integer": INTEGER,
- "smallint": SMALLINT,
- "tinyint": TINYINT,
- "unsigned bigint": BIGINT, # TODO: unsigned flags
- "unsigned int": INTEGER, # TODO: unsigned flags
- "unsigned smallint": SMALLINT, # TODO: unsigned flags
- "numeric": NUMERIC,
- "decimal": DECIMAL,
- "dec": DECIMAL,
- "float": FLOAT,
- "double": NUMERIC, # TODO
- "double precision": NUMERIC, # TODO
- "real": REAL,
- "smallmoney": SMALLMONEY,
- "money": MONEY,
- "smalldatetime": DATETIME,
- "datetime": DATETIME,
- "date": DATE,
- "time": TIME,
- "char": CHAR,
- "character": CHAR,
- "varchar": VARCHAR,
- "character varying": VARCHAR,
- "char varying": VARCHAR,
- "unichar": UNICHAR,
- "unicode character": UNIVARCHAR,
- "nchar": NCHAR,
- "national char": NCHAR,
- "national character": NCHAR,
- "nvarchar": NVARCHAR,
- "nchar varying": NVARCHAR,
- "national char varying": NVARCHAR,
- "national character varying": NVARCHAR,
- "text": TEXT,
- "unitext": UNITEXT,
- "binary": BINARY,
- "varbinary": VARBINARY,
- "image": IMAGE,
- "bit": BIT,
- # not in documentation for ASE 15.7
- "long varchar": TEXT, # TODO
- "timestamp": TIMESTAMP,
- "uniqueidentifier": UNIQUEIDENTIFIER,
-}
-
-
-class SybaseInspector(reflection.Inspector):
- def __init__(self, conn):
- reflection.Inspector.__init__(self, conn)
-
- def get_table_id(self, table_name, schema=None):
- """Return the table id from `table_name` and `schema`."""
-
- return self.dialect.get_table_id(
- self.bind, table_name, schema, info_cache=self.info_cache
- )
-
-
-class SybaseExecutionContext(default.DefaultExecutionContext):
- _enable_identity_insert = False
-
- def set_ddl_autocommit(self, connection, value):
- """Must be implemented by subclasses to accommodate DDL executions.
-
- "connection" is the raw unwrapped DBAPI connection. "value"
- is True or False. When True, the connection should be configured
- such that a DDL statement can take place subsequently. When False,
- a DDL statement has taken place and the connection should be
- returned to non-autocommit mode.
-
- """
- raise NotImplementedError()
-
- def pre_exec(self):
- if self.isinsert:
- tbl = self.compiled.statement.table
- seq_column = tbl._autoincrement_column
- insert_has_sequence = seq_column is not None
-
- if insert_has_sequence:
- self._enable_identity_insert = (
- seq_column.key in self.compiled_parameters[0]
- )
- else:
- self._enable_identity_insert = False
-
- if self._enable_identity_insert:
- self.cursor.execute(
- "SET IDENTITY_INSERT %s ON"
- % self.dialect.identifier_preparer.format_table(tbl)
- )
-
- if self.isddl:
- # TODO: to enhance this, we can detect "ddl in tran" on the
- # database settings. this error message should be improved to
- # include a note about that.
- if not self.should_autocommit:
- raise exc.InvalidRequestError(
- "The Sybase dialect only supports "
- "DDL in 'autocommit' mode at this time."
- )
-
- self.root_connection.engine.logger.info(
- "AUTOCOMMIT (Assuming no Sybase 'ddl in tran')"
- )
-
- self.set_ddl_autocommit(
- self.root_connection.connection.connection, True
- )
-
- def post_exec(self):
- if self.isddl:
- self.set_ddl_autocommit(self.root_connection, False)
-
- if self._enable_identity_insert:
- self.cursor.execute(
- "SET IDENTITY_INSERT %s OFF"
- % self.dialect.identifier_preparer.format_table(
- self.compiled.statement.table
- )
- )
-
- def get_lastrowid(self):
- cursor = self.create_cursor()
- cursor.execute("SELECT @@identity AS lastrowid")
- lastrowid = cursor.fetchone()[0]
- cursor.close()
- return lastrowid
-
-
-class SybaseSQLCompiler(compiler.SQLCompiler):
- ansi_bind_rules = True
-
- extract_map = util.update_copy(
- compiler.SQLCompiler.extract_map,
- {"doy": "dayofyear", "dow": "weekday", "milliseconds": "millisecond"},
- )
-
- def get_from_hint_text(self, table, text):
- return text
-
- def limit_clause(self, select, **kw):
- text = ""
- if select._limit_clause is not None:
- text += " ROWS LIMIT " + self.process(select._limit_clause, **kw)
- if select._offset_clause is not None:
- if select._limit_clause is None:
- text += " ROWS"
- text += " OFFSET " + self.process(select._offset_clause, **kw)
- return text
-
- def visit_extract(self, extract, **kw):
- field = self.extract_map.get(extract.field, extract.field)
- return 'DATEPART("%s", %s)' % (field, self.process(extract.expr, **kw))
-
- def visit_now_func(self, fn, **kw):
- return "GETDATE()"
-
- def for_update_clause(self, select):
- # "FOR UPDATE" is only allowed on "DECLARE CURSOR"
- # which SQLAlchemy doesn't use
- return ""
-
- def order_by_clause(self, select, **kw):
- kw["literal_binds"] = True
- order_by = self.process(select._order_by_clause, **kw)
-
- # SybaseSQL only allows ORDER BY in subqueries if there is a LIMIT
- if order_by and (not self.is_subquery() or select._limit):
- return " ORDER BY " + order_by
- else:
- return ""
-
- def delete_table_clause(self, delete_stmt, from_table, extra_froms):
- """If we have extra froms make sure we render any alias as hint."""
- ashint = False
- if extra_froms:
- ashint = True
- return from_table._compiler_dispatch(
- self, asfrom=True, iscrud=True, ashint=ashint
- )
-
- def delete_extra_from_clause(
- self, delete_stmt, from_table, extra_froms, from_hints, **kw
- ):
- """Render the DELETE .. FROM clause specific to Sybase."""
- kw["asfrom"] = True
- return "FROM " + ", ".join(
- t._compiler_dispatch(self, fromhints=from_hints, **kw)
- for t in [from_table] + extra_froms
- )
-
-
-class SybaseDDLCompiler(compiler.DDLCompiler):
- def get_column_specification(self, column, **kwargs):
- colspec = (
- self.preparer.format_column(column)
- + " "
- + self.dialect.type_compiler.process(
- column.type, type_expression=column
- )
- )
-
- if column.table is None:
- raise exc.CompileError(
- "The Sybase dialect requires Table-bound "
- "columns in order to generate DDL"
- )
- seq_col = column.table._autoincrement_column
-
- # install an IDENTITY Sequence if we have an implicit IDENTITY column
- if seq_col is column:
- sequence = (
- isinstance(column.default, sa_schema.Sequence)
- and column.default
- )
- if sequence:
- start, increment = sequence.start or 1, sequence.increment or 1
- else:
- start, increment = 1, 1
- if (start, increment) == (1, 1):
- colspec += " IDENTITY"
- else:
- # TODO: need correct syntax for this
- colspec += " IDENTITY(%s,%s)" % (start, increment)
- else:
- default = self.get_column_default_string(column)
- if default is not None:
- colspec += " DEFAULT " + default
-
- if column.nullable is not None:
- if not column.nullable or column.primary_key:
- colspec += " NOT NULL"
- else:
- colspec += " NULL"
-
- return colspec
-
- def visit_drop_index(self, drop):
- index = drop.element
- return "\nDROP INDEX %s.%s" % (
- self.preparer.quote_identifier(index.table.name),
- self._prepared_index_name(drop.element, include_schema=False),
- )
-
-
-class SybaseIdentifierPreparer(compiler.IdentifierPreparer):
- reserved_words = RESERVED_WORDS
-
-
-class SybaseDialect(default.DefaultDialect):
- name = "sybase"
- supports_unicode_statements = False
- supports_sane_rowcount = False
- supports_sane_multi_rowcount = False
- supports_statement_cache = True
-
- supports_native_boolean = False
- supports_unicode_binds = False
- postfetch_lastrowid = True
-
- colspecs = {}
- ischema_names = ischema_names
-
- type_compiler = SybaseTypeCompiler
- statement_compiler = SybaseSQLCompiler
- ddl_compiler = SybaseDDLCompiler
- preparer = SybaseIdentifierPreparer
- inspector = SybaseInspector
-
- construct_arguments = []
-
- def __init__(self, *args, **kwargs):
- util.warn_deprecated(
- "The Sybase dialect is deprecated and will be removed "
- "in a future version. This dialect is superseded by the external "
- "dialect https://github.com/gordthompson/sqlalchemy-sybase.",
- version="1.4",
- )
- super(SybaseDialect, self).__init__(*args, **kwargs)
-
- def _get_default_schema_name(self, connection):
- return connection.scalar(
- text("SELECT user_name() as user_name").columns(username=Unicode)
- )
-
- def initialize(self, connection):
- super(SybaseDialect, self).initialize(connection)
- if (
- self.server_version_info is not None
- and self.server_version_info < (15,)
- ):
- self.max_identifier_length = 30
- else:
- self.max_identifier_length = 255
-
- def get_table_id(self, connection, table_name, schema=None, **kw):
- """Fetch the id for schema.table_name.
-
- Several reflection methods require the table id. The idea for using
- this method is that it can be fetched one time and cached for
- subsequent calls.
-
- """
-
- table_id = None
- if schema is None:
- schema = self.default_schema_name
-
- TABLEID_SQL = text(
- """
- SELECT o.id AS id
- FROM sysobjects o JOIN sysusers u ON o.uid=u.uid
- WHERE u.name = :schema_name
- AND o.name = :table_name
- AND o.type in ('U', 'V')
- """
- )
-
- if util.py2k:
- if isinstance(schema, unicode): # noqa
- schema = schema.encode("ascii")
- if isinstance(table_name, unicode): # noqa
- table_name = table_name.encode("ascii")
- result = connection.execute(
- TABLEID_SQL, schema_name=schema, table_name=table_name
- )
- table_id = result.scalar()
- if table_id is None:
- raise exc.NoSuchTableError(table_name)
- return table_id
-
- @reflection.cache
- def get_columns(self, connection, table_name, schema=None, **kw):
- table_id = self.get_table_id(
- connection, table_name, schema, info_cache=kw.get("info_cache")
- )
-
- COLUMN_SQL = text(
- """
- SELECT col.name AS name,
- t.name AS type,
- (col.status & 8) AS nullable,
- (col.status & 128) AS autoincrement,
- com.text AS 'default',
- col.prec AS precision,
- col.scale AS scale,
- col.length AS length
- FROM systypes t, syscolumns col LEFT OUTER JOIN syscomments com ON
- col.cdefault = com.id
- WHERE col.usertype = t.usertype
- AND col.id = :table_id
- ORDER BY col.colid
- """
- )
-
- results = connection.execute(COLUMN_SQL, table_id=table_id)
-
- columns = []
- for (
- name,
- type_,
- nullable,
- autoincrement,
- default_,
- precision,
- scale,
- length,
- ) in results:
- col_info = self._get_column_info(
- name,
- type_,
- bool(nullable),
- bool(autoincrement),
- default_,
- precision,
- scale,
- length,
- )
- columns.append(col_info)
-
- return columns
-
- def _get_column_info(
- self,
- name,
- type_,
- nullable,
- autoincrement,
- default,
- precision,
- scale,
- length,
- ):
-
- coltype = self.ischema_names.get(type_, None)
-
- kwargs = {}
-
- if coltype in (NUMERIC, DECIMAL):
- args = (precision, scale)
- elif coltype == FLOAT:
- args = (precision,)
- elif coltype in (CHAR, VARCHAR, UNICHAR, UNIVARCHAR, NCHAR, NVARCHAR):
- args = (length,)
- else:
- args = ()
-
- if coltype:
- coltype = coltype(*args, **kwargs)
- # is this necessary
- # if is_array:
- # coltype = ARRAY(coltype)
- else:
- util.warn(
- "Did not recognize type '%s' of column '%s'" % (type_, name)
- )
- coltype = sqltypes.NULLTYPE
-
- if default:
- default = default.replace("DEFAULT", "").strip()
- default = re.sub("^'(.*)'$", lambda m: m.group(1), default)
- else:
- default = None
-
- column_info = dict(
- name=name,
- type=coltype,
- nullable=nullable,
- default=default,
- autoincrement=autoincrement,
- )
- return column_info
-
- @reflection.cache
- def get_foreign_keys(self, connection, table_name, schema=None, **kw):
-
- table_id = self.get_table_id(
- connection, table_name, schema, info_cache=kw.get("info_cache")
- )
-
- table_cache = {}
- column_cache = {}
- foreign_keys = []
-
- table_cache[table_id] = {"name": table_name, "schema": schema}
-
- COLUMN_SQL = text(
- """
- SELECT c.colid AS id, c.name AS name
- FROM syscolumns c
- WHERE c.id = :table_id
- """
- )
-
- results = connection.execute(COLUMN_SQL, table_id=table_id)
- columns = {}
- for col in results:
- columns[col["id"]] = col["name"]
- column_cache[table_id] = columns
-
- REFCONSTRAINT_SQL = text(
- """
- SELECT o.name AS name, r.reftabid AS reftable_id,
- r.keycnt AS 'count',
- r.fokey1 AS fokey1, r.fokey2 AS fokey2, r.fokey3 AS fokey3,
- r.fokey4 AS fokey4, r.fokey5 AS fokey5, r.fokey6 AS fokey6,
- r.fokey7 AS fokey7, r.fokey8 AS fokey8, r.fokey9 AS fokey9,
- r.fokey10 AS fokey10, r.fokey11 AS fokey11, r.fokey12 AS fokey12,
- r.fokey13 AS fokey13, r.fokey14 AS fokey14, r.fokey15 AS fokey15,
- r.fokey16 AS fokey16,
- r.refkey1 AS refkey1, r.refkey2 AS refkey2, r.refkey3 AS refkey3,
- r.refkey4 AS refkey4, r.refkey5 AS refkey5, r.refkey6 AS refkey6,
- r.refkey7 AS refkey7, r.refkey8 AS refkey8, r.refkey9 AS refkey9,
- r.refkey10 AS refkey10, r.refkey11 AS refkey11,
- r.refkey12 AS refkey12, r.refkey13 AS refkey13,
- r.refkey14 AS refkey14, r.refkey15 AS refkey15,
- r.refkey16 AS refkey16
- FROM sysreferences r JOIN sysobjects o on r.tableid = o.id
- WHERE r.tableid = :table_id
- """
- )
- referential_constraints = connection.execute(
- REFCONSTRAINT_SQL, table_id=table_id
- ).fetchall()
-
- REFTABLE_SQL = text(
- """
- SELECT o.name AS name, u.name AS 'schema'
- FROM sysobjects o JOIN sysusers u ON o.uid = u.uid
- WHERE o.id = :table_id
- """
- )
-
- for r in referential_constraints:
- reftable_id = r["reftable_id"]
-
- if reftable_id not in table_cache:
- c = connection.execute(REFTABLE_SQL, table_id=reftable_id)
- reftable = c.fetchone()
- c.close()
- table_info = {"name": reftable["name"], "schema": None}
- if (
- schema is not None
- or reftable["schema"] != self.default_schema_name
- ):
- table_info["schema"] = reftable["schema"]
-
- table_cache[reftable_id] = table_info
- results = connection.execute(COLUMN_SQL, table_id=reftable_id)
- reftable_columns = {}
- for col in results:
- reftable_columns[col["id"]] = col["name"]
- column_cache[reftable_id] = reftable_columns
-
- reftable = table_cache[reftable_id]
- reftable_columns = column_cache[reftable_id]
-
- constrained_columns = []
- referred_columns = []
- for i in range(1, r["count"] + 1):
- constrained_columns.append(columns[r["fokey%i" % i]])
- referred_columns.append(reftable_columns[r["refkey%i" % i]])
-
- fk_info = {
- "constrained_columns": constrained_columns,
- "referred_schema": reftable["schema"],
- "referred_table": reftable["name"],
- "referred_columns": referred_columns,
- "name": r["name"],
- }
-
- foreign_keys.append(fk_info)
-
- return foreign_keys
-
- @reflection.cache
- def get_indexes(self, connection, table_name, schema=None, **kw):
- table_id = self.get_table_id(
- connection, table_name, schema, info_cache=kw.get("info_cache")
- )
-
- INDEX_SQL = text(
- """
- SELECT object_name(i.id) AS table_name,
- i.keycnt AS 'count',
- i.name AS name,
- (i.status & 0x2) AS 'unique',
- index_col(object_name(i.id), i.indid, 1) AS col_1,
- index_col(object_name(i.id), i.indid, 2) AS col_2,
- index_col(object_name(i.id), i.indid, 3) AS col_3,
- index_col(object_name(i.id), i.indid, 4) AS col_4,
- index_col(object_name(i.id), i.indid, 5) AS col_5,
- index_col(object_name(i.id), i.indid, 6) AS col_6,
- index_col(object_name(i.id), i.indid, 7) AS col_7,
- index_col(object_name(i.id), i.indid, 8) AS col_8,
- index_col(object_name(i.id), i.indid, 9) AS col_9,
- index_col(object_name(i.id), i.indid, 10) AS col_10,
- index_col(object_name(i.id), i.indid, 11) AS col_11,
- index_col(object_name(i.id), i.indid, 12) AS col_12,
- index_col(object_name(i.id), i.indid, 13) AS col_13,
- index_col(object_name(i.id), i.indid, 14) AS col_14,
- index_col(object_name(i.id), i.indid, 15) AS col_15,
- index_col(object_name(i.id), i.indid, 16) AS col_16
- FROM sysindexes i, sysobjects o
- WHERE o.id = i.id
- AND o.id = :table_id
- AND (i.status & 2048) = 0
- AND i.indid BETWEEN 1 AND 254
- """
- )
-
- results = connection.execute(INDEX_SQL, table_id=table_id)
- indexes = []
- for r in results:
- column_names = []
- for i in range(1, r["count"]):
- column_names.append(r["col_%i" % (i,)])
- index_info = {
- "name": r["name"],
- "unique": bool(r["unique"]),
- "column_names": column_names,
- }
- indexes.append(index_info)
-
- return indexes
-
- @reflection.cache
- def get_pk_constraint(self, connection, table_name, schema=None, **kw):
- table_id = self.get_table_id(
- connection, table_name, schema, info_cache=kw.get("info_cache")
- )
-
- PK_SQL = text(
- """
- SELECT object_name(i.id) AS table_name,
- i.keycnt AS 'count',
- i.name AS name,
- index_col(object_name(i.id), i.indid, 1) AS pk_1,
- index_col(object_name(i.id), i.indid, 2) AS pk_2,
- index_col(object_name(i.id), i.indid, 3) AS pk_3,
- index_col(object_name(i.id), i.indid, 4) AS pk_4,
- index_col(object_name(i.id), i.indid, 5) AS pk_5,
- index_col(object_name(i.id), i.indid, 6) AS pk_6,
- index_col(object_name(i.id), i.indid, 7) AS pk_7,
- index_col(object_name(i.id), i.indid, 8) AS pk_8,
- index_col(object_name(i.id), i.indid, 9) AS pk_9,
- index_col(object_name(i.id), i.indid, 10) AS pk_10,
- index_col(object_name(i.id), i.indid, 11) AS pk_11,
- index_col(object_name(i.id), i.indid, 12) AS pk_12,
- index_col(object_name(i.id), i.indid, 13) AS pk_13,
- index_col(object_name(i.id), i.indid, 14) AS pk_14,
- index_col(object_name(i.id), i.indid, 15) AS pk_15,
- index_col(object_name(i.id), i.indid, 16) AS pk_16
- FROM sysindexes i, sysobjects o
- WHERE o.id = i.id
- AND o.id = :table_id
- AND (i.status & 2048) = 2048
- AND i.indid BETWEEN 1 AND 254
- """
- )
-
- results = connection.execute(PK_SQL, table_id=table_id)
- pks = results.fetchone()
- results.close()
-
- constrained_columns = []
- if pks:
- for i in range(1, pks["count"] + 1):
- constrained_columns.append(pks["pk_%i" % (i,)])
- return {
- "constrained_columns": constrained_columns,
- "name": pks["name"],
- }
- else:
- return {"constrained_columns": [], "name": None}
-
- @reflection.cache
- def get_schema_names(self, connection, **kw):
-
- SCHEMA_SQL = text("SELECT u.name AS name FROM sysusers u")
-
- schemas = connection.execute(SCHEMA_SQL)
-
- return [s["name"] for s in schemas]
-
- @reflection.cache
- def get_table_names(self, connection, schema=None, **kw):
- if schema is None:
- schema = self.default_schema_name
-
- TABLE_SQL = text(
- """
- SELECT o.name AS name
- FROM sysobjects o JOIN sysusers u ON o.uid = u.uid
- WHERE u.name = :schema_name
- AND o.type = 'U'
- """
- )
-
- if util.py2k:
- if isinstance(schema, unicode): # noqa
- schema = schema.encode("ascii")
-
- tables = connection.execute(TABLE_SQL, schema_name=schema)
-
- return [t["name"] for t in tables]
-
- @reflection.cache
- def get_view_definition(self, connection, view_name, schema=None, **kw):
- if schema is None:
- schema = self.default_schema_name
-
- VIEW_DEF_SQL = text(
- """
- SELECT c.text
- FROM syscomments c JOIN sysobjects o ON c.id = o.id
- WHERE o.name = :view_name
- AND o.type = 'V'
- """
- )
-
- if util.py2k:
- if isinstance(view_name, unicode): # noqa
- view_name = view_name.encode("ascii")
-
- view = connection.execute(VIEW_DEF_SQL, view_name=view_name)
-
- return view.scalar()
-
- @reflection.cache
- def get_view_names(self, connection, schema=None, **kw):
- if schema is None:
- schema = self.default_schema_name
-
- VIEW_SQL = text(
- """
- SELECT o.name AS name
- FROM sysobjects o JOIN sysusers u ON o.uid = u.uid
- WHERE u.name = :schema_name
- AND o.type = 'V'
- """
- )
-
- if util.py2k:
- if isinstance(schema, unicode): # noqa
- schema = schema.encode("ascii")
- views = connection.execute(VIEW_SQL, schema_name=schema)
-
- return [v["name"] for v in views]
-
- def has_table(self, connection, table_name, schema=None):
- self._ensure_has_table_connection(connection)
-
- try:
- self.get_table_id(connection, table_name, schema)
- except exc.NoSuchTableError:
- return False
- else:
- return True
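
Per the deprecation notice in the removed module, Sybase users should move to
the external dialect. A minimal sketch, assuming the third-party
``sqlalchemy-sybase`` package is installed and provides the same URL scheme;
the DSN and credentials are illustrative only::

    from sqlalchemy import create_engine

    # hypothetical DSN/credentials, for illustration only
    engine = create_engine("sybase+pyodbc://scott:tiger@sybase_dsn")
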
+++ /dev/null
-# sybase/mxodbc.py
-# Copyright (C) 2005-2021 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-"""
-
-.. dialect:: sybase+mxodbc
- :name: mxODBC
- :dbapi: mxodbc
- :connectstring: sybase+mxodbc://<username>:<password>@<dsnname>
- :url: https://www.egenix.com/
-
-.. note::
-
- This dialect is a stub only and is likely non functional at this time.
-
-"""
-from sqlalchemy.connectors.mxodbc import MxODBCConnector
-from sqlalchemy.dialects.sybase.base import SybaseDialect
-from sqlalchemy.dialects.sybase.base import SybaseExecutionContext
-
-
-class SybaseExecutionContext_mxodbc(SybaseExecutionContext):
- pass
-
-
-class SybaseDialect_mxodbc(MxODBCConnector, SybaseDialect):
- execution_ctx_cls = SybaseExecutionContext_mxodbc
- supports_statement_cache = True
-
-
-dialect = SybaseDialect_mxodbc
+++ /dev/null
-# sybase/pyodbc.py
-# Copyright (C) 2005-2021 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-
-"""
-.. dialect:: sybase+pyodbc
- :name: PyODBC
- :dbapi: pyodbc
- :connectstring: sybase+pyodbc://<username>:<password>@<dsnname>[/<database>]
- :url: https://pypi.org/project/pyodbc/
-
-Unicode Support
----------------
-
-The pyodbc driver currently supports usage of these Sybase types with
-Unicode or multibyte strings::
-
- CHAR
- NCHAR
- NVARCHAR
- TEXT
- VARCHAR
-
-Currently *not* supported are::
-
- UNICHAR
- UNITEXT
- UNIVARCHAR
-
-""" # noqa
-
-import decimal
-
-from sqlalchemy import processors
-from sqlalchemy import types as sqltypes
-from sqlalchemy.connectors.pyodbc import PyODBCConnector
-from sqlalchemy.dialects.sybase.base import SybaseDialect
-from sqlalchemy.dialects.sybase.base import SybaseExecutionContext
-
-
-class _SybNumeric_pyodbc(sqltypes.Numeric):
- """Turns Decimals with adjusted() < -6 into floats.
-
- It's not yet known how to get decimals with many
- significant digits or very large adjusted() into Sybase
- via pyodbc.
-
- """
-
- def bind_processor(self, dialect):
- super_process = super(_SybNumeric_pyodbc, self).bind_processor(dialect)
-
- def process(value):
- if self.asdecimal and isinstance(value, decimal.Decimal):
-
- if value.adjusted() < -6:
- return processors.to_float(value)
-
- if super_process:
- return super_process(value)
- else:
- return value
-
- return process
-
-
-class SybaseExecutionContext_pyodbc(SybaseExecutionContext):
- def set_ddl_autocommit(self, connection, value):
- if value:
- connection.autocommit = True
- else:
- connection.autocommit = False
-
-
-class SybaseDialect_pyodbc(PyODBCConnector, SybaseDialect):
- execution_ctx_cls = SybaseExecutionContext_pyodbc
- supports_statement_cache = True
-
- colspecs = {sqltypes.Numeric: _SybNumeric_pyodbc}
-
- @classmethod
- def dbapi(cls):
- return PyODBCConnector.dbapi()
-
-
-dialect = SybaseDialect_pyodbc
+++ /dev/null
-# sybase/pysybase.py
-# Copyright (C) 2010-2021 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-
-"""
-.. dialect:: sybase+pysybase
- :name: Python-Sybase
- :dbapi: Sybase
- :connectstring: sybase+pysybase://<username>:<password>@<dsn>/[database name]
- :url: https://python-sybase.sourceforge.net/
-
-Unicode Support
----------------
-
-The python-sybase driver does not appear to support non-ASCII strings of any
-kind at this time.
-
-""" # noqa
-
-from sqlalchemy import processors
-from sqlalchemy import types as sqltypes
-from sqlalchemy.dialects.sybase.base import SybaseDialect
-from sqlalchemy.dialects.sybase.base import SybaseExecutionContext
-from sqlalchemy.dialects.sybase.base import SybaseSQLCompiler
-
-
-class _SybNumeric(sqltypes.Numeric):
- def result_processor(self, dialect, type_):
- if not self.asdecimal:
- return processors.to_float
- else:
- return sqltypes.Numeric.result_processor(self, dialect, type_)
-
-
-class SybaseExecutionContext_pysybase(SybaseExecutionContext):
- def set_ddl_autocommit(self, dbapi_connection, value):
- if value:
- # call commit() on the Sybase connection directly,
- # to avoid any side effects of calling a Connection
- # transactional method inside of pre_exec()
- dbapi_connection.commit()
-
- def pre_exec(self):
- SybaseExecutionContext.pre_exec(self)
-
- for param in self.parameters:
- for key in list(param):
- param["@" + key] = param[key]
- del param[key]
-
-
-class SybaseSQLCompiler_pysybase(SybaseSQLCompiler):
- def bindparam_string(self, name, **kw):
- return "@" + name
-
-
-class SybaseDialect_pysybase(SybaseDialect):
- driver = "pysybase"
- execution_ctx_cls = SybaseExecutionContext_pysybase
- statement_compiler = SybaseSQLCompiler_pysybase
-
- supports_statement_cache = True
-
- colspecs = {sqltypes.Numeric: _SybNumeric, sqltypes.Float: sqltypes.Float}
-
- @classmethod
- def dbapi(cls):
- import Sybase
-
- return Sybase
-
- def create_connect_args(self, url):
- opts = url.translate_connect_args(username="user", password="passwd")
-
- return ([opts.pop("host")], opts)
-
- def do_executemany(self, cursor, statement, parameters, context=None):
- # calling python-sybase executemany yields:
- # TypeError: string too long for buffer
- for param in parameters:
- cursor.execute(statement, param)
-
- def _get_server_version_info(self, connection):
- vers = connection.exec_driver_sql("select @@version_number").scalar()
- # i.e. 15500, 15000, 12500 == (15, 5, 0, 0), (15, 0, 0, 0),
- # (12, 5, 0, 0)
- return (vers // 1000, vers % 1000 // 100, vers % 100 // 10, vers % 10)
-
- def is_disconnect(self, e, connection, cursor):
- if isinstance(
- e, (self.dbapi.OperationalError, self.dbapi.ProgrammingError)
- ):
- msg = str(e)
- return (
- "Unable to complete network request to host" in msg
- or "Invalid connection state" in msg
- or "Invalid cursor state" in msg
- )
- else:
- return False
-
-
-dialect = SybaseDialect_pysybase
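
As a quick sanity check of the version arithmetic in the removed
``_get_server_version_info`` above, the integer 15500 unpacks as follows::

    vers = 15500
    assert (
        vers // 1000, vers % 1000 // 100, vers % 100 // 10, vers % 10
    ) == (15, 5, 0, 0)
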
elif not result._metadata.returns_rows:
# no results, get rowcount
- # (which requires open cursor on some drivers
- # such as kintersbasdb, mxodbc)
+ # (which requires open cursor on some drivers)
result.rowcount
result._soft_close()
return result
def _reflect_col_sequence(self, col_d, colargs):
if "sequence" in col_d:
- # TODO: mssql and sybase are using this.
+ # TODO: mssql is using this.
seq = col_d["sequence"]
sequence = sa_schema.Sequence(seq["name"], 1, 1)
if "start" in seq:
select(mytable).\
with_hint(mytable, "index(%(name)s ix_mytable)", 'oracle').\
- with_hint(mytable, "WITH INDEX ix_mytable", 'sybase')
+ with_hint(mytable, "WITH INDEX ix_mytable", 'mssql')
.. seealso::
without being in the context of a typed column.
"""
- return exclusions.closed()
+ return exclusions.open()
@property
def standalone_null_binds_whereclause(self):
def precision_numerics_enotation_large(self):
"""target backend supports Decimal() objects using E notation
to represent very large values."""
- return exclusions.closed()
+ return exclusions.open()
@property
def precision_numerics_many_significant_digits(self):
else:
kw["mariadb_engine"] = "MyISAM"
- # Apply some default cascading rules for self-referential foreign keys.
- # MySQL InnoDB has some issues around selecting self-refs too.
- if exclusions.against(config._current, "firebird"):
- table_name = args[0]
- unpack = config.db.dialect.identifier_preparer.unformat_identifiers
-
- # Only going after ForeignKeys in Columns. May need to
- # expand to ForeignKeyConstraint too.
- fks = [
- fk
- for col in args
- if isinstance(col, schema.Column)
- for fk in col.foreign_keys
- ]
-
- for fk in fks:
- # root around in raw spec
- ref = fk._colspec
- if isinstance(ref, schema.Column):
- name = ref.table.name
- else:
- # take just the table name: on FB there cannot be
- # a schema, so the first element is always the
- # table name, possibly followed by the field name
- name = unpack(ref)[0]
- if name == table_name:
- if fk.ondelete is None:
- fk.ondelete = "CASCADE"
- if fk.onupdate is None:
- fk.onupdate = "CASCADE"
-
return schema.Table(*args, **kw)
# allow any test suite to pick up on this
col.info["test_needs_autoincrement"] = True
- # hardcoded rule for firebird, oracle; this should
+ # hardcoded rule for oracle; this should
# be moved out
- if exclusions.against(config._current, "firebird", "oracle"):
+ if exclusions.against(config._current, "oracle"):
def add_seq(c, tbl):
c._init_items(
category=sa_exc.SADeprecationWarning,
message=r".*\(deprecated since: 2.0\)$",
)
- warnings.filterwarnings(
- "ignore",
- category=sa_exc.SADeprecationWarning,
- message=r"^The (Sybase|firebird) dialect is deprecated and will be",
- )
try:
import pytest
docker_mssql = mssql+pymssql://scott:tiger^5HHH@127.0.0.1:1433/test
oracle = oracle://scott:tiger@127.0.0.1:1521
oracle8 = oracle://scott:tiger@127.0.0.1:1521/?use_ansi=0
-firebird = firebird://sysdba:mainkey@localhost//Users/classic/foo.fdb
from sqlalchemy import update
from sqlalchemy.dialects import mssql
from sqlalchemy.dialects.mssql import base as mssql_base
-from sqlalchemy.dialects.mssql import mxodbc
from sqlalchemy.dialects.mssql.base import try_cast
from sqlalchemy.sql import column
from sqlalchemy.sql import quoted_name
from sqlalchemy.dialects.mssql.base import MSSQLStrictCompiler
- mxodbc_dialect = mxodbc.dialect()
- mxodbc_dialect.statement_compiler = MSSQLStrictCompiler
+ mssql_dialect = mssql.dialect()
+ mssql_dialect.statement_compiler = MSSQLStrictCompiler
t = table("sometable", column("foo"))
expr = testing.resolve_lambda(expr, t=t)
- self.assert_compile(expr, compiled, dialect=mxodbc_dialect, **kw)
+ self.assert_compile(expr, compiled, dialect=mssql_dialect, **kw)
def test_in_with_subqueries(self):
"""Test removal of legacy behavior that converted "x==subquery"
from sqlalchemy.testing import AssertsCompiledSQL
from sqlalchemy.testing import AssertsExecutionResults
from sqlalchemy.testing import ComparesTables
-from sqlalchemy.testing import emits_warning_on
from sqlalchemy.testing import engines
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
)
eq_(value, returned)
- @emits_warning_on("mssql+mxodbc", r".*does not have any indexes.*")
def test_dates(self, metadata, connection):
"Exercise type specification for date types."
),
)
- @emits_warning_on("mssql+mxodbc", r".*does not have any indexes.*")
@testing.combinations(
("legacy_large_types", False),
("sql2012_large_types", True, lambda: testing.only_on("mssql >= 11")),
eq_(col.autoincrement, "auto")
is_not(tbl._autoincrement_column, col)
- # mxodbc can't handle scope_identity() with DEFAULT VALUES
-
- if testing.db.driver == "mxodbc":
- eng = [
- engines.testing_engine(
- options={"implicit_returning": True}
- )
- ]
- else:
- eng = [
- engines.testing_engine(
- options={"implicit_returning": False}
- ),
- engines.testing_engine(
- options={"implicit_returning": True}
- ),
- ]
+ eng = [
+ engines.testing_engine(options={"implicit_returning": False}),
+ engines.testing_engine(options={"implicit_returning": True}),
+ ]
for counter, engine in enumerate(eng):
connection.execute(tbl.insert())
eq_(dialect.is_disconnect(error, None, None), is_disconnect)
@testing.combinations(
- ("mysqldb"),
- ("pymysql"),
- ("oursql"),
- id_="s",
- argnames="driver_name",
+ ("mysqldb"), ("pymysql"), id_="s", argnames="driver_name"
)
def test_ssl_arguments(self, driver_name):
url = (
expected["ssl"]["check_hostname"] = False
kwarg = dialect.create_connect_args(make_url(url))[1]
- # args that differ between oursql and others
for k in ("use_unicode", "found_rows", "client_flag"):
kwarg.pop(k, None)
eq_(kwarg, expected)
def test_numeric_codes(self):
from sqlalchemy.dialects.postgresql import (
pg8000,
- pygresql,
psycopg2,
psycopg2cffi,
base,
dialects = (
pg8000.dialect(),
- pygresql.dialect(),
psycopg2.dialect(),
psycopg2cffi.dialect(),
)
+++ /dev/null
-from sqlalchemy import testing
-from sqlalchemy.testing import fixtures
-from sqlalchemy.util.compat import import_
-
-
-class DatabaseRemovedTest(fixtures.TestBase):
- def test_deprecate_databases(self):
- with testing.expect_deprecated_20(
- "The `database` package is deprecated and will be removed in v2.0 "
- ):
- import_("sqlalchemy.databases")
+++ /dev/null
-from sqlalchemy import Column
-from sqlalchemy import Integer
-from sqlalchemy import MetaData
-from sqlalchemy import Table
-from sqlalchemy.testing import engines
-from sqlalchemy.testing import eq_
-from sqlalchemy.testing import fixtures
-from sqlalchemy.testing.mock import Mock
-
-
-def mock_dbapi():
- return Mock(
- paramstyle="qmark",
- connect=Mock(
- return_value=Mock(
- cursor=Mock(return_value=Mock(description=None, rowcount=None))
- )
- ),
- )
-
-
-class MxODBCTest(fixtures.TestBase):
- def test_native_odbc_execute(self):
- t1 = Table("t1", MetaData(), Column("c1", Integer))
- dbapi = mock_dbapi()
-
- engine = engines.testing_engine(
- "mssql+mxodbc://localhost",
- options={"module": dbapi, "_initialize": False},
- )
- conn = engine.connect()
-
- with conn.begin():
- # crud: uses execute
- conn.execute(t1.insert().values(c1="foo"))
- conn.execute(t1.delete().where(t1.c.c1 == "foo"))
- conn.execute(t1.update().where(t1.c.c1 == "foo").values(c1="bar"))
-
- # select: uses executedirect
- conn.execute(t1.select())
-
- # manual flagging
- conn.execution_options(native_odbc_execute=True).execute(
- t1.select()
- )
- conn.execution_options(native_odbc_execute=False).execute(
- t1.insert().values(c1="foo")
- )
-
- eq_(
- # fmt: off
- [
- c[2]
- for c in dbapi.connect.return_value.cursor.
- return_value.execute.mock_calls
- ],
- # fmt: on
- [
- {"direct": True},
- {"direct": True},
- {"direct": True},
- {"direct": True},
- {"direct": False},
- {"direct": True},
- ]
- )
"postgresql+psycopg2",
"Older versions don't support cursor pickling, newer ones do",
)
- @testing.fails_on(
- "mysql+oursql",
- "Exception doesn't come back exactly the same from pickle",
- )
@testing.fails_on(
"mysql+mysqlconnector",
"Exception doesn't come back exactly the same from pickle",
fn(conn, 5, value=8)
self._assert_fn(5, value=8)
- @testing.fails_on("mysql+oursql", "oursql bug ? getting wrong rowcount")
def test_connect_as_ctx_noautocommit(self):
fn = self._trans_fn()
self._assert_no_data()
for name in (
"mysql",
- "firebird",
"postgresql",
"sqlite",
"oracle",
eq_(t.c.keys(), ["q"])
@testing.requires.schemas
- @testing.fails_on("sybase", "FIXME: unknown")
def test_explicit_default_schema_metadata(self, connection, metadata):
schema = connection.dialect.default_schema_name
result_str = ["%d %s" % (t.id, t.category.name) for t in result]
eq_(result_str, ["1 Some Category", "3 Some Category"])
- @testing.crashes("sybase", "FIXME: unknown, verify not fails_on")
def test_without_outerjoin_literal(self):
Thing, tests = (self.classes.Thing, self.tables.tests)
'ERROR: column "users.name" must appear in the GROUP BY clause'
" or be used in an aggregate function",
)
- @testing.fails_on("firebird", "unknown")
def test_values_with_boolean_selects(self):
"""Tests a values clause that works with select boolean
evaluations"""
query = fixture_session().query(func.sum(foo.c.bar))
assert query.filter(foo.c.bar < 30).one() == (435,)
- @testing.fails_on("firebird", "FIXME: unknown")
@testing.fails_on(
"mssql",
"AVG produces an average as the original column type on mssql.",
eq_(result, [self.static.user_result[0]])
- # 'Raises a "expression evaluation not supported" error at prepare time
- @testing.fails_on("firebird", "FIXME: unknown")
def test_function(self):
"""Mapping to a SELECT statement that has functions in it."""
class Secondary(cls.Comparable):
pass
- @testing.fails_on("firebird", "Data type unknown on the parameter")
def test_insert(self):
althohoval, hohoval, default_t, Hoho = (
self.other.althohoval,
self.assert_(h2.foober == h3.foober == h4.foober == "im foober")
eq_(h5.foober, "im the new foober")
- @testing.fails_on("firebird", "Data type unknown on the parameter")
@testing.fails_on("oracle+cx_oracle", "seems like a cx_oracle bug")
def test_eager_defaults(self):
hohoval, default_t, Hoho = (
self.sql_count_(0, go)
- @testing.fails_on("firebird", "Data type unknown on the parameter")
def test_update(self):
default_t, Hoho = self.tables.default_t, self.classes.Hoho
session.flush()
eq_(h1.foober, "im the update")
- @testing.fails_on("firebird", "Data type unknown on the parameter")
def test_used_in_relationship(self):
"""A server-side default can be used as the target of a foreign key"""
return skip_if(
[
- no_support("firebird", "not supported by database"),
no_support("mysql", "not supported by database"),
no_support("mariadb", "not supported by database"),
no_support("mssql", "not supported by database"),
"""Target database must support VARCHAR with no length"""
return skip_if(
- ["firebird", "oracle", "mysql", "mariadb"],
+ ["oracle", "mysql", "mariadb"],
"not supported by database",
)
"""Target database must support boolean expressions as columns"""
return skip_if(
[
- no_support("firebird", "not supported by database"),
no_support("oracle", "not supported by database"),
no_support("mssql", "not supported by database"),
- no_support("sybase", "not supported by database"),
]
)
]
)
- @property
- def standalone_binds(self):
- """target database/driver supports bound parameters as column expressions
- without being in the context of a typed column.
-
- """
- return skip_if(["firebird", "mssql+mxodbc"], "not supported by driver")
-
@property
def qmark_paramstyle(self):
- return only_on(
- [
- "firebird",
- "sqlite",
- "+pyodbc",
- "+mxodbc",
- "mysql+oursql",
- "mariadb+oursql",
- ]
- )
+ return only_on(["sqlite", "+pyodbc"])
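A quick way to confirm which DBAPIs this requirement now covers is the
module-level ``paramstyle`` attribute that PEP 249 mandates; a sketch,
assuming the pyodbc package is installed::

    import sqlite3

    import pyodbc  # assumption: pyodbc is available in the environment

    # the two remaining qmark-style backends
    assert sqlite3.paramstyle == "qmark"
    assert pyodbc.paramstyle == "qmark"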
@property
def named_paramstyle(self):
[
"postgresql+psycopg2",
"postgresql+psycopg2cffi",
- "postgresql+pypostgresql",
- "postgresql+pygresql",
"mysql+mysqlconnector",
"mysql+pymysql",
"mysql+cymysql",
@property
def temporary_tables(self):
"""target database supports temporary tables"""
- return skip_if(["firebird", self._sqlite_file_db], "not supported (?)")
+ return skip_if([self._sqlite_file_db], "not supported (?)")
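For reference, one way to spell a temporary table in Core, via the
``prefixes`` option (a sketch, not tied to any one backend)::

    from sqlalchemy import Column, Integer, MetaData, Table

    # renders CREATE TEMPORARY TABLE scratch (...)
    scratch = Table(
        "scratch",
        MetaData(),
        Column("x", Integer),
        prefixes=["TEMPORARY"],
    )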
@property
def temp_table_reflection(self):
@property
def temp_table_reflect_indexes(self):
- return skip_if(
- ["mssql", "firebird", self._sqlite_file_db], "not supported (?)"
- )
+ return skip_if(["mssql", self._sqlite_file_db], "not supported (?)")
@property
def reflectable_autoincrement(self):
PKs assuming they were reflected.
this is essentially all the DBs in "identity" plus PostgreSQL, which
- has SERIAL support. FB and Oracle (and sybase?) require the Sequence
+ has SERIAL support. Oracle requires the Sequence
to be explicitly added, including if the table was reflected.
"""
- return skip_if(
- ["firebird", "oracle", "sybase"], "not supported by database"
- )
-
- @property
- def insert_from_select(self):
- return skip_if(["firebird"], "crashes for unknown reason")
-
- @property
- def fetch_rows_post_commit(self):
- return skip_if(["firebird"], "not supported")
+ return skip_if(["oracle"], "not supported by database")
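On Oracle, the explicit ``Sequence`` mentioned in the docstring looks like
this (table and sequence names are illustrative)::

    from sqlalchemy import Column, Integer, MetaData, Sequence, Table

    # the Sequence must be stated explicitly, even for a reflected table
    t = Table(
        "t",
        MetaData(),
        Column("id", Integer, Sequence("t_id_seq"), primary_key=True),
    )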
@property
def non_broken_binary(self):
def updateable_autoincrement_pks(self):
"""Target must support UPDATE on autoincrement/integer primary key."""
- return skip_if(
- ["mssql", "sybase"], "IDENTITY columns can't be updated"
- )
+ return skip_if(["mssql"], "IDENTITY columns can't be updated")
@property
def isolation_level(self):
return only_on(
("postgresql", "sqlite", "mysql", "mariadb", "mssql", "oracle"),
"DBAPI has no isolation level support",
- ) + fails_on(
- "postgresql+pypostgresql",
- "pypostgresql bombs on multiple isolation level calls",
)
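The isolation levels this requirement gates are set through the standard
engine-level option; a sketch with an illustrative URL::

    from sqlalchemy import create_engine

    engine = create_engine(
        "postgresql+psycopg2://scott:tiger@localhost/test",
        isolation_level="REPEATABLE READ",
    )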
@property
return only_on(
("postgresql", "sqlite", "mysql", "mariadb", "mssql"),
"DBAPI has no isolation level support",
- ) + fails_on(
- "postgresql+pypostgresql",
- "pypostgresql bombs on multiple isolation level calls",
)
def get_isolation_levels(self, config):
def delete_from(self):
"""Target must support DELETE FROM..FROM or DELETE..USING syntax"""
return only_on(
- ["postgresql", "mssql", "mysql", "mariadb", "sybase"],
+ ["postgresql", "mssql", "mysql", "mariadb"],
"Backend does not support DELETE..FROM",
)
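The DELETE..FROM form gated here is reached by correlating a second table
in the WHERE clause; a sketch with hypothetical tables::

    from sqlalchemy import Column, Integer, MetaData, Table, delete

    m = MetaData()
    users = Table("users", m, Column("id", Integer, primary_key=True))
    addresses = Table("addresses", m, Column("user_id", Integer))

    # emits DELETE..FROM / DELETE..USING on the backends listed above
    stmt = delete(users).where(users.c.id == addresses.c.user_id)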
"""Target database must support savepoints."""
return skip_if(
- ["sqlite", "sybase", ("mysql", "<", (5, 0, 3))],
+ ["sqlite", ("mysql", "<", (5, 0, 3))],
"savepoints not supported",
)
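Savepoints are reached in Core through ``Connection.begin_nested()``; a
sketch, assuming a backend that passes this exclusion::

    from sqlalchemy import create_engine

    engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")

    with engine.connect() as conn:
        with conn.begin():
            sp = conn.begin_nested()  # emits SAVEPOINT
            sp.rollback()             # emits ROLLBACK TO SAVEPOINT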
def update_nowait(self):
"""Target database must support SELECT...FOR UPDATE NOWAIT"""
return skip_if(
- ["firebird", "mssql", "mysql", "mariadb<10.3", "sqlite", "sybase"],
+ ["mssql", "mysql", "mariadb<10.3", "sqlite"],
"no FOR UPDATE NOWAIT support",
)
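The NOWAIT variant is requested through ``with_for_update()``; a minimal
sketch::

    from sqlalchemy import Column, Integer, MetaData, Table, select

    t = Table("t", MetaData(), Column("id", Integer, primary_key=True))

    # renders SELECT ... FOR UPDATE NOWAIT where this requirement holds
    stmt = select(t.c.id).with_for_update(nowait=True)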
"""Target database must support INTERSECT or equivalent."""
return fails_if(
- ["firebird", self._mysql_not_mariadb_103, "sybase"],
+ [self._mysql_not_mariadb_103],
"no support for INTERSECT",
)
@property
def except_(self):
"""Target database must support EXCEPT or equivalent (i.e. MINUS)."""
- return fails_if(
- ["firebird", self._mysql_not_mariadb_103, "sybase"],
- "no support for EXCEPT",
- )
+ return fails_if([self._mysql_not_mariadb_103], "no support for EXCEPT")
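Both set operations gated by the two requirements above are available as
top-level constructs; a sketch with hypothetical tables::

    from sqlalchemy import Column, Integer, MetaData, Table
    from sqlalchemy import except_, intersect, select

    m = MetaData()
    t1 = Table("t1", m, Column("x", Integer))
    t2 = Table("t2", m, Column("x", Integer))

    stmt_i = intersect(select(t1.c.x), select(t2.c.x))  # INTERSECT
    stmt_e = except_(select(t1.c.x), select(t2.c.x))    # EXCEPT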
@property
def dupe_order_by_ok(self):
"""
return fails_if(["sqlite", "oracle"])
- @property
- def offset(self):
- """Target database must support some method of adding OFFSET or
- equivalent to a result set."""
- return fails_if(["sybase"], "no support for OFFSET or equivalent")
-
@property
def sql_expression_limit_offset(self):
return (
return skip_if(
[
- no_support("firebird", "no SA implementation"),
no_support("mssql", "two-phase xact not supported by drivers"),
no_support(
"sqlite", "two-phase xact not supported by database"
),
- no_support(
- "sybase", "two-phase xact not supported by drivers/SQLA"
- ),
# in Ia3cbbf56d4882fcc7980f90519412f1711fae74d
# we are evaluating which modern MySQL / MariaDB versions
# can handle two-phase testing without too many problems
"ORA-00932: inconsistent datatypes: expected - got CLOB",
)
- @property
- def unicode_data(self):
- """target drive must support unicode data stored in columns."""
- return skip_if([no_support("sybase", "no unicode driver support")])
-
@property
def unicode_connections(self):
"""
return skip_if(
[
- no_support("sybase", "FIXME: guessing, needs confirmation"),
no_support("mssql+pymssql", "no FreeTDS support"),
]
)
"sqlite+aiosqlite",
"sqlite+pysqlite",
"sqlite+pysqlcipher",
- "sybase",
"mssql",
)
"mariadb",
"sqlite+pysqlite",
"sqlite+pysqlcipher",
- "sybase",
)
- @property
- def implements_get_lastrowid(self):
- return skip_if([no_support("sybase", "not supported by database")])
-
@property
def dbapi_lastrowid(self):
""" "target backend includes a 'lastrowid' accessor on the DBAPI
def nullsordering(self):
"""Target backends that support nulls ordering."""
return fails_on_everything_except(
- "postgresql", "oracle", "firebird", "sqlite >= 3.30.0"
+ "postgresql", "oracle", "sqlite >= 3.30.0"
)
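Nulls ordering is exercised through the ``nulls_last()`` / ``nulls_first()``
modifiers; a sketch::

    from sqlalchemy import Column, Integer, MetaData, Table
    from sqlalchemy import nulls_last, select

    t = Table("t", MetaData(), Column("x", Integer))

    # ORDER BY x NULLS LAST on the backends named above
    stmt = select(t.c.x).order_by(nulls_last(t.c.x))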
@property
"""Target driver reflects the name of primary key constraints."""
return fails_on_everything_except(
- "postgresql", "oracle", "mssql", "sybase", "sqlite"
+ "postgresql", "oracle", "mssql", "sqlite"
)
@property
"""target dialect supports representation of Python
datetime.datetime() with microsecond objects."""
- return skip_if(
- ["mssql", "mysql", "mariadb", "firebird", "oracle", "sybase"]
- )
+ return skip_if(["mssql", "mysql", "mariadb", "oracle"])
@property
def timestamp_microseconds(self):
"""target dialect supports representation of Python
datetime.datetime() objects with historic (pre 1900) values."""
- return succeeds_if(["sqlite", "postgresql", "firebird"])
+ return succeeds_if(["sqlite", "postgresql"])
@property
def date(self):
"""target dialect supports representation of Python
datetime.datetime() objects with historic (pre 1900) values."""
- return succeeds_if(["sqlite", "postgresql", "firebird"])
+ return succeeds_if(["sqlite", "postgresql"])
@property
def time(self):
"""target dialect supports representation of Python
datetime.time() with microsecond objects."""
- return skip_if(
- ["mssql", "mysql", "mariadb", "firebird", "oracle", "sybase"]
- )
+ return skip_if(["mssql", "mysql", "mariadb", "oracle"])
@property
def precision_numerics_general(self):
# NOTE: this exclusion isn't used in current tests.
return exclusions.open()
- @property
- def precision_numerics_enotation_large(self):
- """target backend supports Decimal() objects using E notation
- to represent very large values."""
-
- return fails_if(
- [
- (
- "sybase+pyodbc",
- None,
- None,
- "Don't know how do get these values through "
- "FreeTDS + Sybase",
- ),
- ("firebird", None, None, "Precision must be from 1 to 18"),
- ]
- )
-
@property
def precision_numerics_many_significant_digits(self):
"""target backend supports values with many digits on both sides,
return fails_if(
[
("sqlite", None, None, "TODO"),
- ("firebird", None, None, "Precision must be from 1 to 18"),
- ("sybase+pysybase", None, None, "TODO"),
]
)
return fails_if(
[
("oracle", None, None, "driver doesn't do this automatically"),
- (
- "firebird",
- None,
- None,
- "database and/or driver truncates decimal places.",
- ),
]
)
None,
"mysql FLOAT type only returns 4 decimals",
),
- (
- "firebird",
- None,
- None,
- "firebird FLOAT type isn't high precision",
- ),
- ]
- )
-
- @property
- def floats_to_four_decimals(self):
- return fails_if(
- [
- ("mysql+oursql", None, None, "Floating point error"),
- ("mariadb+oursql", None, None, "Floating point error"),
- (
- "firebird",
- None,
- None,
- "Firebird still has FP inaccuracy even "
- "with only four decimal places",
- ),
]
)
def order_by_label_with_expression(self):
return fails_if(
[
- (
- "firebird",
- None,
- None,
- "kinterbasdb doesn't send full type information",
- ),
("postgresql", None, None, "only simple labels allowed"),
- ("sybase", None, None, "only simple labels allowed"),
("mssql", None, None, "only simple labels allowed"),
]
)
@property
def selectone(self):
"""target driver must support the literal statement 'select 1'"""
- return skip_if(
- ["oracle", "firebird"], "non-standard SELECT scalar syntax"
- )
+ return skip_if(["oracle"], "non-standard SELECT scalar syntax")
@property
def mysql_for_update(self):
@property
def computed_columns_stored(self):
- return self.computed_columns + skip_if(["oracle", "firebird"])
+ return self.computed_columns + skip_if(["oracle"])
@property
def computed_columns_virtual(self):
- return self.computed_columns + skip_if(["postgresql", "firebird"])
+ return self.computed_columns + skip_if(["postgresql"])
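The stored/virtual split gated above maps to the ``persisted`` flag on
``Computed``; a sketch::

    from sqlalchemy import Column, Computed, Integer, MetaData, Table

    t = Table(
        "square",
        MetaData(),
        Column("side", Integer),
        # persisted=True -> STORED, persisted=False -> VIRTUAL
        Column("area", Integer, Computed("side * side", persisted=True)),
    )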
@property
def computed_columns_default_persisted(self):
with testing.db.begin() as conn:
info_table.drop(conn)
- @testing.fails_on("firebird", "FIXME: unknown")
@testing.requires.subqueries
def test_case(self, connection):
inner = select(
from sqlalchemy import union
from sqlalchemy import union_all
from sqlalchemy import util
+from sqlalchemy.dialects import mssql
from sqlalchemy.dialects import mysql
from sqlalchemy.dialects import oracle
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects import sqlite
-from sqlalchemy.dialects import sybase
from sqlalchemy.dialects.postgresql.base import PGCompiler
from sqlalchemy.dialects.postgresql.base import PGDialect
from sqlalchemy.engine import default
s2 = (
select(table1.c.myid)
.with_hint(table1, "index(%(name)s idx)", "oracle")
- .with_hint(table1, "WITH HINT INDEX idx", "sybase")
+ .with_hint(table1, "WITH HINT INDEX idx", "mssql")
)
a1 = table1.alias()
.with_hint(a2, "%(name)s idx1")
)
- mysql_d, oracle_d, sybase_d = (
+ mysql_d, oracle_d, mssql_d = (
mysql.dialect(),
oracle.dialect(),
- sybase.dialect(),
+ mssql.dialect(),
)
for stmt, dialect, expected in [
),
(
s,
- sybase_d,
+ mssql_d,
"SELECT mytable.myid FROM mytable test hint mytable",
),
(s2, mysql_d, "SELECT mytable.myid FROM mytable"),
),
(
s2,
- sybase_d,
+ mssql_d,
"SELECT mytable.myid FROM mytable WITH HINT INDEX idx",
),
(
),
(
s3,
- sybase_d,
+ mssql_d,
"SELECT mytable_1.myid FROM mytable AS mytable_1 "
"index(mytable_1 hint)",
),
),
(
s4,
- sybase_d,
+ mssql_d,
"SELECT thirdtable.userid, thirdtable.otherstuff "
"FROM thirdtable "
"hint3 JOIN (SELECT mytable.myid AS myid, "
functions._registry = self._registry
def test_compile(self):
- for dialect in all_dialects(exclude=("sybase",)):
+ for dialect in all_dialects():
bindtemplate = BIND_TEMPLATES[dialect.paramstyle]
self.assert_compile(
func.current_timestamp(), "CURRENT_TIMESTAMP", dialect=dialect
)
self.assert_compile(func.localtime(), "LOCALTIME", dialect=dialect)
- if dialect.name in ("firebird",):
- self.assert_compile(
- func.nosuchfunction(), "nosuchfunction", dialect=dialect
- )
- else:
- self.assert_compile(
- func.nosuchfunction(), "nosuchfunction()", dialect=dialect
- )
+ self.assert_compile(
+ func.nosuchfunction(), "nosuchfunction()", dialect=dialect
+ )
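With the Firebird special case gone, every dialect now renders an unknown
function generically; a one-line check::

    from sqlalchemy import func

    print(func.nosuchfunction())  # nosuchfunction()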
# test generic function compile
class fake_func(GenericFunction):
ret[c.key] = row._mapping[c]
return ret, ipk
- if testing.against("firebird", "postgresql", "oracle", "mssql"):
+ if testing.against("postgresql", "oracle", "mssql"):
assert testing.db.dialect.implicit_returning
if testing.db.dialect.implicit_returning:
from sqlalchemy import testing
from sqlalchemy import text
from sqlalchemy import util
-from sqlalchemy.dialects import firebird
from sqlalchemy.dialects import mssql
from sqlalchemy.dialects import mysql
from sqlalchemy.dialects import oracle
dialect=mysql.dialect(),
)
- def test_startswith_firebird(self):
- self.assert_compile(
- column("x").startswith("y"),
- "x STARTING WITH :x_1",
- checkparams={"x_1": "y"},
- dialect=firebird.dialect(),
- )
-
- def test_not_startswith_firebird(self):
- self.assert_compile(
- ~column("x").startswith("y"),
- "x NOT STARTING WITH :x_1",
- checkparams={"x_1": "y"},
- dialect=firebird.dialect(),
- )
-
def test_startswith_literal_mysql(self):
self.assert_compile(
column("x").startswith(literal_column("y")),
test_needs_acid=True,
)
- @testing.fails_on(
- "firebird", "kinterbasdb doesn't send full type information"
- )
def test_order_by_label(self, connection):
"""test that a label within an ORDER BY works on each backend.
eq_(len(compiled._bind_processors), 1)
- @testing.fails_on("firebird", "uses sql-92 rules")
- @testing.fails_on("sybase", "uses sql-92 rules")
@testing.skip_if(["mssql"])
def test_bind_in(self, connection):
"""test calling IN against a bind parameter.
)
eq_(found2, wanted)
- @testing.fails_on("firebird", "doesn't like ORDER BY with UNIONs")
def test_union_ordered(self, connection):
t1, t2, t3 = self.tables("t1", "t2", "t3")
]
eq_(connection.execute(u).fetchall(), wanted)
- @testing.fails_on("firebird", "doesn't like ORDER BY with UNIONs")
@testing.requires.subqueries
def test_union_ordered_alias(self, connection):
t1, t2, t3 = self.tables("t1", "t2", "t3")
eq_(connection.execute(u.alias("bar").select()).fetchall(), wanted)
@testing.crashes("oracle", "FIXME: unknown, verify not fails_on")
- @testing.fails_on(
- "firebird",
- "has trouble extracting anonymous column from union subquery",
- )
@testing.fails_on(
testing.requires._mysql_not_mariadb_104, "FIXME: unknown"
)
eq_(list(r._mapping.values()), ["foo", 1])
@testing.crashes("oracle", "FIXME: unknown, verify not fails_on()")
- @testing.crashes("firebird", "An identifier must begin with a letter")
@testing.provide_metadata
def test_column_accessor_shadow(self, connection):
shadowed = Table(
eq_(row[table.c.goofy], row["goofy"])
eq_(row["goofy"], "FOOsomegoofyBAR")
- @testing.fails_on("firebird", "fb can't handle returning x AS y")
def test_labeling(self, connection):
table = self.tables.tables
result = connection.execute(
row = result.first()._mapping
assert row["lala"] == 6
- @testing.fails_on(
- "firebird", "fb/kintersbasdb can't handle the bind params"
- )
def test_anon_expressions(self, connection):
table = self.tables.tables
GoofyType = self.GoofyType
"inserted_primary_key",
)
- @testing.fails_on_everything_except("postgresql", "firebird")
+ @testing.fails_on_everything_except("postgresql")
def test_literal_returning(self, connection):
if testing.against("postgresql"):
literal_true = "true"
Column("data", String(20)),
)
- @testing.exclude("firebird", "<", (2, 0), "2.0+ feature")
@testing.exclude("postgresql", "<", (8, 2), "8.2+ feature")
def test_insert(self, connection):
table = self.tables.tables
assert isinstance(val, float)
# some DBAPIs have unusual float handling
- if testing.against("oracle+cx_oracle", "mysql+oursql", "firebird"):
+ if testing.against("oracle+cx_oracle"):
eq_(round_decimal(val, 3), 46.583)
else:
eq_(val, 46.583)