from engine import *
from types import *
from schema import *
+from exceptions import *
from sql import *
import mapping as mapperlib
from mapping import *
def visit_label(self, label):
if len(self.select_stack):
self.typemap.setdefault(label.name.lower(), label.obj.type)
- if label.obj.type is None:
- raise "nonetype" + repr(label.obj)
self.strings[label] = self.strings[label.obj] + " AS " + label.name
def visit_column(self, column):
return self.obj.__dict__[self.key]
def setattr(self, value):
if isinstance(value, list):
- raise ("assigning a list to scalar property '%s' on '%s' instance %d" % (self.key, self.obj.__class__.__name__, id(self.obj)))
+ raise InvalidRequestError("assigning a list to scalar property '%s' on '%s' instance %d" % (self.key, self.obj.__class__.__name__, id(self.obj)))
orig = self.obj.__dict__.get(self.key, None)
if orig is value:
return
corresponding to a particular vendor, such as mysql-specific, sqlite-specific, etc.
"""
-import sqlalchemy.schema as schema
import sqlalchemy.pool
-import sqlalchemy.util as util
-import sqlalchemy.sql as sql
-import StringIO, sys, re
+import schema
+import exceptions
+import util
+import sql
import sqlalchemy.types as types
-import sqlalchemy.databases
+import StringIO, sys, re
__all__ = ['create_engine', 'engine_descriptors']
self.bindtemplate = "%%(%s)s"
self.positional = True
else:
- raise "Unsupported paramstyle '%s'" % self._paramstyle
+ raise DBAPIError("Unsupported paramstyle '%s'" % self._paramstyle)
def type_descriptor(self, typeobj):
"""provides a database-specific TypeEngine object, given the generic object
return ResultProxy(cursor, self, typemap=typemap)
def _execute(self, c, statement, parameters):
- #try:
- c.execute(statement, parameters)
- #except:
- # raise "OK ERROR " + statement + " " + repr(parameters)
+ try:
+ c.execute(statement, parameters)
+ except Exception, e:
+ raise exceptions.SQLError(statement, parameters, e)
self.context.rowcount = c.rowcount
def _executemany(self, c, statement, parameters):
c.executemany(statement, parameters)
def __init__(self, key):
self.key = key
def convert_result_value(self, arg, engine):
- raise "Ambiguous column name '%s' in result set! try 'use_labels' option on select statement." % (self.key)
+ raise InvalidRequestError("Ambiguous column name '%s' in result set! try 'use_labels' option on select statement." % (self.key))
def __init__(self, cursor, engine, typemap = None):
"""ResultProxy objects are constructed via the execute() method on SQLEngine."""
else:
rec = (types.NULLTYPE, i)
if rec[0] is None:
- raise "None for metadata " + colname
+ raise DBAPIError("None for metadata " + colname)
if self.props.setdefault(colname, rec) is not rec:
self.props[colname] = (ResultProxy.AmbiguousColumn(colname), 0)
self.keys.append(colname)
--- /dev/null
+# exceptions.py - exceptions for SQLAlchemy
+# Copyright (C) 2005,2006 Michael Bayer mike_mp@zzzcomputing.com
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: http://www.opensource.org/licenses/mit-license.php
+
+
+class SQLAlchemyError(Exception):
+ """generic error class"""
+ pass
+
+class SQLError(SQLAlchemyError):
+ """raised when the execution of a SQL statement fails. includes accessors
+ for the underlying exception, as well as the SQL and bind parameters"""
+ def __init__(self, statement, params, orig):
+ SQLAlchemyError.__init__(self, "(%s) %s"% (orig.__class__.__name__, str(orig)))
+ self.statement = statement
+ self.params = params
+ self.orig = orig
+
+class ArgumentError(SQLAlchemyError):
+ """raised for all those conditions where invalid arguments are sent to constructed
+ objects. This error generally corresponds to construction time state errors."""
+ pass
+
+class CommitError(SQLAlchemyError):
+ """raised when an invalid condition is detected upon a commit()"""
+ pass
+
+class InvalidRequestError(SQLAlchemyError):
+ """sqlalchemy was asked to do something it cant do, return nonexistent data, etc.
+ This error generally corresponds to runtime state errors."""
+ pass
+
+class AssertionError(SQLAlchemyError):
+ """corresponds to internal state being detected in an invalid state"""
+ pass
+
+class DBAPIError(SQLAlchemyError):
+ """something weird happened with a particular DBAPI version"""
+ pass
\ No newline at end of file
import sqlalchemy.engine as engine
import sqlalchemy.util as util
import objectstore
+from exceptions import *
import types as types
from mapper import *
from properties import *
"""provides a relationship of a primary Mapper to a secondary Mapper, which corresponds
to a parent-child or associative table relationship."""
if len(args) > 1 and isinstance(args[0], type):
- raise ValueError("relation(class, table, **kwargs) is deprecated. Please use relation(class, **kwargs) or relation(mapper, **kwargs).")
+ raise ArgumentError("relation(class, table, **kwargs) is deprecated. Please use relation(class, **kwargs) or relation(mapper, **kwargs).")
return _relation_loader(*args, **kwargs)
def _relation_loader(mapper, secondary=None, primaryjoin=None, secondaryjoin=None, lazy=True, **kwargs):
pass
except AttributeError:
pass
- raise "Class '%s' has no mapper associated with it" % class_.__name__
+ raise InvalidRequestError("Class '%s' has no mapper associated with it" % class_.__name__)
def assign_mapper(class_, *args, **params):
import sqlalchemy.schema as schema
import sqlalchemy.engine as engine
import sqlalchemy.util as util
+from sqlalchemy.exceptions import *
import objectstore
import sys
import weakref
self._options = {}
if not issubclass(class_, object):
- raise TypeError("Class '%s' is not a new-style class" % class_.__name__)
+ raise ArgumentError("Class '%s' is not a new-style class" % class_.__name__)
if isinstance(table, sql.Select):
# some db's, noteably postgres, dont want to select from a select
# the configured properties on the mapper are not matched against the alias
# we make, theres workarounds but it starts to get really crazy (its crazy enough
# the SQL that gets generated) so just require an alias
- raise TypeError("Mapping against a Select object requires that it has a name. Use an alias to give it a name, i.e. s = select(...).alias('myselect')")
+ raise ArgumentError("Mapping against a Select object requires that it has a name. Use an alias to give it a name, i.e. s = select(...).alias('myselect')")
else:
self.table = table
except KeyError:
l = self.pks_by_table.setdefault(t, util.HashSet(ordered=True))
if not len(t.primary_key):
- raise ValueError("Table " + t.name + " has no primary key columns. Specify primary_key argument to mapper.")
+ raise ArgumentError("Table " + t.name + " has no primary key columns. Specify primary_key argument to mapper.")
for k in t.primary_key:
l.append(k)
try:
prop = self.table._get_col_by_original(prop)
except KeyError:
- raise ValueError("Column '%s' is not represented in mapper's table" % prop._label)
+ raise ArgumentError("Column '%s' is not represented in mapper's table" % prop._label)
self.columns[key] = prop
prop = ColumnProperty(prop)
elif isinstance(prop, list) and sql.is_column(prop[0]):
try:
prop = [self.table._get_col_by_original(p) for p in prop]
except KeyError, e:
- raise ValueError("Column '%s' is not represented in mapper's table" % e.args[0])
+ raise ArgumentError("Column '%s' is not represented in mapper's table" % e.args[0])
self.columns[key] = prop[0]
prop = ColumnProperty(*prop)
self.props[key] = prop
prop.columns.append(column)
else:
if not allow_column_override:
- raise ValueError("WARNING: column '%s' not being added due to property '%s'. Specify 'allow_column_override=True' to mapper() to ignore this condition." % (column.key, repr(prop)))
+ raise ArgumentError("WARNING: column '%s' not being added due to property '%s'. Specify 'allow_column_override=True' to mapper() to ignore this condition." % (column.key, repr(prop)))
else:
continue
continue
c = self._get_criterion(key, value)
if c is None:
- raise "Cant find criterion for property '"+ key + "'"
+ raise InvalidRequestError("Cant find criterion for property '"+ key + "'")
if clause is None:
clause = c
else:
except KeyError:
try:
prop = self.props[column.key]
- raise "Column '%s.%s' is not available, due to conflicting property '%s':%s" % (column.table.name, column.name, column.key, repr(prop))
+ raise InvalidRequestError("Column '%s.%s' is not available, due to conflicting property '%s':%s" % (column.table.name, column.name, column.key, repr(prop)))
except KeyError:
- raise "No column %s.%s is configured on mapper %s..." % (column.table.name, column.name, str(self))
+ raise InvalidRequestError("No column %s.%s is configured on mapper %s..." % (column.table.name, column.name, str(self)))
return prop[0]
def _getattrbycolumn(self, obj, column):
self.extension.after_update(self, obj)
rows += c.cursor.rowcount
if table.engine.supports_sane_rowcount() and rows != len(update):
- raise "ConcurrencyError - updated rowcount %d does not match number of objects updated %d" % (rows, len(update))
+ raise CommitError("ConcurrencyError - updated rowcount %d does not match number of objects updated %d" % (rows, len(update)))
if len(insert):
statement = table.insert()
for rec in insert:
statement = table.delete(clause)
c = statement.execute(*delete)
if table.engine.supports_sane_rowcount() and c.rowcount != len(delete):
- raise "ConcurrencyError - updated rowcount %d does not match number of objects updated %d" % (c.cursor.rowcount, len(delete))
+ raise CommitError("ConcurrencyError - updated rowcount %d does not match number of objects updated %d" % (c.cursor.rowcount, len(delete)))
def _has_pks(self, table):
try:
return (class_, table.hash_key(), tuple([row[column] for column in primary_key]))
get_row_key = staticmethod(get_row_key)
- class UOWTrans(object):
+ class SessionTrans(object):
def __init__(self, parent, uow, isactive):
self.__parent = parent
self.__isactive = isactive
def begin(self):
return self.parent.begin()
def commit(self):
+ """commits the transaction noted by this SessionTrans object."""
self.__parent._trans_commit(self)
self.__isactive = False
def rollback(self):
+ """rolls back the current UnitOfWork transaction, in the case that begin()
+ has been called. The changes logged since the begin() call are discarded."""
self.__parent._trans_rollback(self)
self.__isactive = False
def begin(self):
- """begins a new UnitOfWork transaction. the next commit will affect only
- objects that are created, modified, or deleted following the begin statement."""
+ """begins a new UnitOfWork transaction and returns a tranasaction-holding
+ object. commit() or rollback() should be called on the returned object.
+ commit() on the Session will do nothing while a transaction is pending, and further
+ calls to begin() will return no-op transactional objects."""
if self.parent_uow is not None:
- return Session.UOWTrans(self, self.uow, False)
+ return Session.SessionTrans(self, self.uow, False)
self.parent_uow = self.uow
self.uow = UnitOfWork(identity_map = self.uow.identity_map)
- return Session.UOWTrans(self, self.uow, True)
+ return Session.SessionTrans(self, self.uow, True)
def _trans_commit(self, trans):
if trans.uow is self.uow and trans.isactive:
self.parent_uow = None
def commit(self, *objects):
- """commits the current UnitOfWork transaction. if a transaction was begun
- via begin(), commits only those objects that were created, modified, or deleted
- since that begin statement. otherwise commits all objects that have been
- changed.
+ """commits the current UnitOfWork transaction. called with
+ no arguments, this is only used
+ for "implicit" transactions when there was no begin().
if individual objects are submitted, then only those objects are committed, and the
begin/commit cycle is not affected."""
# if an object list is given, commit just those but dont
if self.parent_uow is None:
self.uow.commit()
- def rollback(self):
- """rolls back the current UnitOfWork transaction, in the case that begin()
- has been called. The changes logged since the begin() call are discarded."""
- if self.parent_uow is None:
- raise "UOW transaction is not begun"
- self.uow = self.parent_uow
- self.parent_uow = None
- self.begin_count = 0
-
def register_clean(self, obj):
self._bind_to(obj)
self.uow.register_clean(obj)
def list_value_changed(self, obj, key, item, listval, isdelete):
sess = get_session(obj)
if not isdelete and sess.deleted.contains(item):
- raise "re-inserting a deleted value into a list"
+ raise InvalidRequestError("re-inserting a deleted value into a list")
sess.modified_lists.append(self)
if self.deleteremoved and isdelete:
sess.register_deleted(item)
# things can get really confusing if theres duplicate instances floating around,
# so make sure everything is OK
if hasattr(obj, '_instance_key') and not self.uow.identity_map.has_key(obj._instance_key):
- raise "Detected a mapped object not present in the current thread's Identity Map: '%s'. Use objectstore.import_instance() to place deserialized instances or instances from other threads" % repr(obj._instance_key)
+ raise InvalidRequestError("Detected a mapped object not present in the current thread's Identity Map: '%s'. Use objectstore.import_instance() to place deserialized instances or instances from other threads" % repr(obj._instance_key))
mapper = object_mapper(obj)
self.mappers.append(mapper)
try:
return _sessions[hashkey]
except KeyError:
- raise "Session '%s' referenced by object '%s' no longer exists" % (hashkey, repr(obj))
+ raise InvalidRequestError("Session '%s' referenced by object '%s' no longer exists" % (hashkey, repr(obj)))
return session_registry()
import sqlalchemy.attributes as attributes
import mapper
import objectstore
+from sqlalchemy.exceptions import *
class ColumnProperty(MapperProperty):
"""describes an object attribute that corresponds to a table column."""
self.parent = parent
if self.secondaryjoin is not None and self.secondary is None:
- raise ValueError("Property '" + self.key + "' specified with secondary join condition but no secondary argument")
+ raise ArgumentError("Property '" + self.key + "' specified with secondary join condition but no secondary argument")
# if join conditions were not specified, figure them out based on foreign keys
if self.secondary is not None:
if self.secondaryjoin is None:
if not self.mapper.props[self.backref].is_backref:
self.is_backref=True
elif not objectstore.global_attributes.is_class_managed(parent.class_, key):
- raise "Non-primary property created for attribute '%s' on class '%s', but that attribute is not managed! Insure that the primary mapper for this class defines this property" % (key, parent.class_.__name__)
+ raise ArgumentError("Non-primary property created for attribute '%s' on class '%s', but that attribute is not managed! Insure that the primary mapper for this class defines this property" % (key, parent.class_.__name__))
self.do_init_subclass(key, parent)
elif self.foreigntable == self.parent.table:
return PropertyLoader.MANYTOONE
else:
- raise "Cant determine relation direction"
+ raise ArgumentError("Cant determine relation direction")
def _find_dependent(self):
"""searches through the primary join condition to determine which side
return
if isinstance(binary.left, schema.Column) and binary.left.primary_key:
if dependent[0] is binary.left.table:
- raise "bidirectional dependency not supported...specify foreignkey"
+ raise ArgumentError("bidirectional dependency not supported...specify foreignkey")
dependent[0] = binary.right.table
self.foreignkey= binary.right
elif isinstance(binary.right, schema.Column) and binary.right.primary_key:
if dependent[0] is binary.right.table:
- raise "bidirectional dependency not supported...specify foreignkey"
+ raise ArgumentError("bidirectional dependency not supported...specify foreignkey")
dependent[0] = binary.left.table
self.foreignkey = binary.left
visitor = BinaryVisitor(foo)
self.primaryjoin.accept_visitor(visitor)
if dependent[0] is None:
- raise "cant determine primary foreign key in the join relationship....specify foreignkey=<column> or foreignkey=[<columns>]"
+ raise ArgumentError("cant determine primary foreign key in the join relationship....specify foreignkey=<column> or foreignkey=[<columns>]")
else:
self.foreigntable = dependent[0]
uowcommit.register_processor(stub, self, self.parent, True)
elif self.direction == PropertyLoader.ONETOMANY:
if self.post_update:
- raise "post_update not yet supported with one-to-many relation"
+ raise InvalidRequestError("post_update not yet supported with one-to-many relation")
uowcommit.register_dependency(self.parent, self.mapper)
uowcommit.register_processor(self.parent, self, self.parent, False)
uowcommit.register_processor(self.parent, self, self.parent, True)
uowcommit.register_dependency(self.mapper, self.parent)
uowcommit.register_processor(self.mapper, self, self.parent, False)
else:
- raise " no foreign key ?"
+ raise AssertionError(" no foreign key ?")
def get_object_dependencies(self, obj, uowcommit, passive = True):
return uowcommit.uow.attributes.get_history(obj, self.key, passive = passive)
source = binary.right
dest = binary.left
else:
- raise "Cant determine direction for relationship %s = %s" % (binary.left.fullname, binary.right.fullname)
+ raise ArgumentError("Cant determine direction for relationship %s = %s" % (binary.left.fullname, binary.right.fullname))
if self.direction == PropertyLoader.ONETOMANY:
self.syncrules.append(SyncRule(self.parent, source, dest, dest_mapper=self.mapper))
elif self.direction == PropertyLoader.MANYTOONE:
self.syncrules.append(SyncRule(self.mapper, source, dest, dest_mapper=self.parent))
else:
- raise "assert failed"
+ raise AssertionError("assert failed")
else:
pt = check_for_table(binary, parent_tables)
tt = check_for_table(binary, target_tables)
elif self.direction == PropertyLoader.MANYTOONE:
self.syncrules.append(SyncRule(self.mapper, tt, pt, dest_mapper=self.parent))
else:
- raise "assert failed"
+ raise AssertionError("assert failed")
elif pt and st:
self.syncrules.append(SyncRule(self.parent, pt, st, direction=PropertyLoader.ONETOMANY))
elif tt and st:
if self.secondaryjoin is not None:
self.secondaryjoin.accept_visitor(processor)
if len(self.syncrules) == 0:
- raise "No syncrules generated for join criterion " + str(self.primaryjoin)
+ raise ArgumentError("No syncrules generated for join criterion " + str(self.primaryjoin))
def _synchronize(self, obj, child, associationrow, clearkeys):
"""called during a commit to execute the full list of syncrules on the
self.mapper.props[prop.key] = p
if recursion_stack.has_key(prop):
- raise "Circular eager load relationship detected on " + str(self.mapper) + " " + key + repr(self.mapper.props)
+ raise ArgumentError("Circular eager load relationship detected on " + str(self.mapper) + " " + key + repr(self.mapper.props))
p.do_init_subclass(prop.key, prop.parent, recursion_stack)
try:
for key, value in self.mapper.props.iteritems():
if recursion_stack.has_key(value):
- raise "Circular eager load relationship detected on " + str(self.mapper) + " " + key + repr(self.mapper.props)
+ raise InvalidRequestError("Circular eager load relationship detected on " + str(self.mapper) + " " + key + repr(self.mapper.props))
value.setup(key, statement, recursion_stack=recursion_stack, eagertable=self.eagertarget)
finally:
del recursion_stack[self]
import string, StringIO
from sets import *
import sqlalchemy.util as util
+from sqlalchemy.exceptions import *
class QueueDependencySorter(object):
"""this is a topological sort from wikipedia. its very stable. it creates a straight-line
n.cycles = Set([n])
continue
else:
- raise "Self-referential dependency detected " + repr(t)
+ raise CommitError("Self-referential dependency detected " + repr(t))
childnode = nodes[t[1]]
parentnode = nodes[t[0]]
self._add_edge(edges, (parentnode, childnode))
continue
else:
# long cycles not allowed
- raise "Circular dependency detected " + repr(edges) + repr(queue)
+ raise CommitError("Circular dependency detected " + repr(edges) + repr(queue))
node = queue.pop()
if not hasattr(node, '_cyclical'):
output.append(node)
elif parentnode.is_descendant_of(childnode):
# check for a line thats backwards with nodes in between, this is a
# circular dependency (although confirmation on this would be helpful)
- raise "Circular dependency detected"
+ raise CommitError("Circular dependency detected")
elif not childnode.is_descendant_of(parentnode):
# if relationship doesnt exist, connect nodes together
root = childnode.get_sibling_ancestor(parentnode)
"""
-from sqlalchemy.util import *
-from sqlalchemy.types import *
+from util import *
+from types import *
+from exceptions import *
import copy, re, string
__all__ = ['SchemaItem', 'Table', 'Column', 'ForeignKey', 'Sequence', 'Index',
if redefine:
table.reload_values(*args)
elif not useexisting:
- raise "Table '%s.%s' is already defined. specify 'redefine=True' to remap columns, or 'useexisting=True' to use the existing table" % (schema, name)
+ raise ArgumentError("Table '%s.%s' is already defined. specify 'redefine=True' to remap columns, or 'useexisting=True' to use the existing table" % (schema, name))
return table
except KeyError:
if mustexist:
- raise "Table '%s.%s' not defined" % (schema, name)
+ raise ArgumentError("Table '%s.%s' not defined" % (schema, name))
table = type.__call__(self, name, engine, **kwargs)
engine.tables[key] = table
# load column definitions from the database if 'autoload' is defined
self.foreign_key = None
self._orig = None
if len(kwargs):
- raise "Unknown arguments passed to Column: " + repr(kwargs.keys())
+ raise ArgumentError("Unknown arguments passed to Column: " + repr(kwargs.keys()))
original = property(lambda s: s._orig or s)
engine = property(lambda s: s.table.engine)
def _set_parent(self, table):
if getattr(self, 'table', None) is not None:
- raise "this Column already has a table!"
+ raise ArgumentError("this Column already has a table!")
if not self.hidden:
table.columns[self.key] = self
if self.primary_key:
if isinstance(self._colspec, str):
m = re.match(r"^([\w_-]+)(?:\.([\w_-]+))?(?:\.([\w_-]+))?$", self._colspec)
if m is None:
- raise ValueError("Invalid foreign key column specification: " + self._colspec)
+ raise ArgumentError("Invalid foreign key column specification: " + self._colspec)
if m.group(3) is None:
(tname, colname) = m.group(1, 2)
schema = self.parent.original.table.schema
self.table = column.table
elif column.table != self.table:
# all columns muse be from same table
- raise ValueError("All index columns must be from same table. "
+ raise ArgumentError("All index columns must be from same table. "
"%s is from %s not %s" % (column,
column.table,
self.table))
"""defines the base components of SQL expression trees."""
-import sqlalchemy.schema as schema
-import sqlalchemy.util as util
-import sqlalchemy.types as sqltypes
+import schema
+import util
+import types as sqltypes
+from exceptions import *
import string, re, random
types = __import__('types')
engine = self.engine
if engine is None:
- raise "no SQLEngine could be located within this ClauseElement."
+ raise InvalidRequestError("no SQLEngine could be located within this ClauseElement.")
return engine.compile(self, parameters=parameters, typemap=typemap)
if _is_literal(obj):
if obj is None:
if operator != '=':
- raise "Only '=' operator can be used with NULL"
+ raise ArgumentError("Only '=' operator can be used with NULL")
return BooleanExpression(self._compare_self(), null(), 'IS')
else:
obj = self._bind_param(obj)
crit.append(secondary._get_col_by_original(fk.column) == fk.parent)
self.foreignkey = fk.parent
if len(crit) == 0:
- raise "Cant find any foreign key relationships between '%s' (%s) and '%s' (%s)" % (primary.name, repr(primary), secondary.name, repr(secondary))
+ raise ArgumentError("Cant find any foreign key relationships between '%s' (%s) and '%s' (%s)" % (primary.name, repr(primary), secondary.name, repr(secondary)))
elif len(crit) == 1:
return (crit[0])
else:
if engine is None:
engine = self.engine
if engine is None:
- raise "no SQLEngine could be located within this ClauseElement."
+ raise InvalidRequestError("no SQLEngine could be located within this ClauseElement.")
return engine.compile(self.column, parameters=parameters, typemap=typemap)
class TableImpl(FromClause):
__all__ = ['OrderedProperties', 'OrderedDict', 'generic_repr', 'HashSet']
import thread, weakref, UserList,string, inspect
+from exceptions import *
def to_list(x):
if x is None:
return id(self)
def _setrecord(self, item):
if self.readonly:
- raise "This list is read only"
+ raise InvalidRequestError("This list is read only")
try:
val = self.records[item]
if val is True or val is None:
return True
def _delrecord(self, item):
if self.readonly:
- raise "This list is read only"
+ raise InvalidRequestError("This list is read only")
try:
val = self.records[item]
if val is None:
if hasattr(instance, arg):
newparams[arg] = getattr(instance, arg)
else:
- raise "instance has no attribute '%s'" % arg
+ raise AssertionError("instance has no attribute '%s'" % arg)
return newparams
\ No newline at end of file
p = Person()
p.balls.append(b)
p.favorateBall = b
- #objectstore.commit()
+ objectstore.commit()
if __name__ == "__main__":
# assert that overriding a column raises an error
try:
m = mapper(User, users, properties = {
- 'user_name' : relation(Address, addresses),
+ 'user_name' : relation(mapper(Address, addresses)),
})
- self.assert_(False, "should have raised ValueError")
- except ValueError, e:
+ self.assert_(False, "should have raised ArgumentError")
+ except ArgumentError, e:
self.assert_(True)
# assert that allow_column_override cancels the error
from tables import *
import tables
-
class HistoryTest(AssertMixin):
def setUpAll(self):
db.echo = False