else:
def check_quote(token: str) -> str:
- if ";" in str(token) or str(token).startswith("{"):
+ if (
+ ";" in str(token)
+ or "}" in str(token)
+ or str(token).startswith("{")
+ ):
token = "{%s}" % token.replace("}", "}}")
return token
+ driver = keys.pop("driver", self.pyodbc_driver_name)
+
keys = {k: check_quote(v) for k, v in keys.items()}
dsn_connection = "dsn" in keys or (
port = ",%d" % int(keys.pop("port"))
connectors = []
- driver = keys.pop("driver", self.pyodbc_driver_name)
if driver is None and keys:
# note if keys is empty, this is a totally blank URL
util.warn(
"DSN-less connections"
)
else:
- connectors.append("DRIVER={%s}" % driver)
+ connectors.append(
+ "DRIVER={%s}" % str(driver).replace("}", "}}")
+ )
connectors.extend(
[
"AutoTranslate=%s" % keys.pop("odbc_autotranslate")
)
- connectors.extend(["%s=%s" % (k, v) for k, v in keys.items()])
+ connectors.extend(
+ ["%s=%s" % (check_quote(k), v) for k, v in keys.items()]
+ )
return ((";".join(connectors),), connect_args)
from sqlalchemy import Table
from sqlalchemy import testing
from sqlalchemy.dialects.mssql import base
+from sqlalchemy.dialects.mssql import mssqlpython
from sqlalchemy.dialects.mssql import pymssql
from sqlalchemy.dialects.mssql import pyodbc
from sqlalchemy.engine import url
connection,
)
+ @testing.combinations(
+ ("pyodbc", pyodbc, "mssql+pyodbc"),
+ ("mssqlpython", mssqlpython, "mssql+mssqlpython"),
+ argnames="dialect_mod, url_prefix",
+ id_="iaa",
+ )
@testing.combinations(
(
"original",
"somehost%3BPORT%3D50001",
"somedb%3BPORT%3D50001",
),
- (
- "DRIVER={foob};Server=somehost%3BPORT%3D50001;"
- "Database={somedb;PORT=50001};UID={someuser;PORT=50001};"
- "PWD={some{strange}}pw;PORT=50001}",
- ),
+ "driver=foob",
+ {
+ "pyodbc": (
+ "DRIVER={foob};Server=somehost%3BPORT%3D50001;"
+ "Database={somedb;PORT=50001};"
+ "UID={someuser;PORT=50001};"
+ "PWD={some{strange}}pw;PORT=50001}",
+ ),
+ "mssqlpython": (
+ "Server=somehost%3BPORT%3D50001;"
+ "Database={somedb;PORT=50001};"
+ "UID={someuser;PORT=50001};"
+ "PWD={some{strange}}pw;PORT=50001};"
+ "driver=foob",
+ ),
+ },
),
(
"issue_8062",
"localhost",
"mydb",
),
+ "driver=foob",
+ {
+ "pyodbc": (
+ "DRIVER={foob};Server=localhost;"
+ "Database=mydb;UID=larry;"
+ "PWD={{moe}",
+ ),
+ "mssqlpython": (
+ "Server=localhost;"
+ "Database=mydb;UID=larry;"
+ "PWD={{moe};"
+ "driver=foob",
+ ),
+ },
+ ),
+ (
+ # a driver name that starts with { and contains } looks like
+ # a complete brace-quoted token. per the MS-ODBCSTR spec,
+ # only the right brace is escaped (doubled) within a
+ # brace-enclosed expression; the left brace is passed
+ # through as a literal character.
+ "driver_brace_open_injection",
(
- "DRIVER={foob};Server=localhost;"
- "Database=mydb;UID=larry;"
- "PWD={{moe}",
+ "username",
+ "password",
+ "hostspec",
+ "database",
),
+ "driver=%7BUID%3Devil%7D",
+ {
+ "pyodbc": (
+ "DRIVER={{UID=evil}}};"
+ "Server=hostspec;Database=database;"
+ "UID=username;PWD=password",
+ ),
+ "mssqlpython": (
+ "Server=hostspec;Database=database;"
+ "UID=username;PWD=password;"
+ "driver={{UID=evil}}}",
+ ),
+ },
),
- argnames="tokens, connection_string",
- id_="iaa",
+ (
+ # the driver name is always wrapped in braces in pyodbc, so a
+ # closing brace in the value has to be escaped, otherwise it
+ # ends the brace early and the remainder is read as further
+ # attributes
+ "driver_brace_injection",
+ (
+ "username",
+ "password",
+ "hostspec",
+ "database",
+ ),
+ "driver=foob%7DUID%3Devil",
+ {
+ "pyodbc": (
+ "DRIVER={foob}}UID=evil};"
+ "Server=hostspec;Database=database;"
+ "UID=username;PWD=password",
+ ),
+ "mssqlpython": (
+ "Server=hostspec;Database=database;"
+ "UID=username;PWD=password;"
+ "driver={foob}}UID=evil}",
+ ),
+ },
+ ),
+ (
+ # names of pass-through parameters get the same brace quoting
+ # as their values, so a semicolon in a parameter name can't be
+ # used to smuggle in an extra attribute
+ "pass_through_key_injection",
+ (
+ "username",
+ "password",
+ "hostspec",
+ "database",
+ ),
+ "driver=foob&foo%3BUID=evil",
+ {
+ "pyodbc": (
+ "DRIVER={foob};Server=hostspec;Database=database;"
+ "UID=username;PWD=password;{foo;UID}=evil",
+ ),
+ "mssqlpython": (
+ "Server=hostspec;Database=database;"
+ "UID=username;PWD=password;"
+ "driver=foob;{foo;UID}=evil",
+ ),
+ },
+ ),
+ argnames="tokens, query, expected",
+ id_="iaaa",
)
- def test_pyodbc_token_injection(self, tokens, connection_string):
- u = url.make_url("mssql+pyodbc://%s:%s@%s/%s?driver=foob" % tokens)
- dialect = pyodbc.dialect()
+ def test_token_injection(
+ self, dialect_mod, url_prefix, tokens, query, expected
+ ):
+ dialect_key = dialect_mod.__name__.rsplit(".", 1)[-1]
+ u = url.make_url(
+ "%s://%s:%s@%s/%s?%s" % ((url_prefix,) + tokens + (query,))
+ )
+ dialect = dialect_mod.dialect()
connection = dialect.create_connect_args(u)
eq_(
(
- connection_string,
+ expected[dialect_key],
{},
),
connection,