#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
+#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/syscache.h"
Form_pg_subscription subform;
Datum datum;
bool isnull;
+ MemoryContext cxt;
+ MemoryContext oldcxt;
tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
elog(ERROR, "cache lookup failed for subscription %u", subid);
}
+ cxt = AllocSetContextCreate(CurrentMemoryContext, "subscription",
+ ALLOCSET_SMALL_SIZES);
+ oldcxt = MemoryContextSwitchTo(cxt);
+
subform = (Form_pg_subscription) GETSTRUCT(tup);
sub = palloc_object(Subscription);
+ sub->cxt = cxt;
sub->oid = subid;
sub->dbid = subform->subdbid;
sub->skiplsn = subform->subskiplsn;
ReleaseSysCache(tup);
+ MemoryContextSwitchTo(oldcxt);
+
return sub;
}
return nsubs;
}
-/*
- * Free memory allocated by subscription struct.
- */
-void
-FreeSubscription(Subscription *sub)
-{
- pfree(sub->name);
- pfree(sub->conninfo);
- if (sub->slotname)
- pfree(sub->slotname);
- list_free_deep(sub->publications);
- pfree(sub);
-}
-
/*
* Disable the given subscription.
*/
/*
* Retrieve connection string from server's FDW.
+ *
+ * NB: leaks into CurrentMemoryContext.
*/
char *
ForeignServerConnectionString(Oid userid, Oid serverid)
{
- MemoryContext tempContext;
- MemoryContext oldcxt;
- text *volatile connection_text = NULL;
- char *result = NULL;
-
- /*
- * GetForeignServer, GetForeignDataWrapper, and the connection function
- * itself all leak memory into CurrentMemoryContext. Switch to a temporary
- * context for easy cleanup.
- */
- tempContext = AllocSetContextCreate(CurrentMemoryContext,
- "FDWConnectionContext",
- ALLOCSET_SMALL_SIZES);
-
- oldcxt = MemoryContextSwitchTo(tempContext);
-
- PG_TRY();
- {
- ForeignServer *server;
- ForeignDataWrapper *fdw;
- Datum connection_datum;
-
- server = GetForeignServer(serverid);
- fdw = GetForeignDataWrapper(server->fdwid);
-
- if (!OidIsValid(fdw->fdwconnection))
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("foreign data wrapper \"%s\" does not support subscription connections",
- fdw->fdwname),
- errdetail("Foreign data wrapper must be defined with CONNECTION specified.")));
-
-
- connection_datum = OidFunctionCall3(fdw->fdwconnection,
- ObjectIdGetDatum(userid),
- ObjectIdGetDatum(serverid),
- PointerGetDatum(NULL));
+ ForeignServer *server;
+ ForeignDataWrapper *fdw;
+ Datum connection_datum;
- connection_text = DatumGetTextPP(connection_datum);
- }
- PG_FINALLY();
- {
- MemoryContextSwitchTo(oldcxt);
+ server = GetForeignServer(serverid);
+ fdw = GetForeignDataWrapper(server->fdwid);
- if (connection_text)
- result = text_to_cstring((text *) connection_text);
+ if (!OidIsValid(fdw->fdwconnection))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("foreign data wrapper \"%s\" does not support subscription connections",
+ fdw->fdwname),
+ errdetail("Foreign data wrapper must be defined with CONNECTION specified.")));
- MemoryContextDelete(tempContext);
- }
- PG_END_TRY();
+ connection_datum = OidFunctionCall3(fdw->fdwconnection,
+ ObjectIdGetDatum(userid),
+ ObjectIdGetDatum(serverid),
+ PointerGetDatum(NULL));
- return result;
+ return text_to_cstring(DatumGetTextPP(connection_datum));
}
void
maybe_reread_subscription(void)
{
- MemoryContext oldctx;
Subscription *newsub;
bool started_tx = false;
started_tx = true;
}
- /* Ensure allocations in permanent context. */
- oldctx = MemoryContextSwitchTo(ApplyContext);
-
newsub = GetSubscription(MyLogicalRepWorker->subid, true, true);
- /*
- * Exit if the subscription was removed. This normally should not happen
- * as the worker gets killed during DROP SUBSCRIPTION.
- */
- if (!newsub)
+ if (newsub)
+ {
+ MemoryContextSetParent(newsub->cxt, ApplyContext);
+ }
+ else
{
+ /*
+ * Exit if the subscription was removed. This normally should not
+ * happen as the worker gets killed during DROP SUBSCRIPTION.
+ */
ereport(LOG,
(errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was removed",
MySubscription->name)));
}
/* Clean old subscription info and switch to new one. */
- FreeSubscription(MySubscription);
+ MemoryContextDelete(MySubscription->cxt);
MySubscription = newsub;
- MemoryContextSwitchTo(oldctx);
-
/* Change synchronous commit according to the user's wishes */
SetConfigOption("synchronous_commit", MySubscription->synccommit,
PGC_BACKEND, PGC_S_OVERRIDE);
void
InitializeLogRepWorker(void)
{
- MemoryContext oldctx;
-
/* Run as replica session replication role. */
SetConfigOption("session_replication_role", "replica",
PGC_SUSET, PGC_S_OVERRIDE);
*/
SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
- /* Load the subscription into persistent memory context. */
ApplyContext = AllocSetContextCreate(TopMemoryContext,
"ApplyContext",
ALLOCSET_DEFAULT_SIZES);
+
StartTransactionCommand();
- oldctx = MemoryContextSwitchTo(ApplyContext);
/*
* Lock the subscription to prevent it from being concurrently dropped,
*/
LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid, 0,
AccessShareLock);
+
MySubscription = GetSubscription(MyLogicalRepWorker->subid, true, true);
- if (!MySubscription)
+
+ if (MySubscription)
+ {
+ MemoryContextSetParent(MySubscription->cxt, ApplyContext);
+ }
+ else
{
ereport(LOG,
(errmsg("logical replication worker for subscription %u will not start because the subscription was removed during startup",
}
MySubscriptionValid = true;
- MemoryContextSwitchTo(oldctx);
if (!MySubscription->enabled)
{
typedef struct Subscription
{
+ MemoryContext cxt; /* mem cxt containing this subscription */
+
Oid oid; /* Oid of the subscription */
Oid dbid; /* Oid of the database which subscription is
* in */
extern Subscription *GetSubscription(Oid subid, bool missing_ok,
bool aclcheck);
-extern void FreeSubscription(Subscription *sub);
extern void DisableSubscription(Oid subid);
extern int CountDBSubscriptions(Oid dbid);