$ pytest --dbs
Available --db options (use --dburi to override)
+ aiomysql mysql+aiomysql://scott:tiger@127.0.0.1:3306/test?charset=utf8mb4
+ aiomysql_fallback mysql+aiomysql://scott:tiger@127.0.0.1:3306/test?charset=utf8mb4&async_fallback=true
+ aiosqlite sqlite+aiosqlite:///:memory:
+ aiosqlite_file sqlite+aiosqlite:///async_querytest.db
+ asyncmy mysql+asyncmy://scott:tiger@127.0.0.1:3306/test?charset=utf8mb4
+ asyncmy_fallback mysql+asyncmy://scott:tiger@127.0.0.1:3306/test?charset=utf8mb4&async_fallback=true
+ asyncpg postgresql+asyncpg://scott:tiger@192.168.0.199/test
+ asyncpg_async_fallback postgresql+asyncpg://scott:tiger@192.168.0.199/test?async_fallback=true
+ asyncpg_fallback postgresql+asyncpg://scott:tiger@127.0.0.1:5432/test?async_fallback=true
default sqlite:///:memory:
- mariadb mariadb://scott:tiger@192.168.0.199:3307/test
- mssql mssql+pyodbc://scott:tiger^5HHH@mssql2017:1433/test?driver=ODBC+Driver+13+for+SQL+Server
+ docker_mssql mssql+pymssql://scott:tiger^5HHH@127.0.0.1:1433/test
+ mariadb mariadb+mysqldb://scott:tiger@127.0.0.1:3306/test
+ mssql mssql+pyodbc://scott:tiger^5HHH@localhost:1433/test?driver=ODBC+Driver+17+for+SQL+Server
+ mssql_199 mssql+pyodbc://scott:tiger^5HHH@192.168.0.199:1433/test?driver=ODBC+Driver+17+for+SQL+Server
mssql_pymssql mssql+pymssql://scott:tiger@ms_2008
- mysql mysql://scott:tiger@127.0.0.1:3306/test?charset=utf8mb4
- oracle oracle://scott:tiger@127.0.0.1:1521
- oracle8 oracle://scott:tiger@127.0.0.1:1521/?use_ansi=0
+ mysql mysql+mysqldb://scott:tiger@127.0.0.1:3306/test?charset=utf8mb4
+ oracle oracle+cx_oracle://scott:tiger@127.0.0.1:1521
+ oracle8 oracle+cx_oracle://scott:tiger@127.0.0.1:1521/?use_ansi=0
pg8000 postgresql+pg8000://scott:tiger@127.0.0.1:5432/test
- postgresql postgresql://scott:tiger@127.0.0.1:5432/test
+ postgresql postgresql+psycopg2://scott:tiger@192.168.0.199/test
postgresql_psycopg2cffi postgresql+psycopg2cffi://scott:tiger@127.0.0.1:5432/test
pymysql mysql+pymysql://scott:tiger@127.0.0.1:3306/test?charset=utf8mb4
+ pysqlcipher_file sqlite+pysqlcipher://:test@/querytest.db.enc
sqlite sqlite:///:memory:
sqlite_file sqlite:///querytest.db
# test.cfg file
[db]
- my_postgresql=postgresql://username:pass@hostname/dbname
+ my_postgresql=postgresql+psycopg2://username:pass@hostname/dbname
Above, we can now run the tests with ``my_postgresql``::
# test.cfg file
[db]
- postgresql=postgresql://username:pass@hostname/dbname
+ postgresql=postgresql+psycopg2://username:pass@hostname/dbname
Now when we run ``tox -e py27-postgresql``, it will use our custom URL instead
of the fixed one in setup.cfg.
from sqlalchemy import create_engine
- engine = create_engine("postgresql:///")
+ engine = create_engine("postgresql+psycopg2:///")
with engine.connect() as conn:
conn.execute(text("insert into table (x) values (:some_x)"), {"some_x": 10})
Recall from :doc:`/core/engines` that an :class:`_engine.Engine` is created via
the :func:`_sa.create_engine` call::
- engine = create_engine('mysql://scott:tiger@localhost/test')
+ engine = create_engine('mysql+mysqldb://scott:tiger@localhost/test')
The typical usage of :func:`_sa.create_engine` is once per particular database
URL, held globally for the lifetime of a single application process. A single
from sqlalchemy import create_engine
eng = create_engine(
- "postgresql://scott:tiger@localhost/test",
+ "postgresql+psycopg2://scott:tiger@localhost/test",
execution_options={
"isolation_level": "REPEATABLE READ"
}
from sqlalchemy import create_engine
- eng = create_engine("postgresql://scott:tiger@localhost/test")
+ eng = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
autocommit_engine = eng.execution_options(isolation_level="AUTOCOMMIT")
of this cache defaults to 500 and may be configured using the
:paramref:`_sa.create_engine.query_cache_size` parameter::
- engine = create_engine("postgresql://scott:tiger@localhost/test", query_cache_size=1200)
+ engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test", query_cache_size=1200)
The size of the cache can grow to be a factor of 150% of the size given, before
it's pruned back down to the target size. A cache of size 1200 above can therefore
PGPString("this is my passphrase")),
)
- engine = create_engine("postgresql://scott:tiger@localhost/test", echo=True)
+ engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test", echo=True)
with engine.begin() as conn:
metadata_obj.create_all(conn)
:func:`_sa.create_engine()`::
from sqlalchemy import create_engine
- engine = create_engine('postgresql://scott:tiger@localhost:5432/mydatabase')
+ engine = create_engine('postgresql+psycopg2://scott:tiger@localhost:5432/mydatabase')
The above engine creates a :class:`.Dialect` object tailored towards
PostgreSQL, as well as a :class:`_pool.Pool` object which will establish a DBAPI
for keys and either strings or tuples of strings for values, e.g.::
>>> from sqlalchemy.engine import make_url
- >>> url = make_url("postgresql://user:pass@host/dbname?alt_host=host1&alt_host=host2&ssl_cipher=%2Fpath%2Fto%2Fcrt")
+ >>> url = make_url("postgresql+psycopg2://user:pass@host/dbname?alt_host=host1&alt_host=host2&ssl_cipher=%2Fpath%2Fto%2Fcrt")
>>> url.query
immutabledict({'alt_host': ('host1', 'host2'), 'ssl_cipher': '/path/to/crt'})
engine = create_engine(
- "postgresql://user:pass@hostname/dbname",
+ "postgresql+psycopg2://user:pass@hostname/dbname",
connect_args={"connection_factory": MyConnectionFactory}
)
from sqlalchemy import event
- engine = create_engine("postgresql://user:pass@hostname/dbname")
+ engine = create_engine("postgresql+psycopg2://user:pass@hostname/dbname")
@event.listens_for(engine, "do_connect")
def receive_do_connect(dialect, conn_rec, cargs, cparams):
from sqlalchemy import event
- engine = create_engine("postgresql://user@hostname/dbname")
+ engine = create_engine("postgresql+psycopg2://user@hostname/dbname")
@event.listens_for(engine, "do_connect")
def provide_token(dialect, conn_rec, cargs, cparams):
from sqlalchemy import event
engine = create_engine(
- "postgresql://user:pass@hostname/dbname"
+ "postgresql+psycopg2://user:pass@hostname/dbname"
)
@event.listens_for(engine, "connect")
from sqlalchemy import event
engine = create_engine(
- "postgresql://user:pass@hostname/dbname"
+ "postgresql+psycopg2://user:pass@hostname/dbname"
)
@event.listens_for(engine, "do_connect")
return psycopg2.connect(user='ed', host='127.0.0.1', dbname='test')
my_pool = QueuePool(connect)
- my_engine = create_engine('postgresql://ed@localhost/test')
+ my_engine = create_engine('postgresql+psycopg2://ed@localhost/test')
# associate listener with all instances of Pool
listen(Pool, 'connect', my_on_connect)
``pool_size``, ``max_overflow``, ``pool_recycle`` and
``pool_timeout``. For example::
- engine = create_engine('postgresql://me@localhost/mydb',
+ engine = create_engine('postgresql+psycopg2://me@localhost/mydb',
pool_size=20, max_overflow=0)
In the case of SQLite, the :class:`.SingletonThreadPool` or
period of time::
from sqlalchemy import create_engine
- e = create_engine("mysql://scott:tiger@localhost/test", pool_recycle=3600)
+ e = create_engine("mysql+mysqldb://scott:tiger@localhost/test", pool_recycle=3600)
Above, any DBAPI connection that has been open for more than one hour will be invalidated and replaced,
upon next checkout. Note that the invalidation **only** occurs during checkout - not on
more than once::
from sqlalchemy.pool import NullPool
- engine = create_engine("mysql://user:pass@host/dbname", poolclass=NullPool)
+ engine = create_engine("mysql+mysqldb://user:pass@host/dbname", poolclass=NullPool)
2. Call :meth:`_engine.Engine.dispose` on any given :class:`_engine.Engine` as
single call to :meth:`_engine.Engine.dispose` will ensure any remaining
connections are flushed. **This is the recommended approach**::
- engine = create_engine("mysql://user:pass@host/dbname")
+ engine = create_engine("mysql+mysqldb://user:pass@host/dbname")
def run_in_process():
# process starts. ensure engine.dispose() is called just once
.. sourcecode:: pycon+sql
>>> from sqlalchemy import MetaData, Table, create_engine
- >>> mysql_engine = create_engine("mysql://scott:tiger@localhost/test")
+ >>> mysql_engine = create_engine("mysql+mysqldb://scott:tiger@localhost/test")
>>> metadata_obj = MetaData()
>>> my_mysql_table = Table("my_table", metadata_obj, autoload_with=mysql_engine)
.. sourcecode:: pycon+sql
- >>> pg_engine = create_engine("postgresql://scott:tiger@localhost/test", echo=True)
+ >>> pg_engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test", echo=True)
>>> my_generic_table.create(pg_engine)
{opensql}CREATE TABLE my_table (
id SERIAL NOT NULL,
**pool size plus the max overflow**. That means if you have configured
your engine as::
- engine = create_engine("mysql://u:p@host/db", pool_size=10, max_overflow=20)
+ engine = create_engine("mysql+mysqldb://u:p@host/db", pool_size=10, max_overflow=20)
The above :class:`_engine.Engine` will allow **at most 30 connections** to be in
play at any time, not including connections that were detached from the
The :func:`_sa.create_engine` call accepts additional arguments either
directly via the ``connect_args`` keyword argument::
- e = create_engine("mysql://scott:tiger@localhost/test",
+ e = create_engine("mysql+mysqldb://scott:tiger@localhost/test",
connect_args={"encoding": "utf8"})
Or for basic string and integer arguments, they can usually be specified
in the query string of the URL::
- e = create_engine("mysql://scott:tiger@localhost/test?encoding=utf8")
+ e = create_engine("mysql+mysqldb://scott:tiger@localhost/test?encoding=utf8")
.. seealso::
if __name__ == "__main__":
- engine = create_engine("mysql://scott:tiger@localhost/test", echo_pool=True)
+ engine = create_engine("mysql+mysqldb://scott:tiger@localhost/test", echo_pool=True)
def do_a_thing(engine):
with engine.begin() as conn:
e = reconnecting_engine(
create_engine(
- "mysql://scott:tiger@localhost/test", echo_pool=True
+ "mysql+mysqldb://scott:tiger@localhost/test", echo_pool=True
),
num_retries=5,
retry_interval=2,
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
- engine = create_engine('mysql://scott:tiger@localhost/myisam_database', pool=QueuePool(reset_on_return=False))
+ engine = create_engine('mysql+mysqldb://scott:tiger@localhost/myisam_database', pool=QueuePool(reset_on_return=False))
I'm on SQL Server - how do I turn those ROLLBACKs into COMMITs?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
to ``True``, ``False``, and ``None``. Setting to ``commit`` will cause
a COMMIT as any connection is returned to the pool::
- engine = create_engine('mssql://scott:tiger@mydsn', pool=QueuePool(reset_on_return='commit'))
+ engine = create_engine('mssql+pyodbc://scott:tiger@mydsn', pool=QueuePool(reset_on_return='commit'))
I am using multiple connections with a SQLite database (typically to test transaction operation), and my test program is not working!
def dump(sql, *multiparams, **params):
print(sql.compile(dialect=engine.dialect))
- engine = create_mock_engine('postgresql://', dump)
+ engine = create_mock_engine('postgresql+psycopg2://', dump)
metadata_obj.create_all(engine, checkfirst=False)
The `Alembic <https://alembic.sqlalchemy.org>`_ tool also supports
from sqlalchemy.orm import sessionmaker
- engine = create_engine("mysql://user:pass@host/dbname", future=True)
+ engine = create_engine("mysql+mysqldb://user:pass@host/dbname", future=True)
Session = sessionmaker(bind=engine, future=True)
**ORM Queries in 2.0 style**
:paramref:`_schema.Table.autoload_with` parameter to the
:class:`_schema.Table`::
- engine = create_engine("postgresql://user:pass@hostname/my_existing_database")
+ engine = create_engine("postgresql+psycopg2://user:pass@hostname/my_existing_database")
class MyClass(Base):
__table__ = Table(
complete until we do so, given an :class:`_engine.Engine`::
- engine = create_engine("postgresql://user:pass@hostname/my_existing_database")
+ engine = create_engine("postgresql+psycopg2://user:pass@hostname/my_existing_database")
Reflected.prepare(engine)
The purpose of the ``Reflected`` class is to define the scope at which
pk = Column(Integer, primary_key=True)
bar = Column(Integer)
- e = create_engine("postgresql://scott:tiger@localhost/test", echo=True)
+ e = create_engine("postgresql+psycopg2://scott:tiger@localhost/test", echo=True)
Base.metadata.create_all(e)
session = Session(e)
emit SQL on behalf of a particular kind of mapped class in order to locate
the appropriate source of database connectivity::
- engine1 = create_engine('postgresql://db1')
- engine2 = create_engine('postgresql://db2')
+ engine1 = create_engine('postgresql+psycopg2://db1')
+ engine2 = create_engine('postgresql+psycopg2://db2')
Session = sessionmaker()
# an Engine, which the Session will use for connection
# resources
- engine = create_engine('postgresql://scott:tiger@localhost/')
+ engine = create_engine('postgresql+psycopg2://scott:tiger@localhost/')
# create session and add objects
with Session(engine) as session:
# an Engine, which the Session will use for connection
# resources, typically in module scope
- engine = create_engine('postgresql://scott:tiger@localhost/')
+ engine = create_engine('postgresql+psycopg2://scott:tiger@localhost/')
# a sessionmaker(), also in the same scope as the engine
Session = sessionmaker(engine)
# an Engine, which the Session will use for connection
# resources
- engine = create_engine('postgresql://scott:tiger@localhost/')
+ engine = create_engine('postgresql+psycopg2://scott:tiger@localhost/')
# a sessionmaker(), also in the same scope as the engine
Session = sessionmaker(engine)
Engine::
- engine = create_engine("postgresql://user:pass@host/dbname", future=True)
+ engine = create_engine("postgresql+psycopg2://user:pass@host/dbname", future=True)
with engine.connect() as conn:
conn.execute(
Engine::
- engine = create_engine("postgresql://user:pass@host/dbname", future=True)
+ engine = create_engine("postgresql+psycopg2://user:pass@host/dbname", future=True)
with engine.begin() as conn:
conn.execute(
Engine::
- engine = create_engine("postgresql://user:pass@host/dbname", future=True)
+ engine = create_engine("postgresql+psycopg2://user:pass@host/dbname", future=True)
with engine.begin() as conn:
savepoint = conn.begin_nested()
interacting with transactions not managed by SQLAlchemy. To use two phase
transactions set the flag ``twophase=True`` on the session::
- engine1 = create_engine('postgresql://db1')
- engine2 = create_engine('postgresql://db2')
+ engine1 = create_engine('postgresql+psycopg2://db1')
+ engine2 = create_engine('postgresql+psycopg2://db2')
Session = sessionmaker(twophase=True)
from sqlalchemy.orm import sessionmaker
eng = create_engine(
- "postgresql://scott:tiger@localhost/test",
+ "postgresql+psycopg2://scott:tiger@localhost/test",
isolation_level='REPEATABLE READ'
)
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
- eng = create_engine("postgresql://scott:tiger@localhost/test")
+ eng = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
autocommit_engine = eng.execution_options(isolation_level="AUTOCOMMIT")
We can for example create our :class:`_orm.Session` from a default
:class:`.sessionmaker` and pass an engine set for autocommit::
- plain_engine = create_engine("postgresql://scott:tiger@localhost/test")
+ plain_engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
autocommit_engine = plain_engine.execution_options(isolation_level="AUTOCOMMIT")
# global application scope. create Session class, engine
Session = sessionmaker()
- engine = create_engine('postgresql://...')
+ engine = create_engine('postgresql+psycopg2://...')
class SomeTest(TestCase):
def setUp(self):
if __name__ == "__main__":
engine = create_engine(
- "postgresql://scott:tiger@localhost/test", echo=True
+ "postgresql+psycopg2://scott:tiger@localhost/test", echo=True
)
meta.create_all(engine)
if __name__ == "__main__":
engine = create_engine(
- "postgresql://scott:tiger@localhost/test", echo=True
+ "postgresql+psycopg2://scott:tiger@localhost/test", echo=True
)
Base.metadata.create_all(engine)
from sqlalchemy.ext.declarative import declarative_base
engine = create_engine(
- "postgresql://scott:tiger@localhost/test", echo=True
+ "postgresql+psycopg2://scott:tiger@localhost/test", echo=True
)
metadata = MetaData(engine)
Base = declarative_base(metadata=metadata)
To set isolation level using :func:`_sa.create_engine`::
engine = create_engine(
- "mysql://scott:tiger@localhost/test",
+ "mysql+mysqldb://scott:tiger@localhost/test",
isolation_level="READ UNCOMMITTED"
)
from sqlalchemy import create_engine, event
- eng = create_engine("mysql://scott:tiger@localhost/test", echo='debug')
+ eng = create_engine("mysql+mysqldb://scott:tiger@localhost/test", echo='debug')
# `insert=True` will ensure this is the very first listener to run
@event.listens_for(eng, "connect", insert=True)
from sqlalchemy import create_engine
- e = create_engine("mysql://scott:tiger@localhost/test", echo=True)
+ e = create_engine("mysql+mysqldb://scott:tiger@localhost/test", echo=True)
m.create_all(e)
output::
the value back at the same time. To disable this feature across the board,
specify ``implicit_returning=False`` to :func:`_sa.create_engine`::
- engine = create_engine("oracle://scott:tiger@dsn",
+ engine = create_engine("oracle+cx_oracle://scott:tiger@dsn",
implicit_returning=False)
Implicit returning can also be disabled on a table-by-table basis as a table
# exclude SYSAUX and SOME_TABLESPACE, but not SYSTEM
e = create_engine(
- "oracle://scott:tiger@xe",
+ "oracle+cx_oracle://scott:tiger@xe",
exclude_tablespaces=["SYSAUX", "SOME_TABLESPACE"])
.. versionadded:: 1.1
encoding="UTF-8", nencoding="UTF-8"
)
- engine = create_engine("oracle://", creator=pool.acquire, poolclass=NullPool)
+ engine = create_engine("oracle+cx_oracle://", creator=pool.acquire, poolclass=NullPool)
The above engine may then be used normally where cx_Oracle's pool handles
connection pooling::
def creator():
return pool.acquire(cclass="MYCLASS", purity=cx_Oracle.ATTR_PURITY_SELF)
- engine = create_engine("oracle://", creator=creator, poolclass=NullPool)
+ engine = create_engine("oracle+cx_oracle://", creator=creator, poolclass=NullPool)
The above engine may then be used normally where cx_Oracle handles session
pooling and Oracle Database additionally uses DRCP::
reflection process as follows::
>>> from sqlalchemy import Table, MetaData, create_engine, text
- >>> engine = create_engine("postgresql://scott:tiger@localhost/test")
+ >>> engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
>>> with engine.connect() as conn:
... conn.execute(text("SET search_path TO test_schema, public"))
... meta = MetaData()
using psycopg2, the DBAPI only allows serializers at the per-cursor
or per-connection level. E.g.::
- engine = create_engine("postgresql://scott:tiger@localhost/test",
+ engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test",
json_serializer=my_serialize_fn,
json_deserializer=my_deserialize_fn
)
:class:`_engine.Connection`::
from sqlalchemy import create_engine
- engine = create_engine("postgresql://scott:tiger@localhost/test")
+ engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
connection = engine.connect()
trans = connection.begin()
connection.execute(text("insert into x (a, b) values (1, 2)"))
.. index::
single: thread safety; Transaction
- """
+ """ # noqa
__slots__ = ()
from sqlalchemy import event
from sqlalchemy.engine import Engine
- primary_engine = create_engine("mysql://")
+ primary_engine = create_engine("mysql+mysqldb://")
shard1 = primary_engine.execution_options(shard_id="shard1")
shard2 = primary_engine.execution_options(shard_id="shard2")
first positional argument, usually a string
that indicates database dialect and connection arguments::
- engine = create_engine("postgresql://scott:tiger@localhost/test")
+ engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
.. note::
and its underlying :class:`.Dialect` and :class:`_pool.Pool`
constructs::
- engine = create_engine("mysql://scott:tiger@hostname/dbname",
+ engine = create_engine("mysql+mysqldb://scott:tiger@hostname/dbname",
encoding='latin1', echo=True)
The string form of the URL is
executemany):
log.info("Received statement: %s", statement)
- engine = create_engine('postgresql://scott:tiger@localhost/test')
+ engine = create_engine('postgresql+psycopg2://scott:tiger@localhost/test')
event.listen(engine, "before_cursor_execute", before_cursor_execute)
or with a specific :class:`_engine.Connection`::
and parameters. See those methods for a description of
specific return arguments.
- """
+ """ # noqa
_target_class_doc = "SomeEngine"
_dispatch_target = ConnectionEventsTarget
def dump(sql, *multiparams, **params):
print(sql.compile(dialect=engine.dialect))
- engine = create_mock_engine('postgresql://', dump)
+ engine = create_mock_engine('postgresql+psycopg2://', dump)
metadata.create_all(engine, checkfirst=False)
:param url: A string URL which typically needs to contain only the
E.g.::
>>> from sqlalchemy.engine import make_url
- >>> url = make_url("postgresql://user:pass@host/dbname")
+ >>> url = make_url("postgresql+psycopg2://user:pass@host/dbname")
>>> url = url.update_query_string("alt_host=host1&alt_host=host2&ssl_cipher=%2Fpath%2Fto%2Fcrt")
>>> str(url)
- 'postgresql://user:pass@host/dbname?alt_host=host1&alt_host=host2&ssl_cipher=%2Fpath%2Fto%2Fcrt'
+ 'postgresql+psycopg2://user:pass@host/dbname?alt_host=host1&alt_host=host2&ssl_cipher=%2Fpath%2Fto%2Fcrt'
:param query_string: a URL escaped query string, not including the
question mark.
E.g.::
>>> from sqlalchemy.engine import make_url
- >>> url = make_url("postgresql://user:pass@host/dbname")
+ >>> url = make_url("postgresql+psycopg2://user:pass@host/dbname")
>>> url = url.update_query_pairs([("alt_host", "host1"), ("alt_host", "host2"), ("ssl_cipher", "/path/to/crt")])
>>> str(url)
- 'postgresql://user:pass@host/dbname?alt_host=host1&alt_host=host2&ssl_cipher=%2Fpath%2Fto%2Fcrt'
+ 'postgresql+psycopg2://user:pass@host/dbname?alt_host=host1&alt_host=host2&ssl_cipher=%2Fpath%2Fto%2Fcrt'
:param key_value_pairs: A sequence of tuples containing two strings
each.
>>> from sqlalchemy.engine import make_url
- >>> url = make_url("postgresql://user:pass@host/dbname")
+ >>> url = make_url("postgresql+psycopg2://user:pass@host/dbname")
>>> url = url.update_query_dict({"alt_host": ["host1", "host2"], "ssl_cipher": "/path/to/crt"})
>>> str(url)
- 'postgresql://user:pass@host/dbname?alt_host=host1&alt_host=host2&ssl_cipher=%2Fpath%2Fto%2Fcrt'
+ 'postgresql+psycopg2://user:pass@host/dbname?alt_host=host1&alt_host=host2&ssl_cipher=%2Fpath%2Fto%2Fcrt'
:param query_parameters: A dictionary with string keys and values
>>> from sqlalchemy.engine import make_url
- >>> url = make_url("postgresql://user:pass@host/dbname?alt_host=host1&alt_host=host2&ssl_cipher=%2Fpath%2Fto%2Fcrt")
+ >>> url = make_url("postgresql+psycopg2://user:pass@host/dbname?alt_host=host1&alt_host=host2&ssl_cipher=%2Fpath%2Fto%2Fcrt")
>>> url.query
immutabledict({'alt_host': ('host1', 'host2'), 'ssl_cipher': '/path/to/crt'})
>>> url.normalized_query
described at :meth:`.Session.get_bind`. Usage looks like::
Session = sessionmaker(binds={
- SomeMappedClass: create_engine('postgresql://engine1'),
- SomeDeclarativeBase: create_engine('postgresql://engine2'),
- some_mapper: create_engine('postgresql://engine3'),
- some_table: create_engine('postgresql://engine4'),
+ SomeMappedClass: create_engine('postgresql+psycopg2://engine1'),
+ SomeDeclarativeBase: create_engine('postgresql+psycopg2://engine2'),
+ some_mapper: create_engine('postgresql+psycopg2://engine3'),
+ some_table: create_engine('postgresql+psycopg2://engine4'),
})
.. seealso::
:param autocommit: the "autocommit" keyword is present for backwards
compatibility but must remain at its default value of ``False``.
- """
+ """ # noqa
# considering allowing the "autocommit" keyword to still be accepted
# as long as it's False, so that external test suites, oslo.db etc
# an Engine, which the Session will use for connection
# resources
- engine = create_engine('postgresql://scott:tiger@localhost/')
+ engine = create_engine('postgresql+psycopg2://scott:tiger@localhost/')
Session = sessionmaker(engine)
targets, which will be resolved to the ``.pool`` attribute of the
given engine or the :class:`_pool.Pool` class::
- engine = create_engine("postgresql://scott:tiger@localhost/test")
+ engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
# will associate with engine.pool
event.listen(engine, 'checkout', my_on_checkout)
- """
+ """ # noqa
_target_class_doc = "SomeEngineOrPool"
_dispatch_target = Pool
from sqlalchemy.dialects.postgresql import array
engine = create_engine(
- 'postgresql://scott:tiger@localhost/mydatabase'
+ 'postgresql+psycopg2://scott:tiger@localhost/mydatabase'
)
metadata_obj = MetaData()
tbl = Table(
sqlite_file = sqlite:///querytest.db
aiosqlite_file = sqlite+aiosqlite:///async_querytest.db
pysqlcipher_file = sqlite+pysqlcipher://:test@/querytest.db.enc
-postgresql = postgresql://scott:tiger@127.0.0.1:5432/test
+postgresql = postgresql+psycopg2://scott:tiger@127.0.0.1:5432/test
asyncpg = postgresql+asyncpg://scott:tiger@127.0.0.1:5432/test
asyncpg_fallback = postgresql+asyncpg://scott:tiger@127.0.0.1:5432/test?async_fallback=true
pg8000 = postgresql+pg8000://scott:tiger@127.0.0.1:5432/test
postgresql_psycopg2cffi = postgresql+psycopg2cffi://scott:tiger@127.0.0.1:5432/test
-mysql = mysql://scott:tiger@127.0.0.1:3306/test?charset=utf8mb4
+mysql = mysql+mysqldb://scott:tiger@127.0.0.1:3306/test?charset=utf8mb4
pymysql = mysql+pymysql://scott:tiger@127.0.0.1:3306/test?charset=utf8mb4
aiomysql = mysql+aiomysql://scott:tiger@127.0.0.1:3306/test?charset=utf8mb4
aiomysql_fallback = mysql+aiomysql://scott:tiger@127.0.0.1:3306/test?charset=utf8mb4&async_fallback=true
asyncmy = mysql+asyncmy://scott:tiger@127.0.0.1:3306/test?charset=utf8mb4
asyncmy_fallback = mysql+asyncmy://scott:tiger@127.0.0.1:3306/test?charset=utf8mb4&async_fallback=true
-mariadb = mariadb://scott:tiger@127.0.0.1:3306/test
+mariadb = mariadb+mysqldb://scott:tiger@127.0.0.1:3306/test
mssql = mssql+pyodbc://scott:tiger^5HHH@mssql2017:1433/test?driver=ODBC+Driver+13+for+SQL+Server
mssql_pymssql = mssql+pymssql://scott:tiger@ms_2008
docker_mssql = mssql+pymssql://scott:tiger^5HHH@127.0.0.1:1433/test
-oracle = oracle://scott:tiger@127.0.0.1:1521
-oracle8 = oracle://scott:tiger@127.0.0.1:1521/?use_ansi=0
+oracle = oracle+cx_oracle://scott:tiger@127.0.0.1:1521
+oracle8 = oracle+cx_oracle://scott:tiger@127.0.0.1:1521/?use_ansi=0
@testing.combinations(
(
{
- "sqlalchemy.url": "mssql://foodsn",
+ "sqlalchemy.url": "mssql+pyodbc://foodsn",
"sqlalchemy.legacy_schema_aliasing": "true",
},
True,
),
(
{
- "sqlalchemy.url": "mssql://foodsn",
+ "sqlalchemy.url": "mssql+pyodbc://foodsn",
"sqlalchemy.legacy_schema_aliasing": "false",
},
False,
class ParseConnectTest(fixtures.TestBase):
def test_pyodbc_connect_dsn_trusted(self):
dialect = pyodbc.dialect()
- u = url.make_url("mssql://mydsn")
+ u = url.make_url("mssql+pyodbc://mydsn")
connection = dialect.create_connect_args(u)
eq_([["dsn=mydsn;Trusted_Connection=Yes"], {}], connection)
def test_pyodbc_connect_old_style_dsn_trusted(self):
dialect = pyodbc.dialect()
- u = url.make_url("mssql:///?dsn=mydsn")
+ u = url.make_url("mssql+pyodbc:///?dsn=mydsn")
connection = dialect.create_connect_args(u)
eq_([["dsn=mydsn;Trusted_Connection=Yes"], {}], connection)
def test_pyodbc_connect_dsn_non_trusted(self):
dialect = pyodbc.dialect()
- u = url.make_url("mssql://username:password@mydsn")
+ u = url.make_url("mssql+pyodbc://username:password@mydsn")
connection = dialect.create_connect_args(u)
eq_([["dsn=mydsn;UID=username;PWD=password"], {}], connection)
def test_pyodbc_connect_dsn_extra(self):
dialect = pyodbc.dialect()
u = url.make_url(
- "mssql://username:password@mydsn/?LANGUAGE=us_" "english&foo=bar"
+ "mssql+pyodbc://username:password@mydsn/?LANGUAGE=us_"
+ "english&foo=bar"
)
connection = dialect.create_connect_args(u)
dsn_string = connection[0][0]
def test_pyodbc_hostname(self):
dialect = pyodbc.dialect()
u = url.make_url(
- "mssql://username:password@hostspec/database?driver=SQL+Server"
+ "mssql+pyodbc://username:password@hostspec/database?driver=SQL+Server" # noqa
)
connection = dialect.create_connect_args(u)
eq_(
def test_pyodbc_host_no_driver(self):
dialect = pyodbc.dialect()
- u = url.make_url("mssql://username:password@hostspec/database")
+ u = url.make_url("mssql+pyodbc://username:password@hostspec/database")
def go():
return dialect.create_connect_args(u)
def test_pyodbc_connect_comma_port(self):
dialect = pyodbc.dialect()
u = url.make_url(
- "mssql://username:password@hostspec:12345/data"
+ "mssql+pyodbc://username:password@hostspec:12345/data"
"base?driver=SQL Server"
)
connection = dialect.create_connect_args(u)
def test_pyodbc_connect_config_port(self):
dialect = pyodbc.dialect()
u = url.make_url(
- "mssql://username:password@hostspec/database?p"
+ "mssql+pyodbc://username:password@hostspec/database?p"
"ort=12345&driver=SQL+Server"
)
connection = dialect.create_connect_args(u)
def test_pyodbc_extra_connect(self):
dialect = pyodbc.dialect()
u = url.make_url(
- "mssql://username:password@hostspec/database?L"
+ "mssql+pyodbc://username:password@hostspec/database?L"
"ANGUAGE=us_english&foo=bar&driver=SQL+Server"
)
connection = dialect.create_connect_args(u)
def test_pyodbc_odbc_connect(self):
dialect = pyodbc.dialect()
u = url.make_url(
- "mssql:///?odbc_connect=DRIVER%3D%7BSQL+Server"
+ "mssql+pyodbc:///?odbc_connect=DRIVER%3D%7BSQL+Server"
"%7D%3BServer%3Dhostspec%3BDatabase%3Ddatabase"
"%3BUID%3Dusername%3BPWD%3Dpassword"
)
def test_pyodbc_odbc_connect_with_dsn(self):
dialect = pyodbc.dialect()
u = url.make_url(
- "mssql:///?odbc_connect=dsn%3Dmydsn%3BDatabase"
+ "mssql+pyodbc:///?odbc_connect=dsn%3Dmydsn%3BDatabase"
"%3Ddatabase%3BUID%3Dusername%3BPWD%3Dpassword"
)
connection = dialect.create_connect_args(u)
def test_pyodbc_odbc_connect_ignores_other_values(self):
dialect = pyodbc.dialect()
u = url.make_url(
- "mssql://userdiff:passdiff@localhost/dbdiff?od"
+ "mssql+pyodbc://userdiff:passdiff@localhost/dbdiff?od"
"bc_connect=DRIVER%3D%7BSQL+Server%7D%3BServer"
"%3Dhostspec%3BDatabase%3Ddatabase%3BUID%3Duse"
"rname%3BPWD%3Dpassword"
dialect = mysqldb.dialect()
connect_args = dialect.create_connect_args(
make_url(
- "mysql://scott:tiger@localhost:3306/test"
+ "mysql+mysqldb://scott:tiger@localhost:3306/test"
"?%s=%s" % (kwarg, value)
)
)
def test_random_arg(self):
dialect = testing.db.dialect
kw = dialect.create_connect_args(
- make_url("mysql://u:p@host/db?foo=true")
+ make_url("mysql+mysqldb://u:p@host/db?foo=true")
)[1]
eq_(kw["foo"], "true")
class ParseVersionTest(fixtures.TestBase):
def test_mariadb_madness(self):
- mysql_dialect = make_url("mysql://").get_dialect()()
+ mysql_dialect = make_url("mysql+mysqldb://").get_dialect()()
is_(mysql_dialect.is_mariadb, False)
def test_psycopg2_empty_connection_string(self):
dialect = psycopg2_dialect.dialect()
- u = url.make_url("postgresql://")
+ u = url.make_url("postgresql+psycopg2://")
cargs, cparams = dialect.create_connect_args(u)
eq_(cargs, [""])
eq_(cparams, {})
def test_psycopg2_nonempty_connection_string(self):
dialect = psycopg2_dialect.dialect()
- u = url.make_url("postgresql://host")
+ u = url.make_url("postgresql+psycopg2://host")
cargs, cparams = dialect.create_connect_args(u)
eq_(cargs, [])
eq_(cparams, {"host": "host"})
def test_psycopg2_empty_connection_string_w_query_one(self):
dialect = psycopg2_dialect.dialect()
- u = url.make_url("postgresql:///?service=swh-log")
+ u = url.make_url("postgresql+psycopg2:///?service=swh-log")
cargs, cparams = dialect.create_connect_args(u)
eq_(cargs, [])
eq_(cparams, {"service": "swh-log"})
def test_psycopg2_empty_connection_string_w_query_two(self):
dialect = psycopg2_dialect.dialect()
- u = url.make_url("postgresql:///?any_random_thing=yes")
+ u = url.make_url("postgresql+psycopg2:///?any_random_thing=yes")
cargs, cparams = dialect.create_connect_args(u)
eq_(cargs, [])
eq_(cparams, {"any_random_thing": "yes"})
def test_psycopg2_nonempty_connection_string_w_query(self):
dialect = psycopg2_dialect.dialect()
- u = url.make_url("postgresql://somehost/?any_random_thing=yes")
+ u = url.make_url(
+ "postgresql+psycopg2://somehost/?any_random_thing=yes"
+ )
cargs, cparams = dialect.create_connect_args(u)
eq_(cargs, [])
eq_(cparams, {"host": "somehost", "any_random_thing": "yes"})
def test_psycopg2_nonempty_connection_string_w_query_two(self):
dialect = psycopg2_dialect.dialect()
- url_string = "postgresql://USER:PASS@/DB?host=hostA"
+ url_string = "postgresql+psycopg2://USER:PASS@/DB?host=hostA"
u = url.make_url(url_string)
cargs, cparams = dialect.create_connect_args(u)
eq_(cargs, [])
def test_psycopg2_nonempty_connection_string_w_query_three(self):
dialect = psycopg2_dialect.dialect()
url_string = (
- "postgresql://USER:PASS@/DB"
+ "postgresql+psycopg2://USER:PASS@/DB"
"?host=hostA:portA&host=hostB&host=hostC"
)
u = url.make_url(url_string)
"only argument accepted is 'mock'"
):
e = create_engine(
- "postgresql://", strategy="mock", executor=executor
+ "postgresql+psycopg2://", strategy="mock", executor=executor
)
assert isinstance(e, MockConnection)
tsa.exc.ArgumentError,
"unknown strategy: 'threadlocal'",
create_engine,
- "postgresql://",
+ "postgresql+psycopg2://",
strategy="threadlocal",
)
"and no longer has any effect."
):
create_engine(
- "postgresql://",
+ "postgresql+psycopg2://",
empty_in_strategy="static",
module=Mock(),
_initialize=False,
def dump(sql, *multiparams, **params):
buf.write(util.text_type(sql.compile(dialect=engine.dialect)))
- engine = create_mock_engine("postgresql://", executor=dump)
+ engine = create_mock_engine("postgresql+psycopg2://", executor=dump)
return engine, buf
def test_sequence_not_duped(self):
from sqlalchemy.testing import is_false
from sqlalchemy.testing import is_true
from sqlalchemy.testing import mock
+from sqlalchemy.testing import ne_
from sqlalchemy.testing.assertions import expect_deprecated
from sqlalchemy.testing.assertions import expect_raises_message
from sqlalchemy.testing.mock import call
def test_connect_query(self):
dbapi = MockDBAPI(foober="12", lala="18", fooz="somevalue")
e = create_engine(
- "postgresql://scott:tiger@somehost/test?foobe"
+ "postgresql+psycopg2://scott:tiger@somehost/test?foobe"
"r=12&lala=18&fooz=somevalue",
module=dbapi,
_initialize=False,
foober=12, lala=18, hoho={"this": "dict"}, fooz="somevalue"
)
e = create_engine(
- "postgresql://scott:tiger@somehost/test?fooz=" "somevalue",
+ "postgresql+psycopg2://scott:tiger@somehost/test?fooz="
+ "somevalue",
connect_args={"foober": 12, "lala": 18, "hoho": {"this": "dict"}},
module=dbapi,
_initialize=False,
dbapi = mock_dbapi
config = {
- "sqlalchemy.url": "postgresql://scott:tiger@somehost/test"
+ "sqlalchemy.url": "postgresql+psycopg2://scott:tiger@somehost/test"
"?fooz=somevalue",
"sqlalchemy.pool_recycle": "50",
"sqlalchemy.echo": "true",
e = engine_from_config(config, module=dbapi, _initialize=False)
assert e.pool._recycle == 50
assert e.url == url.make_url(
- "postgresql://scott:tiger@somehost/test?foo" "z=somevalue"
+ "postgresql+psycopg2://scott:tiger@somehost/test?foo" "z=somevalue"
)
assert e.echo is True
dbapi = mock_dbapi
config = {
- "sqlalchemy.url": "postgresql://scott:tiger@somehost/test"
+ "sqlalchemy.url": "postgresql+psycopg2://scott:tiger@somehost/test"
"?fooz=somevalue",
"sqlalchemy.future": "true",
}
dbapi = mock_dbapi
config = {
- "sqlalchemy.url": "postgresql://scott:tiger@somehost/test"
+ "sqlalchemy.url": "postgresql+psycopg2://scott:tiger@somehost/test"
"?fooz=somevalue",
"sqlalchemy.future": "false",
}
("none", pool.reset_none),
]:
config = {
- "sqlalchemy.url": "postgresql://scott:tiger@somehost/test",
+ "sqlalchemy.url": "postgresql+psycopg2://scott:tiger@somehost/test", # noqa
"sqlalchemy.pool_reset_on_return": value,
}
# module instead of psycopg
e = create_engine(
- "postgresql://", creator=connect, module=dbapi, _initialize=False
+ "postgresql+psycopg2://",
+ creator=connect,
+ module=dbapi,
+ _initialize=False,
)
e.connect()
foober=12, lala=18, hoho={"this": "dict"}, fooz="somevalue"
)
e = create_engine(
- "postgresql://", pool_recycle=472, module=dbapi, _initialize=False
+ "postgresql+psycopg2://",
+ pool_recycle=472,
+ module=dbapi,
+ _initialize=False,
)
assert e.pool._recycle == 472
(False, pool.reset_none),
]:
e = create_engine(
- "postgresql://",
+ "postgresql+psycopg2://",
pool_reset_on_return=value,
module=dbapi,
_initialize=False,
assert_raises(
exc.ArgumentError,
create_engine,
- "postgresql://",
+ "postgresql+psycopg2://",
pool_reset_on_return="hi",
module=dbapi,
_initialize=False,
assert_raises(
TypeError,
create_engine,
- "postgresql://",
+ "postgresql+psycopg2://",
use_ansi=True,
module=mock_dbapi,
)
assert_raises(
TypeError,
create_engine,
- "postgresql://",
+ "postgresql+psycopg2://",
lala=5,
module=mock_dbapi,
)
"""test the url attribute on ``Engine``."""
e = create_engine(
- "mysql://scott:tiger@localhost/test",
+ "mysql+mysqldb://scott:tiger@localhost/test",
module=mock_dbapi,
_initialize=False,
)
- u = url.make_url("mysql://scott:tiger@localhost/test")
+ u = url.make_url("mysql+mysqldb://scott:tiger@localhost/test")
e2 = create_engine(u, module=mock_dbapi, _initialize=False)
- assert e.url.drivername == e2.url.drivername == "mysql"
+ assert e.url.drivername == e2.url.drivername == "mysql+mysqldb"
assert e.url.username == e2.url.username == "scott"
assert e2.url is u
- assert str(u) == "mysql://scott:tiger@localhost/test"
- assert repr(u) == "mysql://scott:***@localhost/test"
- assert repr(e) == "Engine(mysql://scott:***@localhost/test)"
- assert repr(e2) == "Engine(mysql://scott:***@localhost/test)"
+ assert str(u) == "mysql+mysqldb://scott:tiger@localhost/test"
+ assert repr(u) == "mysql+mysqldb://scott:***@localhost/test"
+ assert repr(e) == "Engine(mysql+mysqldb://scott:***@localhost/test)"
+ assert repr(e2) == "Engine(mysql+mysqldb://scott:***@localhost/test)"
def test_poolargs(self):
"""test that connection pool args make it thru"""
e = create_engine(
- "postgresql://",
+ "postgresql+psycopg2://",
creator=None,
pool_recycle=50,
echo_pool=None,
# these args work for QueuePool
e = create_engine(
- "postgresql://",
+ "postgresql+psycopg2://",
max_overflow=8,
pool_timeout=60,
poolclass=tsa.pool.QueuePool,
e.connect()
eq_(sp.called, 1)
+ def test_default_driver(self):
+ successes = 0
+ for url_prefix, driver_name in [
+ ("mariadb://", "mysqldb"),
+ ("mssql://", "pyodbc"),
+ ("mysql://", "mysqldb"),
+ ("oracle://", "cx_oracle"),
+ ("postgresql://", "psycopg2"),
+ ("sqlite://", "pysqlite"),
+ ]:
+ try:
+ en = create_engine(url_prefix)
+ eq_(en.dialect.driver, driver_name)
+ successes += 1
+ except ModuleNotFoundError:
+ # not all test environments will have every driver installed
+ pass
+ # but we should at least find one
+ ne_(successes, 0, "No default drivers found.")
+
class TestRegNewDBAPI(fixtures.TestBase):
def test_register_base(self):
def _pool_fixture(self, pre_ping, pool_kw=None):
dialect = url.make_url(
- "postgresql://foo:bar@localhost/test"
+ "postgresql+psycopg2://foo:bar@localhost/test"
).get_dialect()()
dialect.dbapi = self.dbapi
_pool = pool.QueuePool(
self.dbapi = MockDBAPI()
self.db = testing_engine(
- "postgresql://foo:bar@localhost/test",
+ "postgresql+psycopg2://foo:bar@localhost/test",
options=dict(module=self.dbapi, _initialize=False),
)
"The create_engine.convert_unicode parameter and "
"corresponding dialect-level"
):
- create_engine("mysql://", convert_unicode=True, module=mock.Mock())
+ create_engine(
+ "mysql+mysqldb://", convert_unicode=True, module=mock.Mock()
+ )
def test_empty_and_or(self):
with testing.expect_deprecated(