With CPython 2.7, runtimes observed::
classics-MacBook-Pro:sqlalchemy classic$ python test.py
- SQLAlchemy ORM: Total time for 100000 records 14.3528850079 secs
- SQLAlchemy ORM pk given: Total time for 100000 records 10.0164160728 secs
- SQLAlchemy Core: Total time for 100000 records 0.775382995605 secs
- sqlite3: Total time for 100000 records 0.676795005798 sec
+ SQLAlchemy ORM: Total time for 100000 records 12.4703581333 secs
+ SQLAlchemy ORM pk given: Total time for 100000 records 7.32723999023 secs
+ SQLAlchemy ORM bulk_save_objects(): Total time for 100000 records 3.43464708328 secs
+ SQLAlchemy ORM bulk_save_mappings(): Total time for 100000 records 2.37040805817 secs
+ SQLAlchemy Core: Total time for 100000 records 0.495043992996 secs
+ sqlite3: Total time for 100000 records 0.508063077927 sec
We can reduce the time by a factor of three using recent versions of `PyPy <http://pypy.org/>`_::
DBSession = scoped_session(sessionmaker())
engine = None
+
class Customer(Base):
__tablename__ = "customer"
id = Column(Integer, primary_key=True)
name = Column(String(255))
+
def init_sqlalchemy(dbname='sqlite:///sqlalchemy.db'):
global engine
engine = create_engine(dbname, echo=False)
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)
+
def test_sqlalchemy_orm(n=100000):
init_sqlalchemy()
t0 = time.time()
- for i in range(n):
+ for i in xrange(n):
customer = Customer()
customer.name = 'NAME ' + str(i)
DBSession.add(customer)
if i % 1000 == 0:
DBSession.flush()
DBSession.commit()
- print("SQLAlchemy ORM: Total time for " + str(n) +
- " records " + str(time.time() - t0) + " secs")
+ print(
+ "SQLAlchemy ORM: Total time for " + str(n) +
+ " records " + str(time.time() - t0) + " secs")
+
def test_sqlalchemy_orm_pk_given(n=100000):
init_sqlalchemy()
t0 = time.time()
- for i in range(n):
+ for i in xrange(n):
customer = Customer(id=i+1, name="NAME " + str(i))
DBSession.add(customer)
if i % 1000 == 0:
DBSession.flush()
DBSession.commit()
- print("SQLAlchemy ORM pk given: Total time for " + str(n) +
+ print(
+ "SQLAlchemy ORM pk given: Total time for " + str(n) +
+ " records " + str(time.time() - t0) + " secs")
+
+
+ def test_sqlalchemy_orm_bulk_save(n=100000):
+ init_sqlalchemy()
+ t0 = time.time()
+ n1 = n
+ while n1 > 0:
+ n1 = n1 - 10000
+ DBSession.bulk_save_objects(
+ [
+ Customer(name="NAME " + str(i))
+ for i in xrange(min(10000, n1))
+ ]
+ )
+ DBSession.commit()
+ print(
+ "SQLAlchemy ORM bulk_save_objects(): Total time for " + str(n) +
" records " + str(time.time() - t0) + " secs")
+
+ def test_sqlalchemy_orm_bulk_save_mappings(n=100000):
+ init_sqlalchemy()
+ t0 = time.time()
+ DBSession.bulk_save_mappings(
+ Customer,
+ [
+ dict(name="NAME " + str(i))
+ for i in xrange(n)
+ ]
+ )
+ DBSession.commit()
+ print(
+ "SQLAlchemy ORM bulk_save_mappings(): Total time for " + str(n) +
+ " records " + str(time.time() - t0) + " secs")
+
+
def test_sqlalchemy_core(n=100000):
init_sqlalchemy()
t0 = time.time()
engine.execute(
Customer.__table__.insert(),
- [{"name": 'NAME ' + str(i)} for i in range(n)]
+ [{"name": 'NAME ' + str(i)} for i in xrange(n)]
)
- print("SQLAlchemy Core: Total time for " + str(n) +
+ print(
+ "SQLAlchemy Core: Total time for " + str(n) +
" records " + str(time.time() - t0) + " secs")
+
def init_sqlite3(dbname):
conn = sqlite3.connect(dbname)
c = conn.cursor()
c.execute("DROP TABLE IF EXISTS customer")
- c.execute("CREATE TABLE customer (id INTEGER NOT NULL, "
- "name VARCHAR(255), PRIMARY KEY(id))")
+ c.execute(
+ "CREATE TABLE customer (id INTEGER NOT NULL, "
+ "name VARCHAR(255), PRIMARY KEY(id))")
conn.commit()
return conn
+
def test_sqlite3(n=100000, dbname='sqlite3.db'):
conn = init_sqlite3(dbname)
c = conn.cursor()
t0 = time.time()
- for i in range(n):
+ for i in xrange(n):
row = ('NAME ' + str(i),)
c.execute("INSERT INTO customer (name) VALUES (?)", row)
conn.commit()
- print("sqlite3: Total time for " + str(n) +
+ print(
+ "sqlite3: Total time for " + str(n) +
" records " + str(time.time() - t0) + " sec")
if __name__ == '__main__':
test_sqlalchemy_orm(100000)
test_sqlalchemy_orm_pk_given(100000)
+ test_sqlalchemy_orm_bulk_save(100000)
+ test_sqlalchemy_orm_bulk_save_mappings(100000)
test_sqlalchemy_core(100000)
test_sqlite3(100000)
-
Sessions / Queries
===================
from itertools import groupby
from .. import sql, util, exc as sa_exc, schema
from . import attributes, sync, exc as orm_exc, evaluator
-from .base import _state_mapper, state_str, _attr_as_key
+from .base import state_str, _attr_as_key
from ..sql import expression
from . import loading
if insert:
_emit_insert_statements(base_mapper, uowtransaction,
cached_connections,
- mapper, table, insert)
+ mapper, table, insert,
+ bookkeeping)
_finalize_insert_update_commands(base_mapper, uowtransaction,
states_to_insert, states_to_update,
states_to_insert = []
states_to_update = []
+ instance_key = None
for state, dict_, mapper, connection in _connections_for_states(
base_mapper, uowtransaction,
states):
has_identity = bool(state.key)
- instance_key = state.key or mapper._identity_key_from_state(state)
+
+ if bookkeeping:
+ instance_key = state.key or mapper._identity_key_from_state(state)
row_switch = None
if not has_identity and not row_switch:
states_to_insert.append(
(state, dict_, mapper, connection,
- has_identity, instance_key, row_switch)
+ has_identity, row_switch)
)
else:
states_to_update.append(
(state, dict_, mapper, connection,
- has_identity, instance_key, row_switch)
+ has_identity, row_switch)
)
return states_to_insert, states_to_update
"""
insert = []
for state, state_dict, mapper, connection, has_identity, \
- instance_key, row_switch in states_to_insert:
+ row_switch in states_to_insert:
+
if table not in mapper._pks_by_table:
continue
prop = mapper._columntoproperty[col]
value = state_dict.get(prop.key, None)
- if value is None:
- if bookkeeping and col in pks:
+ if bookkeeping and value is None:
+ if col in pks:
has_all_pks = False
elif col.default is None and \
col.server_default is None:
params[col.key] = value
- elif bookkeeping and col.server_default is not None and \
+ elif col.server_default is not None and \
mapper.base_mapper.eager_defaults:
has_all_defaults = False
update = []
for state, state_dict, mapper, connection, has_identity, \
- instance_key, row_switch in states_to_update:
+ row_switch in states_to_update:
if table not in mapper._pks_by_table:
continue
def _emit_insert_statements(base_mapper, uowtransaction,
- cached_connections, mapper, table, insert):
+ cached_connections, mapper, table, insert,
+ bookkeeping):
"""Emit INSERT statements corresponding to value lists collected
by _collect_insert_commands()."""
c = cached_connections[connection].\
execute(statement, multiparams)
- for (state, state_dict, params, mapper_rec,
- conn, value_params, has_all_pks, has_all_defaults), \
- last_inserted_params in \
- zip(records, c.context.compiled_parameters):
- _postfetch(
- mapper_rec,
- uowtransaction,
- table,
- state,
- state_dict,
- c,
- last_inserted_params,
- value_params)
+ if bookkeeping:
+ for (state, state_dict, params, mapper_rec,
+ conn, value_params, has_all_pks, has_all_defaults), \
+ last_inserted_params in \
+ zip(records, c.context.compiled_parameters):
+ _postfetch(
+ mapper_rec,
+ uowtransaction,
+ table,
+ state,
+ state_dict,
+ c,
+ last_inserted_params,
+ value_params)
else:
if not has_all_defaults and base_mapper.eager_defaults:
"""
for state, state_dict, mapper, connection, has_identity, \
- instance_key, row_switch in states_to_insert + \
+ row_switch in states_to_insert + \
states_to_update:
if bookkeeping:
if connection_callable:
connection = connection_callable(base_mapper, state.obj())
- mapper = _state_mapper(state)
+ mapper = state.manager.mapper
yield state, state.dict, mapper, connection
'__contains__', '__iter__', 'add', 'add_all', 'begin', 'begin_nested',
'close', 'commit', 'connection', 'delete', 'execute', 'expire',
'expire_all', 'expunge', 'expunge_all', 'flush', 'get_bind',
- 'is_modified',
+ 'is_modified', 'bulk_save_objects', 'bulk_save_mappings',
'merge', 'query', 'refresh', 'rollback',
'scalar')
with util.safe_reraise():
transaction.rollback(_capture_exception=True)
- def bulk_save(self, objects):
+ def bulk_save_objects(self, objects):
+ self._bulk_save((attributes.instance_state(obj) for obj in objects))
+
+ def bulk_save_mappings(self, mapper, mappings):
+ mapper = class_mapper(mapper)
+
+ self._bulk_save((
+ statelib.MappingState(mapper, mapping)
+ for mapping in mappings)
+ )
+
+ def _bulk_save(self, states):
self._flushing = True
flush_context = UOWTransaction(self)
if self.dispatch.before_bulk_save:
self.dispatch.before_bulk_save(
- self, flush_context, objects)
+ self, flush_context, states)
flush_context.transaction = transaction = self.begin(
subtransactions=True)
try:
self._warn_on_events = True
try:
- flush_context.bulk_save(objects)
+ flush_context.bulk_save(states)
finally:
self._warn_on_events = False
self.dispatch.after_bulk_save(
- self, flush_context, objects
+ self, flush_context, states
)
flush_context.finalize_flush_changes()
self.dispatch.after_bulk_save_postexec(
- self, flush_context, objects)
+ self, flush_context, states)
transaction.commit()
state._strong_obj = None
+class MappingState(InstanceState):
+ committed_state = {}
+ callables = {}
+
+ def __init__(self, mapper, mapping):
+ self.class_ = mapper.class_
+ self.manager = mapper.class_manager
+ self.modified = True
+ self._dict = mapping
+
+ @property
+ def dict(self):
+ return self._dict
+
+
class AttributeState(object):
"""Provide an inspection interface corresponding
to a particular attribute on a particular mapped object.
if other:
self.session._register_newly_persistent(other)
- def bulk_save(self, objects):
- for (base_mapper, in_session), states in itertools.groupby(
- (attributes.instance_state(obj) for obj in objects),
+ def bulk_save(self, states):
+ for (base_mapper, in_session), states_ in itertools.groupby(
+ states,
lambda state:
(
state.mapper.base_mapper,
)):
persistence.save_obj(
- base_mapper, list(states), self, bookkeeping=in_session)
+ base_mapper, list(states_), self, bookkeeping=in_session)
if in_session:
self.states.update(
(state, (False, False))
- for state in states
+ for state in states_
)