# query string.
ssl = {}
- keys = ["ssl_ca", "ssl_key", "ssl_cert", "ssl_capath", "ssl_cipher"]
- for key in keys:
+ keys = [("ssl_ca", str), ("ssl_key", str), ("ssl_cert", str),
+ ("ssl_capath", str), ("ssl_cipher", str),
+ ("ssl_check_hostname", bool)]
+ for key, kw_type in keys:
if key in opts:
ssl[key[4:]] = opts[key]
- util.coerce_kw_type(ssl, key[4:], str)
+ util.coerce_kw_type(ssl, key[4:], kw_type)
del opts[key]
if ssl:
opts["ssl"] = ssl
from sqlalchemy.dialects.mysql import mysqldb
dialect = mysqldb.dialect()
- self._test_ssl_arguments(dialect)
+ self._test_ssl_arguments(dialect, sql_type=mysqldb)
def test_ssl_arguments_oursql(self):
from sqlalchemy.dialects.mysql import oursql
dialect = oursql.dialect()
self._test_ssl_arguments(dialect)
- def _test_ssl_arguments(self, dialect):
- kwarg = dialect.create_connect_args(
- make_url(
- "mysql://scott:tiger@localhost:3306/test"
- "?ssl_ca=/ca.pem&ssl_cert=/cert.pem&ssl_key=/key.pem"
- )
- )[1]
+ def _test_ssl_arguments(self, dialect, sql_type="oursql"):
+ url = (
+ "mysql://scott:tiger@localhost:3306/test"
+ "?ssl_ca=/ca.pem&ssl_cert=/cert.pem&ssl_key=/key.pem"
+ )
+ expected = {
+ "passwd": "tiger",
+ "db": "test",
+ "ssl": {
+ "ca": "/ca.pem",
+ "cert": "/cert.pem",
+ "key": "/key.pem"
+ },
+ "host": "localhost",
+ "user": "scott",
+ "port": 3306
+ }
+ # add check_hostname check for mysqldb
+ if sql_type == "mysqldb":
+ url = url + "&ssl_check_hostname=false"
+ expected['ssl']['check_hostname'] = False
+
+ kwarg = dialect.create_connect_args(make_url(url))[1]
# args that differ among mysqldb and oursql
for k in ("use_unicode", "found_rows", "client_flag"):
kwarg.pop(k, None)
- eq_(
- kwarg,
- {
- "passwd": "tiger",
- "db": "test",
- "ssl": {
- "ca": "/ca.pem",
- "cert": "/cert.pem",
- "key": "/key.pem",
- },
- "host": "localhost",
- "user": "scott",
- "port": 3306,
- },
- )
+ eq_(kwarg, expected)
@testing.combinations(
("compress", True),