ORDER BY "name"
"""
schema_res = cu.execute(q)
+ sqlite_sequence = []
for table_name, type, sql in schema_res.fetchall():
if table_name == 'sqlite_sequence':
- yield('DELETE FROM "sqlite_sequence";')
+ rows = cu.execute('SELECT * FROM "sqlite_sequence";').fetchall()
+ sqlite_sequence = ['DELETE FROM "sqlite_sequence"']
+ sqlite_sequence += [
+ f'INSERT INTO "sqlite_sequence" VALUES(\'{row[0]}\',{row[1]})'
+ for row in rows
+ ]
+ continue
elif table_name == 'sqlite_stat1':
yield('ANALYZE "sqlite_master";')
elif table_name.startswith('sqlite_'):
for name, type, sql in schema_res.fetchall():
yield('{0};'.format(sql))
+ # gh-79009: Yield statements concerning the sqlite_sequence table at the
+ # end of the transaction.
+ for row in sqlite_sequence:
+ yield('{0};'.format(row))
+
yield('COMMIT;')
import unittest
import sqlite3 as sqlite
+from .test_dbapi import memory_database
+
class DumpTests(unittest.TestCase):
def setUp(self):
[self.assertEqual(expected_sqls[i], actual_sqls[i])
for i in range(len(expected_sqls))]
+ def test_dump_autoincrement(self):
+ expected = [
+ 'CREATE TABLE "t1" (id integer primary key autoincrement);',
+ 'INSERT INTO "t1" VALUES(NULL);',
+ 'CREATE TABLE "t2" (id integer primary key autoincrement);',
+ ]
+ self.cu.executescript("".join(expected))
+
+ # the NULL value should now be automatically be set to 1
+ expected[1] = expected[1].replace("NULL", "1")
+ expected.insert(0, "BEGIN TRANSACTION;")
+ expected.extend([
+ 'DELETE FROM "sqlite_sequence";',
+ 'INSERT INTO "sqlite_sequence" VALUES(\'t1\',1);',
+ 'COMMIT;',
+ ])
+
+ actual = [stmt for stmt in self.cx.iterdump()]
+ self.assertEqual(expected, actual)
+
+ def test_dump_autoincrement_create_new_db(self):
+ self.cu.execute("BEGIN TRANSACTION")
+ self.cu.execute("CREATE TABLE t1 (id integer primary key autoincrement)")
+ self.cu.execute("CREATE TABLE t2 (id integer primary key autoincrement)")
+ self.cu.executemany("INSERT INTO t1 VALUES(?)", ((None,) for _ in range(9)))
+ self.cu.executemany("INSERT INTO t2 VALUES(?)", ((None,) for _ in range(4)))
+ self.cx.commit()
+
+ with memory_database() as cx2:
+ query = "".join(self.cx.iterdump())
+ cx2.executescript(query)
+ cu2 = cx2.cursor()
+
+ dataset = (
+ ("t1", 9),
+ ("t2", 4),
+ )
+ for table, seq in dataset:
+ with self.subTest(table=table, seq=seq):
+ res = cu2.execute("""
+ SELECT "seq" FROM "sqlite_sequence" WHERE "name" == ?
+ """, (table,))
+ rows = res.fetchall()
+ self.assertEqual(rows[0][0], seq)
+
def test_unorderable_row(self):
# iterdump() should be able to cope with unorderable row types (issue #15545)
class UnorderableRow: