]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
In pg_dump, avoid doing per-table queries for RLS policies.
authorTom Lane <tgl@sss.pgh.pa.us>
Tue, 31 Aug 2021 19:04:05 +0000 (15:04 -0400)
committerTom Lane <tgl@sss.pgh.pa.us>
Tue, 31 Aug 2021 19:04:05 +0000 (15:04 -0400)
For no particularly good reason, getPolicies() queried pg_policy
separately for each table.  We can collect all the policies in
a single query instead, and attach them to the correct TableInfo
objects using findTableByOid() lookups.  On the regression
database, this reduces the number of queries substantially, and
provides a visible savings even when running against a local
server.

Per complaint from Hubert Depesz Lubaczewski.  Since this is such
a simple fix and can have a visible performance benefit, back-patch
to all supported branches.

Discussion: https://postgr.es/m/20210826084430.GA26282@depesz.com

src/bin/pg_dump/pg_dump.c

index 6259e68aa4bc9b7cba9f95d5e4835922ce7436eb..67be84982926d28242e344c7de87bdcb03709a9e 100644 (file)
@@ -3656,7 +3656,7 @@ dumpBlobs(Archive *fout, const void *arg)
 
 /*
  * getPolicies
- *       get information about policies on a dumpable table.
+ *       get information about all RLS policies on dumpable tables.
  */
 void
 getPolicies(Archive *fout, TableInfo tblinfo[], int numTables)
@@ -3666,6 +3666,7 @@ getPolicies(Archive *fout, TableInfo tblinfo[], int numTables)
        PolicyInfo *polinfo;
        int                     i_oid;
        int                     i_tableoid;
+       int                     i_polrelid;
        int                     i_polname;
        int                     i_polcmd;
        int                     i_polpermissive;
@@ -3681,6 +3682,10 @@ getPolicies(Archive *fout, TableInfo tblinfo[], int numTables)
 
        query = createPQExpBuffer();
 
+       /*
+        * First, check which tables have RLS enabled.  We represent RLS being
+        * enabled on a table by creating a PolicyInfo object with null polname.
+        */
        for (i = 0; i < numTables; i++)
        {
                TableInfo  *tbinfo = &tblinfo[i];
@@ -3689,15 +3694,6 @@ getPolicies(Archive *fout, TableInfo tblinfo[], int numTables)
                if (!(tbinfo->dobj.dump & DUMP_COMPONENT_POLICY))
                        continue;
 
-               pg_log_info("reading row security enabled for table \"%s.%s\"",
-                                       tbinfo->dobj.namespace->dobj.name,
-                                       tbinfo->dobj.name);
-
-               /*
-                * Get row security enabled information for the table. We represent
-                * RLS being enabled on a table by creating a PolicyInfo object with
-                * null polname.
-                */
                if (tbinfo->rowsec)
                {
                        /*
@@ -3719,51 +3715,35 @@ getPolicies(Archive *fout, TableInfo tblinfo[], int numTables)
                        polinfo->polqual = NULL;
                        polinfo->polwithcheck = NULL;
                }
+       }
 
-               pg_log_info("reading policies for table \"%s.%s\"",
-                                       tbinfo->dobj.namespace->dobj.name,
-                                       tbinfo->dobj.name);
-
-               resetPQExpBuffer(query);
-
-               /* Get the policies for the table. */
-               if (fout->remoteVersion >= 100000)
-                       appendPQExpBuffer(query,
-                                                         "SELECT oid, tableoid, pol.polname, pol.polcmd, pol.polpermissive, "
-                                                         "CASE WHEN pol.polroles = '{0}' THEN NULL ELSE "
-                                                         "   pg_catalog.array_to_string(ARRAY(SELECT pg_catalog.quote_ident(rolname) from pg_catalog.pg_roles WHERE oid = ANY(pol.polroles)), ', ') END AS polroles, "
-                                                         "pg_catalog.pg_get_expr(pol.polqual, pol.polrelid) AS polqual, "
-                                                         "pg_catalog.pg_get_expr(pol.polwithcheck, pol.polrelid) AS polwithcheck "
-                                                         "FROM pg_catalog.pg_policy pol "
-                                                         "WHERE polrelid = '%u'",
-                                                         tbinfo->dobj.catId.oid);
-               else
-                       appendPQExpBuffer(query,
-                                                         "SELECT oid, tableoid, pol.polname, pol.polcmd, 't' as polpermissive, "
-                                                         "CASE WHEN pol.polroles = '{0}' THEN NULL ELSE "
-                                                         "   pg_catalog.array_to_string(ARRAY(SELECT pg_catalog.quote_ident(rolname) from pg_catalog.pg_roles WHERE oid = ANY(pol.polroles)), ', ') END AS polroles, "
-                                                         "pg_catalog.pg_get_expr(pol.polqual, pol.polrelid) AS polqual, "
-                                                         "pg_catalog.pg_get_expr(pol.polwithcheck, pol.polrelid) AS polwithcheck "
-                                                         "FROM pg_catalog.pg_policy pol "
-                                                         "WHERE polrelid = '%u'",
-                                                         tbinfo->dobj.catId.oid);
-               res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
+       /*
+        * Now, read all RLS policies, and create PolicyInfo objects for all those
+        * that are of interest.
+        */
+       pg_log_info("reading row-level security policies");
 
-               ntups = PQntuples(res);
+       printfPQExpBuffer(query,
+                                         "SELECT oid, tableoid, pol.polrelid, pol.polname, pol.polcmd, ");
+       if (fout->remoteVersion >= 100000)
+               appendPQExpBuffer(query, "pol.polpermissive, ");
+       else
+               appendPQExpBuffer(query, "'t' as polpermissive, ");
+       appendPQExpBuffer(query,
+                                         "CASE WHEN pol.polroles = '{0}' THEN NULL ELSE "
+                                         "   pg_catalog.array_to_string(ARRAY(SELECT pg_catalog.quote_ident(rolname) from pg_catalog.pg_roles WHERE oid = ANY(pol.polroles)), ', ') END AS polroles, "
+                                         "pg_catalog.pg_get_expr(pol.polqual, pol.polrelid) AS polqual, "
+                                         "pg_catalog.pg_get_expr(pol.polwithcheck, pol.polrelid) AS polwithcheck "
+                                         "FROM pg_catalog.pg_policy pol");
 
-               if (ntups == 0)
-               {
-                       /*
-                        * No explicit policies to handle (only the default-deny policy,
-                        * which is handled as part of the table definition).  Clean up
-                        * and return.
-                        */
-                       PQclear(res);
-                       continue;
-               }
+       res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
 
+       ntups = PQntuples(res);
+       if (ntups > 0)
+       {
                i_oid = PQfnumber(res, "oid");
                i_tableoid = PQfnumber(res, "tableoid");
+               i_polrelid = PQfnumber(res, "polrelid");
                i_polname = PQfnumber(res, "polname");
                i_polcmd = PQfnumber(res, "polcmd");
                i_polpermissive = PQfnumber(res, "polpermissive");
@@ -3775,6 +3755,16 @@ getPolicies(Archive *fout, TableInfo tblinfo[], int numTables)
 
                for (j = 0; j < ntups; j++)
                {
+                       Oid                     polrelid = atooid(PQgetvalue(res, j, i_polrelid));
+                       TableInfo  *tbinfo = findTableByOid(polrelid);
+
+                       /*
+                        * Ignore row security on tables not to be dumped.  (This will
+                        * result in some harmless wasted slots in polinfo[].)
+                        */
+                       if (!(tbinfo->dobj.dump & DUMP_COMPONENT_POLICY))
+                               continue;
+
                        polinfo[j].dobj.objType = DO_POLICY;
                        polinfo[j].dobj.catId.tableoid =
                                atooid(PQgetvalue(res, j, i_tableoid));
@@ -3804,8 +3794,10 @@ getPolicies(Archive *fout, TableInfo tblinfo[], int numTables)
                                polinfo[j].polwithcheck
                                        = pg_strdup(PQgetvalue(res, j, i_polwithcheck));
                }
-               PQclear(res);
        }
+
+       PQclear(res);
+
        destroyPQExpBuffer(query);
 }