# the MIT License: http://www.opensource.org/licenses/mit-license.php
-__all__ = ['oracle', 'postgres', 'sqlite', 'mysql']
\ No newline at end of file
+__all__ = ['oracle', 'postgres', 'sqlite', 'mysql', 'mssql']
Column("constraint_name", String),
schema="information_schema")
+gen_ref_constraints = schema.Table("referential_constraints", generic_engine,
+ Column("constraint_catalog", String),
+ Column("constraint_schema", String),
+ Column("constraint_name", String),
+ Column("unique_constraint_catlog", String),
+ Column("unique_constraint_schema", String),
+ Column("unique_constraint_name", String),
+ Column("match_option", String),
+ Column("update_rule", String),
+ Column("delete_rule", String),
+ schema="information_schema")
+
class ISchema(object):
def __init__(self, engine):
self.engine = engine
connection = property(_connection, doc="the connection represented by this SQLSession. The connection is late-connecting, meaning the call to the connection pool only occurs when it is first called (and the pool will typically only connect the first time it is called as well)")
def begin(self):
- """begins" a transaction on this SQLSession's connection. repeated calls to begin() will increment a counter that must be decreased by corresponding commit() statements before an actual commit occurs. this is to provide "nested" behavior of transactions so that different functions in a particular call stack can call begin()/commit() independently of each other without knowledge of an existing transaction."""
+ """begins a transaction on this SQLSession's connection. repeated calls to begin() will increment a counter that must be decreased by corresponding commit() statements before an actual commit occurs. this is to provide "nested" behavior of transactions so that different functions in a particular call stack can call begin()/commit() independently of each other without knowledge of an existing transaction. """
if self.__tcount == 0:
self.__transaction = self.connection
self.engine.do_begin(self.connection)
self.commit()
def begin(self):
- """"begins a transaction on the current thread's SQLSession."""
+ """ begins a transaction on the current thread SQLSession. """
self.session.begin()
def rollback(self):
return ResultProxy(cursor, self, typemap=compiled.typemap)
def execute(self, statement, parameters=None, connection=None, cursor=None, echo=None, typemap=None, commit=False, return_raw=False, **kwargs):
- """executes the given string-based SQL statement with the given parameters.
+ """ executes the given string-based SQL statement with the given parameters.
The parameters can be a dictionary or a list, or a list of dictionaries or lists, depending
on the paramstyle of the DBAPI.
up.
In all error cases, a rollback() is immediately performed on the connection before
- propigating the exception outwards.
+ propagating the exception outwards.
Other options include:
default=None : a scalar, python callable, or ClauseElement representing the "default value" for this column,
which will be invoked upon insert if this column is not present in the insert list or is given a value
of None.
-
+
hidden=False : indicates this column should not be listed in the
table's list of columns. Used for the "oid" column, which generally
isnt in column lists.
indexed in a unique index . Pass true to autogenerate the index
name. Pass a string to specify the index name. Multiple columns that
specify the same index name will all be included in the index, in the
- order of their creation. """
+ order of their creation.
+
+ """
name = str(name) # in case of incoming unicode
super(Column, self).__init__(name, None, type)
"""calls the visit_seauence method on the given visitor."""
return visitor.visit_sequence(self)
+
class Index(SchemaItem):
"""Represents an index of columns from a database table
"""
"""test that mixed-case index identifiers are legal"""
employees = Table('companyEmployees', testbase.db,
Column('id', Integer, primary_key=True),
- Column('firstName', String),
- Column('lastName', String),
- Column('emailAddress', String))
+ Column('firstName', String(30)),
+ Column('lastName', String(30)),
+ Column('emailAddress', String(30)))
+
employees.create()
self.created.append(employees)
import sqlalchemy.databases.sqlite as sqlite
db = ansisql.engine()
+#db = create_engine('mssql')
from testbase import PersistTest
import unittest, re
class SchemaTest(SQLTest):
def testselect(self):
+ # these tests will fail with the MS-SQL compiler since it will alias schema-qualified tables
self.runtest(table4.select(), "SELECT remotetable.rem_id, remotetable.datatype_id, remotetable.value FROM remote_owner.remotetable")
self.runtest(table4.select(and_(table4.c.datatype_id==7, table4.c.value=='hi')), "SELECT remotetable.rem_id, remotetable.datatype_id, remotetable.value FROM remote_owner.remotetable WHERE remotetable.datatype_id = :remotetable_datatype_id AND remotetable.value = :remotetable_value")
elif DBTYPE == 'oracle8':
db_uri = 'oracle://user=scott&password=tiger'
opts = {'use_ansi':False}
+ elif DBTYPE == 'mssql':
+ db_uri = 'mssql://database=test&user=scott&password=tiger'
if not db_uri:
- raise "Could not create engine. specify --db <sqlite|sqlite_file|postgres|mysql|oracle> to test runner."
+ raise "Could not create engine. specify --db <sqlite|sqlite_file|postgres|mysql|oracle|oracle8|mssql> to test runner."
if PROXY:
db = proxy.ProxyEngine(echo=echo, default_ordering=True, **opts)
def testbinary(self):
stream1 =self.get_module_stream('sqlalchemy.sql')
stream2 =self.get_module_stream('sqlalchemy.engine')
- binary_table.insert().execute(misc='sql.pyc', data=stream1, data_slice=stream1[0:100])
- binary_table.insert().execute(misc='engine.pyc', data=stream2, data_slice=stream2[0:99])
+ binary_table.insert().execute(primary_id=1, misc='sql.pyc', data=stream1, data_slice=stream1[0:100])
+ binary_table.insert().execute(primary_id=2, misc='engine.pyc', data=stream2, data_slice=stream2[0:99])
l = binary_table.select().execute().fetchall()
print len(stream1), len(l[0]['data']), len(l[0]['data_slice'])
self.assert_(list(stream1) == list(l[0]['data']))
collist = [Column('user_id', INT, primary_key = True), Column('user_name', VARCHAR(20)), Column('user_datetime', DateTime),
Column('user_date', Date), Column('user_time', Time)]
- if db.engine.__module__.endswith('mysql'):
+ if db.engine.__module__.endswith('mysql') or db.engine.__module__.endswith('mssql'):
# strip microseconds -- not supported by this engine (should be an easier way to detect this)
for d in insert_data:
if d[2] is not None:
users_with_date = Table('query_users_with_date', db, redefine = True, *collist)
users_with_date.create()
insert_dicts = [dict(zip(fnames, d)) for d in insert_data]
+
for idict in insert_dicts:
users_with_date.insert().execute(**idict) # insert the data