dbapi = classmethod(dbapi)
def create_connect_args(self, url):
- opts = url.translate_connect_args(['host', 'database', 'username', 'password', 'port'])
+ opts = url.translate_connect_args()
connectors = ["Driver={Microsoft Access Driver (*.mdb)}"]
connectors.append("Dbq=%s" % opts["database"])
- user = opts.get("user")
+ user = opts.get("username", None)
if user:
connectors.append("UID=%s" % user)
connectors.append("PWD=%s" % opts.get("password", ""))
- return [[";".join (connectors)], {}]
+ return [[";".join(connectors)], {}]
def create_execution_context(self, *args, **kwargs):
return AccessExecutionContext(self, *args, **kwargs)
# A fresh DAO connection is opened for each reflection
# This is necessary, so we get the latest updates
- opts = connection.engine.url.translate_connect_args(['host', 'database', 'username', 'password', 'port'])
- dtbs = daoEngine.OpenDatabase(opts['database'])
+ dtbs = daoEngine.OpenDatabase(connection.engine.url.database)
try:
for tbl in dtbs.TableDefs:
def table_names(self, connection, schema):
# A fresh DAO connection is opened for each reflection
# This is necessary, so we get the latest updates
- opts = connection.engine.url.translate_connect_args(['host', 'database', 'username', 'password', 'port'])
- dtbs = daoEngine.OpenDatabase(opts['database'])
+ dtbs = daoEngine.OpenDatabase(connection.engine.url.database)
names = [t.Name for t in dtbs.TableDefs if t.Name[:4] != "MSys" and t.Name[:4] <> "~TMP"]
dtbs.Close()
return dialect
raise ImportError('unknown database %r' % self.drivername)
- def translate_connect_args(self, names):
- """Translate attributes into a dictionary of connection arguments.
-
- Given a list of argument names corresponding to the URL
- attributes (`host`, `database`, `username`, `password`,
- `port`), will assemble the attribute values of this URL into
- the dictionary using the given names.
+ def translate_connect_args(self, names=[], **kw):
+ """Translate url attributes into a dictionary of connection arguments.
+
+ Returns attributes of this url (`host`, `database`, `username`,
+ `password`, `port`) as a plain dictionary. The attribute names are
+ used as the keys by default. Unset or false attributes are omitted
+ from the final dictionary.
+
+ \**kw
+ Optional, alternate key names for url attributes::
+
+ # return 'username' as 'user'
+ username='user'
+
+ # omit 'database'
+ database=None
+
+ names
+ Deprecated. A list of key names. Equivalent to the keyword
+ usage, must be provided in the order above.
"""
- a = {}
+ translated = {}
attribute_names = ['host', 'database', 'username', 'password', 'port']
- for n in names:
- sname = attribute_names.pop(0)
- if n is None:
- continue
- if getattr(self, sname, None):
- a[n] = getattr(self, sname)
- return a
+ for sname in attribute_names:
+ if names:
+ name = names.pop(0)
+ elif sname in kw:
+ name = kw[sname]
+ else:
+ name = sname
+ if name is not None and getattr(self, sname, False):
+ translated[name] = getattr(self, sname)
+ return translated
def make_url(name_or_url):
"""Given a string or unicode instance, produce a new URL instance.
def _parse_rfc1738_args(name):
pattern = re.compile(r'''
- (\w+)://
+ (?P<name>\w+)://
(?:
- ([^:/]*)
- (?::([^/]*))?
+ (?P<username>[^:/]*)
+ (?::(?P<password>[^/]*))?
@)?
(?:
- ([^/:]*)
- (?::([^/]*))?
+ (?P<host>[^/:]*)
+ (?::(?P<port>[^/]*))?
)?
- (?:/(.*))?
+ (?:/(?P<database>.*))?
'''
, re.X)
m = pattern.match(name)
if m is not None:
- (name, username, password, host, port, database) = m.group(1, 2, 3, 4, 5, 6)
- if database is not None:
- tokens = database.split(r"?", 2)
- database = tokens[0]
- query = (len(tokens) > 1 and dict( cgi.parse_qsl(tokens[1]) ) or None)
+ components = m.groupdict()
+ if components['database'] is not None:
+ tokens = components['database'].split('?', 2)
+ components['database'] = tokens[0]
+ query = (len(tokens) > 1 and dict(cgi.parse_qsl(tokens[1]))) or None
if query is not None:
query = dict([(k.encode('ascii'), query[k]) for k in query])
else:
query = None
- opts = {'username':username,'password':password,'host':host,'port':port,'database':database, 'query':query}
- if opts['password'] is not None:
- opts['password'] = urllib.unquote_plus(opts['password'])
- return URL(name, **opts)
+ components['query'] = query
+
+ if components['password'] is not None:
+ components['password'] = urllib.unquote_plus(components['password'])
+
+ name = components.pop('name')
+ return URL(name, **components)
else:
- raise exceptions.ArgumentError("Could not parse rfc1738 URL from string '%s'" % name)
+ raise exceptions.ArgumentError(
+ "Could not parse rfc1738 URL from string '%s'" % name)
def _parse_keyvalue_args(name):
m = re.match( r'(\w+)://(.*)', name)