]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
pg_dump: label PUBLICATION TABLE ArchiveEntries with an owner.
authorTom Lane <tgl@sss.pgh.pa.us>
Thu, 14 Jan 2021 21:19:38 +0000 (16:19 -0500)
committerTom Lane <tgl@sss.pgh.pa.us>
Thu, 14 Jan 2021 21:19:38 +0000 (16:19 -0500)
This is the same fix as commit 9eabfe300 applied to INDEX ATTACH
entries, but for table-to-publication attachments.  As in that
case, even though the backend doesn't record "ownership" of the
attachment, we still ought to label it in the dump archive with
the role name that should run the ALTER PUBLICATION command.
The existing behavior causes the ALTER to be done by the original
role that started the restore; that will usually work fine, but
there may be corner cases where it fails.

The bulk of the patch is concerned with changing struct
PublicationRelInfo to include a pointer to the associated
PublicationInfo object, so that we can get the owner's name
out of that when the time comes.  While at it, I rewrote
getPublicationTables() to do just one query of pg_publication_rel,
not one per table.

Back-patch to v10 where this code was introduced.

Discussion: https://postgr.es/m/1165710.1610473242@sss.pgh.pa.us

src/bin/pg_dump/common.c
src/bin/pg_dump/pg_dump.c
src/bin/pg_dump/pg_dump.h

index 084ccd1b6b90d241b1b09a4cf7e4a553147f1e4b..da5556035f1c02bc3c1e1bafdb9dce945806d9ee 100644 (file)
@@ -54,6 +54,7 @@ static DumpableObject **oprinfoindex;
 static DumpableObject **collinfoindex;
 static DumpableObject **nspinfoindex;
 static DumpableObject **extinfoindex;
+static DumpableObject **pubinfoindex;
 static int     numTables;
 static int     numTypes;
 static int     numFuncs;
@@ -61,6 +62,7 @@ static int    numOperators;
 static int     numCollations;
 static int     numNamespaces;
 static int     numExtensions;
+static int     numPublications;
 
 /* This is an array of object identities, not actual DumpableObjects */
 static ExtensionMemberId *extmembers;
@@ -95,6 +97,7 @@ getSchemaData(Archive *fout, int *numTablesPtr)
        CollInfo   *collinfo;
        NamespaceInfo *nspinfo;
        ExtensionInfo *extinfo;
+       PublicationInfo *pubinfo;
        InhInfo    *inhinfo;
        int                     numAggregates;
        int                     numInherits;
@@ -286,7 +289,9 @@ getSchemaData(Archive *fout, int *numTablesPtr)
 
        if (g_verbose)
                write_msg(NULL, "reading publications\n");
-       getPublications(fout);
+       pubinfo = getPublications(fout, &numPublications);
+       pubinfoindex = buildIndexArray(pubinfo, numPublications,
+                                                                  sizeof(PublicationInfo));
 
        if (g_verbose)
                write_msg(NULL, "reading publication membership\n");
@@ -935,6 +940,17 @@ findExtensionByOid(Oid oid)
        return (ExtensionInfo *) findObjectByOid(oid, extinfoindex, numExtensions);
 }
 
+/*
+ * findPublicationByOid
+ *       finds the entry (in pubinfo) of the publication with the given oid
+ *       returns NULL if not found
+ */
+PublicationInfo *
+findPublicationByOid(Oid oid)
+{
+       return (PublicationInfo *) findObjectByOid(oid, pubinfoindex, numPublications);
+}
+
 /*
  * findIndexByOid
  *             find the entry of the index with the given oid
index 607167c5023f4d8cd7e978cf09597168cd9751a9..8d7d774290ab61eb8b7347b06680a4e92e571365 100644 (file)
@@ -3742,8 +3742,8 @@ dumpPolicy(Archive *fout, PolicyInfo *polinfo)
  * getPublications
  *       get information about publications
  */
-void
-getPublications(Archive *fout)
+PublicationInfo *
+getPublications(Archive *fout, int *numPublications)
 {
        DumpOptions *dopt = fout->dopt;
        PQExpBuffer query;
@@ -3762,7 +3762,10 @@ getPublications(Archive *fout)
                                ntups;
 
        if (dopt->no_publications || fout->remoteVersion < 100000)
-               return;
+       {
+               *numPublications = 0;
+               return NULL;
+       }
 
        query = createPQExpBuffer();
 
@@ -3830,6 +3833,9 @@ getPublications(Archive *fout)
        PQclear(res);
 
        destroyPQExpBuffer(query);
+
+       *numPublications = ntups;
+       return pubinfo;
 }
 
 /*
@@ -3935,7 +3941,8 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
        DumpOptions *dopt = fout->dopt;
        int                     i_tableoid;
        int                     i_oid;
-       int                     i_pubname;
+       int                     i_prpubid;
+       int                     i_prrelid;
        int                     i,
                                j,
                                ntups;
@@ -3945,12 +3952,39 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 
        query = createPQExpBuffer();
 
-       for (i = 0; i < numTables; i++)
+       /* Collect all publication membership info. */
+       appendPQExpBufferStr(query,
+                                                "SELECT tableoid, oid, prpubid, prrelid "
+                                                "FROM pg_catalog.pg_publication_rel");
+       res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
+
+       ntups = PQntuples(res);
+
+       i_tableoid = PQfnumber(res, "tableoid");
+       i_oid = PQfnumber(res, "oid");
+       i_prpubid = PQfnumber(res, "prpubid");
+       i_prrelid = PQfnumber(res, "prrelid");
+
+       /* this allocation may be more than we need */
+       pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo));
+       j = 0;
+
+       for (i = 0; i < ntups; i++)
        {
-               TableInfo  *tbinfo = &tblinfo[i];
+               Oid                     prpubid = atooid(PQgetvalue(res, i, i_prpubid));
+               Oid                     prrelid = atooid(PQgetvalue(res, i, i_prrelid));
+               PublicationInfo *pubinfo;
+               TableInfo  *tbinfo;
 
-               /* Only plain tables can be aded to publications. */
-               if (tbinfo->relkind != RELKIND_RELATION)
+               /*
+                * Ignore any entries for which we aren't interested in either the
+                * publication or the rel.
+                */
+               pubinfo = findPublicationByOid(prpubid);
+               if (pubinfo == NULL)
+                       continue;
+               tbinfo = findTableByOid(prrelid);
+               if (tbinfo == NULL)
                        continue;
 
                /*
@@ -3960,56 +3994,24 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
                if (!(tbinfo->dobj.dump & DUMP_COMPONENT_DEFINITION))
                        continue;
 
-               if (g_verbose)
-                       write_msg(NULL, "reading publication membership for table \"%s.%s\"\n",
-                                         tbinfo->dobj.namespace->dobj.name,
-                                         tbinfo->dobj.name);
-
-               resetPQExpBuffer(query);
-
-               /* Get the publication membership for the table. */
-               appendPQExpBuffer(query,
-                                                 "SELECT pr.tableoid, pr.oid, p.pubname "
-                                                 "FROM pg_publication_rel pr, pg_publication p "
-                                                 "WHERE pr.prrelid = '%u'"
-                                                 "  AND p.oid = pr.prpubid",
-                                                 tbinfo->dobj.catId.oid);
-               res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
-
-               ntups = PQntuples(res);
-
-               if (ntups == 0)
-               {
-                       /*
-                        * Table is not member of any publications. Clean up and return.
-                        */
-                       PQclear(res);
-                       continue;
-               }
-
-               i_tableoid = PQfnumber(res, "tableoid");
-               i_oid = PQfnumber(res, "oid");
-               i_pubname = PQfnumber(res, "pubname");
+               /* OK, make a DumpableObject for this relationship */
+               pubrinfo[j].dobj.objType = DO_PUBLICATION_REL;
+               pubrinfo[j].dobj.catId.tableoid =
+                       atooid(PQgetvalue(res, i, i_tableoid));
+               pubrinfo[j].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
+               AssignDumpId(&pubrinfo[j].dobj);
+               pubrinfo[j].dobj.namespace = tbinfo->dobj.namespace;
+               pubrinfo[j].dobj.name = tbinfo->dobj.name;
+               pubrinfo[j].publication = pubinfo;
+               pubrinfo[j].pubtable = tbinfo;
 
-               pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo));
+               /* Decide whether we want to dump it */
+               selectDumpablePublicationTable(&(pubrinfo[j].dobj), fout);
 
-               for (j = 0; j < ntups; j++)
-               {
-                       pubrinfo[j].dobj.objType = DO_PUBLICATION_REL;
-                       pubrinfo[j].dobj.catId.tableoid =
-                               atooid(PQgetvalue(res, j, i_tableoid));
-                       pubrinfo[j].dobj.catId.oid = atooid(PQgetvalue(res, j, i_oid));
-                       AssignDumpId(&pubrinfo[j].dobj);
-                       pubrinfo[j].dobj.namespace = tbinfo->dobj.namespace;
-                       pubrinfo[j].dobj.name = tbinfo->dobj.name;
-                       pubrinfo[j].pubname = pg_strdup(PQgetvalue(res, j, i_pubname));
-                       pubrinfo[j].pubtable = tbinfo;
-
-                       /* Decide whether we want to dump it */
-                       selectDumpablePublicationTable(&(pubrinfo[j].dobj), fout);
-               }
-               PQclear(res);
+               j++;
        }
+
+       PQclear(res);
        destroyPQExpBuffer(query);
 }
 
@@ -4020,6 +4022,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 static void
 dumpPublicationTable(Archive *fout, PublicationRelInfo *pubrinfo)
 {
+       PublicationInfo *pubinfo = pubrinfo->publication;
        TableInfo  *tbinfo = pubrinfo->pubtable;
        PQExpBuffer query;
        char       *tag;
@@ -4027,24 +4030,27 @@ dumpPublicationTable(Archive *fout, PublicationRelInfo *pubrinfo)
        if (!(pubrinfo->dobj.dump & DUMP_COMPONENT_DEFINITION))
                return;
 
-       tag = psprintf("%s %s", pubrinfo->pubname, tbinfo->dobj.name);
+       tag = psprintf("%s %s", pubinfo->dobj.name, tbinfo->dobj.name);
 
        query = createPQExpBuffer();
 
        appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE ONLY",
-                                         fmtId(pubrinfo->pubname));
+                                         fmtId(pubinfo->dobj.name));
        appendPQExpBuffer(query, " %s;\n",
                                          fmtQualifiedDumpable(tbinfo));
 
        /*
-        * There is no point in creating drop query as drop query as the drop is
-        * done by table drop.
+        * There is no point in creating a drop query as the drop is done by table
+        * drop.  (If you think to change this, see also _printTocEntry().)
+        * Although this object doesn't really have ownership as such, set the
+        * owner field anyway to ensure that the command is run by the correct
+        * role at restore time.
         */
        ArchiveEntry(fout, pubrinfo->dobj.catId, pubrinfo->dobj.dumpId,
                                 tag,
                                 tbinfo->dobj.namespace->dobj.name,
                                 NULL,
-                                "", false,
+                                pubinfo->rolname, false,
                                 "PUBLICATION TABLE", SECTION_POST_DATA,
                                 query->data, "", NULL,
                                 NULL, 0,
index 6482f495271b281549e7c845844f5fb420d53e1e..444c8ca648f37a741ba64c29c0088ee839657f30 100644 (file)
@@ -611,8 +611,8 @@ typedef struct _PublicationInfo
 typedef struct _PublicationRelInfo
 {
        DumpableObject dobj;
+       PublicationInfo *publication;
        TableInfo  *pubtable;
-       char       *pubname;
 } PublicationRelInfo;
 
 /*
@@ -671,6 +671,7 @@ extern OprInfo *findOprByOid(Oid oid);
 extern CollInfo *findCollationByOid(Oid oid);
 extern NamespaceInfo *findNamespaceByOid(Oid oid);
 extern ExtensionInfo *findExtensionByOid(Oid oid);
+extern PublicationInfo *findPublicationByOid(Oid oid);
 
 extern void setExtensionMembership(ExtensionMemberId *extmems, int nextmems);
 extern ExtensionInfo *findOwningExtension(CatalogId catalogId);
@@ -724,7 +725,8 @@ extern void processExtensionTables(Archive *fout, ExtensionInfo extinfo[],
                                           int numExtensions);
 extern EventTriggerInfo *getEventTriggers(Archive *fout, int *numEventTriggers);
 extern void getPolicies(Archive *fout, TableInfo tblinfo[], int numTables);
-extern void getPublications(Archive *fout);
+extern PublicationInfo *getPublications(Archive *fout,
+                                                                               int *numPublications);
 extern void getPublicationTables(Archive *fout, TableInfo tblinfo[],
                                         int numTables);
 extern void getSubscriptions(Archive *fout);