def is_disconnect(self, e, connection, cursor):
return "connection is closed" in str(e)
+ def set_isolation_level(self, connection, level):
+ level = level.replace('_', ' ')
+ print("level is", level)
+ print("autocommit is", connection.autocommit)
+ print("class is", connection)
+
+ if level == 'AUTOCOMMIT':
+ connection.connection.autocommit = True
+ elif level in self._isolation_lookup:
+ connection.connection.autocommit = False
+ cursor = connection.cursor()
+ cursor.execute(
+ "SET SESSION CHARACTERISTICS AS TRANSACTION "
+ "ISOLATION LEVEL %s" % level)
+ cursor.execute("COMMIT")
+ cursor.close()
+ else:
+ raise exc.ArgumentError(
+ "Invalid value '%s' for isolation_level. "
+ "Valid isolation levels for %s are %s or AUTOCOMMIT" %
+ (level, self.name, ", ".join(self._isolation_lookup))
+ )
+ print("autocommit is now", connection.autocommit)
+
dialect = PGDialect_pg8000
c = e.connect()
eq_(c.connection.connection.encoding, test_encoding)
- @testing.only_on('postgresql+psycopg2', 'psycopg2-specific feature')
+ @testing.only_on(
+ ['postgresql+psycopg2', 'postgresql+pg8000'],
+ 'psycopg2 / pg8000 - specific feature')
@engines.close_open_connections
def test_autocommit_isolation_level(self):
- extensions = __import__('psycopg2.extensions').extensions
-
- c = testing.db.connect()
- c = c.execution_options(isolation_level='AUTOCOMMIT')
- eq_(c.connection.connection.isolation_level,
- extensions.ISOLATION_LEVEL_AUTOCOMMIT)
+ c = testing.db.connect().execution_options(
+ isolation_level='AUTOCOMMIT')
+ # If we're really in autocommit mode then we'll get an error saying
+ # that the prepared transaction doesn't exist. Otherwise, we'd
+ # get an error saying that the command can't be run within a
+ # transaction.
+ assert_raises_message(
+ exc.ProgrammingError,
+ 'prepared transaction with identifier "gilberte" does not exist',
+ c.execute, "commit prepared 'gilberte'")
@testing.fails_on('+zxjdbc',
"Can't infer the SQL type to use for an instance "