]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
Fix assorted bugs related to identity column in partitioned tables
authorPeter Eisentraut <peter@eisentraut.org>
Tue, 7 May 2024 20:42:32 +0000 (22:42 +0200)
committerPeter Eisentraut <peter@eisentraut.org>
Tue, 7 May 2024 20:50:00 +0000 (22:50 +0200)
When changing the data type of a column of a partitioned table, craft
the ALTER SEQUENCE command only once.  Partitions do not have identity
sequences of their own and thus do not need a ALTER SEQUENCE command
for each partition.

Fix getIdentitySequence() to fetch the identity sequence associated
with the top-level partitioned table when a Relation of a partition is
passed to it.  While doing so, translate the attribute number of the
partition into the attribute number of the partitioned table.

Author: Ashutosh Bapat <ashutosh.bapat@enterprisedb.com>
Reported-by: Alexander Lakhin <exclusion@gmail.com>
Reviewed-by: Dmitry Dolgov <9erthalion6@gmail.com>
Discussion: https://www.postgresql.org/message-id/3b8a9dc1-bbc7-0ef5-6863-c432afac7d59@gmail.com

src/backend/catalog/pg_depend.c
src/backend/commands/tablecmds.c
src/backend/parser/parse_utilcmd.c
src/backend/rewrite/rewriteHandler.c
src/include/catalog/dependency.h
src/test/regress/expected/identity.out
src/test/regress/sql/identity.sql

index f85a898de8d893039b07010180c55e887b1f1878..5366f7820c12ef1c6928b98ffaa249389dcb3b2b 100644 (file)
 #include "catalog/pg_constraint.h"
 #include "catalog/pg_depend.h"
 #include "catalog/pg_extension.h"
+#include "catalog/partition.h"
 #include "commands/extension.h"
 #include "miscadmin.h"
 #include "utils/fmgroids.h"
 #include "utils/lsyscache.h"
+#include "utils/syscache.h"
 #include "utils/rel.h"
 
 
@@ -941,10 +943,35 @@ getOwnedSequences(Oid relid)
  * Get owned identity sequence, error if not exactly one.
  */
 Oid
-getIdentitySequence(Oid relid, AttrNumber attnum, bool missing_ok)
+getIdentitySequence(Relation rel, AttrNumber attnum, bool missing_ok)
 {
-       List       *seqlist = getOwnedSequences_internal(relid, attnum, DEPENDENCY_INTERNAL);
+       Oid                     relid;
+       List       *seqlist;
 
+       /*
+        * The identity sequence is associated with the topmost partitioned table,
+        * which might have column order different than the given partition.
+        */
+       if (RelationGetForm(rel)->relispartition)
+       {
+               List       *ancestors =
+                       get_partition_ancestors(RelationGetRelid(rel));
+               HeapTuple       ctup = SearchSysCacheAttNum(RelationGetRelid(rel), attnum);
+               const char *attname = NameStr(((Form_pg_attribute) GETSTRUCT(ctup))->attname);
+               HeapTuple       ptup;
+
+               relid = llast_oid(ancestors);
+               ptup = SearchSysCacheAttName(relid, attname);
+               attnum = ((Form_pg_attribute) GETSTRUCT(ptup))->attnum;
+
+               ReleaseSysCache(ctup);
+               ReleaseSysCache(ptup);
+               list_free(ancestors);
+       }
+       else
+               relid = RelationGetRelid(rel);
+
+       seqlist = getOwnedSequences_internal(relid, attnum, DEPENDENCY_INTERNAL);
        if (list_length(seqlist) > 1)
                elog(ERROR, "more than one owned sequence found");
        else if (seqlist == NIL)
index a79ac884f7cbcfcaa9a90c144790aa70977c48d2..5bf5e69c5b8a8750f352f07a1e9c2247997ac94d 100644 (file)
@@ -8535,7 +8535,7 @@ ATExecDropIdentity(Relation rel, const char *colName, bool missing_ok, LOCKMODE
        if (!recursing)
        {
                /* drop the internal sequence */
-               seqid = getIdentitySequence(RelationGetRelid(rel), attnum, false);
+               seqid = getIdentitySequence(rel, attnum, false);
                deleteDependencyRecordsForClass(RelationRelationId, seqid,
                                                                                RelationRelationId, DEPENDENCY_INTERNAL);
                CommandCounterIncrement();
index 9fb6ff86db55c69b89255be995158bcbec5b1d26..6520bf9baa550c585fc30102c364adfb76bec494 100644 (file)
@@ -1136,7 +1136,7 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla
                         * find sequence owned by old column; extract sequence parameters;
                         * build new create sequence command
                         */
-                       seq_relid = getIdentitySequence(RelationGetRelid(relation), attribute->attnum, false);
+                       seq_relid = getIdentitySequence(relation, attribute->attnum, false);
                        seq_options = sequence_options(seq_relid);
                        generateSerialExtraStmts(cxt, def,
                                                                         InvalidOid, seq_options,
@@ -3716,28 +3716,36 @@ transformAlterTableStmt(Oid relid, AlterTableStmt *stmt,
 
                                        /*
                                         * For identity column, create ALTER SEQUENCE command to
-                                        * change the data type of the sequence.
+                                        * change the data type of the sequence. Identity sequence
+                                        * is associated with the top level partitioned table.
+                                        * Hence ignore partitions.
                                         */
-                                       attnum = get_attnum(relid, cmd->name);
-                                       if (attnum == InvalidAttrNumber)
-                                               ereport(ERROR,
-                                                               (errcode(ERRCODE_UNDEFINED_COLUMN),
-                                                                errmsg("column \"%s\" of relation \"%s\" does not exist",
-                                                                               cmd->name, RelationGetRelationName(rel))));
-
-                                       if (attnum > 0 &&
-                                               TupleDescAttr(tupdesc, attnum - 1)->attidentity)
+                                       if (!RelationGetForm(rel)->relispartition)
                                        {
-                                               Oid                     seq_relid = getIdentitySequence(relid, attnum, false);
-                                               Oid                     typeOid = typenameTypeId(pstate, def->typeName);
-                                               AlterSeqStmt *altseqstmt = makeNode(AlterSeqStmt);
-
-                                               altseqstmt->sequence = makeRangeVar(get_namespace_name(get_rel_namespace(seq_relid)),
-                                                                                                                       get_rel_name(seq_relid),
-                                                                                                                       -1);
-                                               altseqstmt->options = list_make1(makeDefElem("as", (Node *) makeTypeNameFromOid(typeOid, -1), -1));
-                                               altseqstmt->for_identity = true;
-                                               cxt.blist = lappend(cxt.blist, altseqstmt);
+                                               attnum = get_attnum(relid, cmd->name);
+                                               if (attnum == InvalidAttrNumber)
+                                                       ereport(ERROR,
+                                                                       (errcode(ERRCODE_UNDEFINED_COLUMN),
+                                                                        errmsg("column \"%s\" of relation \"%s\" does not exist",
+                                                                                       cmd->name, RelationGetRelationName(rel))));
+
+                                               if (attnum > 0 &&
+                                                       TupleDescAttr(tupdesc, attnum - 1)->attidentity)
+                                               {
+                                                       Oid                     seq_relid = getIdentitySequence(rel, attnum, false);
+                                                       Oid                     typeOid = typenameTypeId(pstate, def->typeName);
+                                                       AlterSeqStmt *altseqstmt = makeNode(AlterSeqStmt);
+
+                                                       altseqstmt->sequence
+                                                               = makeRangeVar(get_namespace_name(get_rel_namespace(seq_relid)),
+                                                                                          get_rel_name(seq_relid),
+                                                                                          -1);
+                                                       altseqstmt->options = list_make1(makeDefElem("as",
+                                                                                                                                                (Node *) makeTypeNameFromOid(typeOid, -1),
+                                                                                                                                                -1));
+                                                       altseqstmt->for_identity = true;
+                                                       cxt.blist = lappend(cxt.blist, altseqstmt);
+                                               }
                                        }
 
                                        newcmds = lappend(newcmds, cmd);
@@ -3803,7 +3811,7 @@ transformAlterTableStmt(Oid relid, AlterTableStmt *stmt,
                                                                 errmsg("column \"%s\" of relation \"%s\" does not exist",
                                                                                cmd->name, RelationGetRelationName(rel))));
 
-                                       seq_relid = getIdentitySequence(relid, attnum, true);
+                                       seq_relid = getIdentitySequence(rel, attnum, true);
 
                                        if (seq_relid)
                                        {
index 9fd05b15e7342132172db9f018fafaa654de6503..8a29fbbc46525e63a46ec14c7baef311a108fc18 100644 (file)
@@ -24,7 +24,6 @@
 #include "access/sysattr.h"
 #include "access/table.h"
 #include "catalog/dependency.h"
-#include "catalog/partition.h"
 #include "commands/trigger.h"
 #include "executor/executor.h"
 #include "foreign/fdwapi.h"
@@ -1233,24 +1232,8 @@ build_column_default(Relation rel, int attrno)
        if (att_tup->attidentity)
        {
                NextValueExpr *nve = makeNode(NextValueExpr);
-               Oid                     reloid;
 
-               /*
-                * The identity sequence is associated with the topmost partitioned
-                * table.
-                */
-               if (rel->rd_rel->relispartition)
-               {
-                       List       *ancestors =
-                               get_partition_ancestors(RelationGetRelid(rel));
-
-                       reloid = llast_oid(ancestors);
-                       list_free(ancestors);
-               }
-               else
-                       reloid = RelationGetRelid(rel);
-
-               nve->seqid = getIdentitySequence(reloid, attrno, false);
+               nve->seqid = getIdentitySequence(rel, attrno, false);
                nve->typeId = att_tup->atttypid;
 
                return (Node *) nve;
index e0dcc0b069e9a7ae84d63bc46ef8e6d684e56f89..7eee66f8106a15599af7ab5b9c1b9f3ea0bec5a8 100644 (file)
@@ -177,7 +177,7 @@ extern List *getAutoExtensionsOfObject(Oid classId, Oid objectId);
 
 extern bool sequenceIsOwned(Oid seqId, char deptype, Oid *tableId, int32 *colId);
 extern List *getOwnedSequences(Oid relid);
-extern Oid     getIdentitySequence(Oid relid, AttrNumber attnum, bool missing_ok);
+extern Oid     getIdentitySequence(Relation rel, AttrNumber attnum, bool missing_ok);
 
 extern Oid     get_index_constraint(Oid indexId);
 
index f357b9b63b10c24378a2ba7056dda96922b3e94a..31ad041055f7d26887618c61723f2625772d5979 100644 (file)
@@ -619,14 +619,17 @@ CREATE TABLE pitest1_p1 PARTITION OF pitest1 FOR VALUES FROM ('2016-07-01') TO (
 INSERT into pitest1(f1, f2) VALUES ('2016-07-2', 'from pitest1');
 INSERT into pitest1_p1 (f1, f2) VALUES ('2016-07-3', 'from pitest1_p1');
 -- attached partition
-CREATE TABLE pitest1_p2 (f1 date NOT NULL, f2 text, f3 bigint);
-INSERT INTO pitest1_p2 VALUES ('2016-08-2', 'before attaching', 100);
+CREATE TABLE pitest1_p2 (f3 bigint, f2 text, f1 date NOT NULL);
+INSERT INTO pitest1_p2 (f1, f2, f3) VALUES ('2016-08-2', 'before attaching', 100);
 ALTER TABLE pitest1 ATTACH PARTITION pitest1_p2 FOR VALUES FROM ('2016-08-01') TO ('2016-09-01'); -- requires NOT NULL constraint
 ERROR:  column "f3" in child table must be marked NOT NULL
 ALTER TABLE pitest1_p2 ALTER COLUMN f3 SET NOT NULL;
 ALTER TABLE pitest1 ATTACH PARTITION pitest1_p2 FOR VALUES FROM ('2016-08-01') TO ('2016-09-01');
 INSERT INTO pitest1_p2 (f1, f2) VALUES ('2016-08-3', 'from pitest1_p2');
 INSERT INTO pitest1 (f1, f2) VALUES ('2016-08-4', 'from pitest1');
+-- LIKE INCLUDING on partition
+CREATE TABLE pitest1_p1_like (LIKE pitest1_p1 INCLUDING IDENTITY);
+INSERT into pitest1_p1_like(f1, f2) VALUES ('2016-07-2', 'from pitest1_p1_like');
 SELECT tableoid::regclass, f1, f2, f3 FROM pitest1;
   tableoid  |     f1     |        f2        | f3  
 ------------+------------+------------------+-----
@@ -637,6 +640,31 @@ SELECT tableoid::regclass, f1, f2, f3 FROM pitest1;
  pitest1_p2 | 08-04-2016 | from pitest1     |   4
 (5 rows)
 
+SELECT tableoid::regclass, f1, f2, f3 FROM pitest1_p1_like;
+    tableoid     |     f1     |          f2          | f3 
+-----------------+------------+----------------------+----
+ pitest1_p1_like | 07-02-2016 | from pitest1_p1_like |  1
+(1 row)
+
+ALTER TABLE pitest1 ALTER COLUMN f3 SET DATA TYPE bigint;
+SELECT tableoid::regclass, f1, f2, f3, pg_typeof(f3) FROM pitest1;
+  tableoid  |     f1     |        f2        | f3  | pg_typeof 
+------------+------------+------------------+-----+-----------
+ pitest1_p1 | 07-02-2016 | from pitest1     |   1 | bigint
+ pitest1_p1 | 07-03-2016 | from pitest1_p1  |   2 | bigint
+ pitest1_p2 | 08-02-2016 | before attaching | 100 | bigint
+ pitest1_p2 | 08-03-2016 | from pitest1_p2  |   3 | bigint
+ pitest1_p2 | 08-04-2016 | from pitest1     |   4 | bigint
+(5 rows)
+
+SELECT tableoid::regclass, f1, f2, f3, pg_typeof(f3) FROM pitest1_p2;
+  tableoid  |     f1     |        f2        | f3  | pg_typeof 
+------------+------------+------------------+-----+-----------
+ pitest1_p2 | 08-02-2016 | before attaching | 100 | bigint
+ pitest1_p2 | 08-03-2016 | from pitest1_p2  |   3 | bigint
+ pitest1_p2 | 08-04-2016 | from pitest1     |   4 | bigint
+(3 rows)
+
 -- add identity column
 CREATE TABLE pitest2 (f1 date NOT NULL, f2 text) PARTITION BY RANGE (f1);
 CREATE TABLE pitest2_p1 PARTITION OF pitest2 FOR VALUES FROM ('2016-07-01') TO ('2016-08-01');
index 7b0800226ca0ac3066cd067232e9bdaa64bead5f..27ca708ff196b36024ad6c39fcf3aaa14cf311e0 100644 (file)
@@ -358,14 +358,21 @@ CREATE TABLE pitest1_p1 PARTITION OF pitest1 FOR VALUES FROM ('2016-07-01') TO (
 INSERT into pitest1(f1, f2) VALUES ('2016-07-2', 'from pitest1');
 INSERT into pitest1_p1 (f1, f2) VALUES ('2016-07-3', 'from pitest1_p1');
 -- attached partition
-CREATE TABLE pitest1_p2 (f1 date NOT NULL, f2 text, f3 bigint);
-INSERT INTO pitest1_p2 VALUES ('2016-08-2', 'before attaching', 100);
+CREATE TABLE pitest1_p2 (f3 bigint, f2 text, f1 date NOT NULL);
+INSERT INTO pitest1_p2 (f1, f2, f3) VALUES ('2016-08-2', 'before attaching', 100);
 ALTER TABLE pitest1 ATTACH PARTITION pitest1_p2 FOR VALUES FROM ('2016-08-01') TO ('2016-09-01'); -- requires NOT NULL constraint
 ALTER TABLE pitest1_p2 ALTER COLUMN f3 SET NOT NULL;
 ALTER TABLE pitest1 ATTACH PARTITION pitest1_p2 FOR VALUES FROM ('2016-08-01') TO ('2016-09-01');
 INSERT INTO pitest1_p2 (f1, f2) VALUES ('2016-08-3', 'from pitest1_p2');
 INSERT INTO pitest1 (f1, f2) VALUES ('2016-08-4', 'from pitest1');
+-- LIKE INCLUDING on partition
+CREATE TABLE pitest1_p1_like (LIKE pitest1_p1 INCLUDING IDENTITY);
+INSERT into pitest1_p1_like(f1, f2) VALUES ('2016-07-2', 'from pitest1_p1_like');
 SELECT tableoid::regclass, f1, f2, f3 FROM pitest1;
+SELECT tableoid::regclass, f1, f2, f3 FROM pitest1_p1_like;
+ALTER TABLE pitest1 ALTER COLUMN f3 SET DATA TYPE bigint;
+SELECT tableoid::regclass, f1, f2, f3, pg_typeof(f3) FROM pitest1;
+SELECT tableoid::regclass, f1, f2, f3, pg_typeof(f3) FROM pitest1_p2;
 
 -- add identity column
 CREATE TABLE pitest2 (f1 date NOT NULL, f2 text) PARTITION BY RANGE (f1);