# supports_sane_rowcount.
opts.setdefault('found_rows', True)
+ ssl = {}
+ for key in ['ssl_ca', 'ssl_key', 'ssl_cert',
+ 'ssl_capath', 'ssl_cipher']:
+ if key in opts:
+ ssl[key[4:]] = opts[key]
+ util.coerce_kw_type(ssl, key[4:], str)
+ del opts[key]
+ if ssl:
+ opts['ssl'] = ssl
+
return [[], opts]
def _get_server_version_info(self, connection):
from sqlalchemy import *
from sqlalchemy import sql, exc, schema, types as sqltypes, event
from sqlalchemy.dialects.mysql import base as mysql
+from sqlalchemy.engine.url import make_url
+
from test.lib.testing import eq_
from test.lib import *
from test.lib.engines import utf8_engine
import datetime
+
+class DialectTest(TestBase):
+ __only_on__ = 'mysql'
+
+ @testing.only_on(['mysql+mysqldb', 'mysql+oursql'],
+ 'requires particular SSL arguments')
+ def test_ssl_arguments(self):
+ dialect = testing.db.dialect
+ kwarg = dialect.create_connect_args(
+ make_url("mysql://scott:tiger@localhost:3306/test"
+ "?ssl_ca=/ca.pem&ssl_cert=/cert.pem&ssl_key=/key.pem")
+ )[1]
+ # args that differ among mysqldb and oursql
+ for k in ('use_unicode', 'found_rows', 'client_flag'):
+ kwarg.pop(k, None)
+ eq_(
+ kwarg,
+ {
+ 'passwd': 'tiger', 'db': 'test',
+ 'ssl': {'ca': '/ca.pem', 'cert': '/cert.pem',
+ 'key': '/key.pem'},
+ 'host': 'localhost', 'user': 'scott',
+ 'port': 3306
+ }
+ )
+
class TypesTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL):
"Test MySQL column types"