willem | doe | w_doe | 3
(4 rows)
+-- Test partial execution of a set-returning function
+SELECT get_user_records2() LIMIT 2;
+ get_user_records2
+--------------------
+ (jane,doe,j_doe,1)
+ (john,doe,johnd,2)
+(2 rows)
+
+SELECT * FROM get_user_records2() LIMIT 2;
+ fname | lname | username | userid
+-------+-------+----------+--------
+ jane | doe | j_doe | 1
+ john | doe | johnd | 2
+(2 rows)
+
+-- A set-returning function that is invalidated mid-iteration must run to
+-- completion using its original definition (bug #19480).
+CREATE OR REPLACE FUNCTION self_invalidating_srf(x int) RETURNS SETOF int AS $$
+for i in range(3):
+ if i == 1:
+ plpy.execute("CREATE OR REPLACE FUNCTION self_invalidating_srf(x int) "
+ "RETURNS SETOF int LANGUAGE plpython3u AS 'return [-1]'")
+ yield x + i
+$$ LANGUAGE plpython3u;
+SELECT self_invalidating_srf(10); -- expect 10,11,12 (original definition)
+ self_invalidating_srf
+-----------------------
+ 10
+ 11
+ 12
+(3 rows)
+
+SELECT self_invalidating_srf(10); -- expect -1 (replacement now in effect)
+ self_invalidating_srf
+-----------------------
+ -1
+(1 row)
+
#include "postgres.h"
-#include "access/htup_details.h"
-#include "access/xact.h"
-#include "catalog/pg_type.h"
#include "commands/event_trigger.h"
#include "commands/trigger.h"
#include "executor/spi.h"
#include "plpy_elog.h"
#include "plpy_exec.h"
#include "plpy_main.h"
-#include "plpy_procedure.h"
#include "plpy_subxactobject.h"
#include "plpy_util.h"
#include "utils/fmgrprotos.h"
-#include "utils/rel.h"
-
-/* saved state for a set-returning function */
-typedef struct PLySRFState
-{
- PyObject *iter; /* Python iterator producing results */
- PLySavedArgs *savedargs; /* function argument values */
- MemoryContextCallback callback; /* for releasing refcounts when done */
-} PLySRFState;
+static void ShutdownPLyFunction(Datum arg);
static PyObject *PLy_function_build_args(FunctionCallInfo fcinfo, PLyProcedure *proc);
static PLySavedArgs *PLy_function_save_args(PLyProcedure *proc);
static void PLy_function_restore_args(PLyProcedure *proc, PLySavedArgs *savedargs);
static void PLy_function_drop_args(PLySavedArgs *savedargs);
static void PLy_global_args_push(PLyProcedure *proc);
static void PLy_global_args_pop(PLyProcedure *proc);
-static void plpython_srf_cleanup_callback(void *arg);
static void plpython_return_error_callback(void *arg);
static PyObject *PLy_trigger_build_args(FunctionCallInfo fcinfo, PLyProcedure *proc,
/* function subhandler */
Datum
-PLy_exec_function(FunctionCallInfo fcinfo, PLyProcedure *proc)
+PLy_exec_function(FunctionCallInfo fcinfo, PLyProcedureCache *pcache)
{
+ PLyProcedure *proc = pcache->proc;
bool is_setof = proc->is_setof;
+ ReturnSetInfo *rsi = (ReturnSetInfo *) fcinfo->resultinfo;
Datum rv;
PyObject *volatile plargs = NULL;
PyObject *volatile plrv = NULL;
- FuncCallContext *volatile funcctx = NULL;
PLySRFState *volatile srfstate = NULL;
ErrorContextCallback plerrcontext;
{
if (is_setof)
{
- /* First Call setup */
- if (SRF_IS_FIRSTCALL())
+ /*
+ * PL/Python returns sets in ValuePerCall mode, so the handler is
+ * invoked once per result row. Across those calls we keep the
+ * iterator and saved arguments in the per-call-site cache
+ * (pcache->srfstate); a NULL srfstate means this is the first
+ * call of a new iteration, so we must set up that state now.
+ */
+ if (pcache->srfstate == NULL)
{
- funcctx = SRF_FIRSTCALL_INIT();
- srfstate = (PLySRFState *)
- MemoryContextAllocZero(funcctx->multi_call_memory_ctx,
- sizeof(PLySRFState));
- /* Immediately register cleanup callback */
- srfstate->callback.func = plpython_srf_cleanup_callback;
- srfstate->callback.arg = srfstate;
- MemoryContextRegisterResetCallback(funcctx->multi_call_memory_ctx,
- &srfstate->callback);
- funcctx->user_fctx = srfstate;
+ if (!rsi || !IsA(rsi, ReturnSetInfo) ||
+ (rsi->allowedModes & SFRM_ValuePerCall) == 0)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("unsupported set function return mode"),
+ errdetail("PL/Python set-returning functions only support returning one value per call.")));
+ }
+ rsi->returnMode = SFRM_ValuePerCall;
+
+ pcache->srfstate = (PLySRFState *)
+ MemoryContextAllocZero(pcache->fcontext, sizeof(PLySRFState));
+
+ /*
+ * Register a shutdown callback so that the iterator state is
+ * released if execution is abandoned before the iterator is
+ * exhausted. We'll unregister it again on normal completion.
+ */
+ RegisterExprContextCallback(rsi->econtext,
+ ShutdownPLyFunction,
+ PointerGetDatum(pcache));
+ pcache->shutdown_reg = true;
}
- /* Every call setup */
- funcctx = SRF_PERCALL_SETUP();
- Assert(funcctx != NULL);
- srfstate = (PLySRFState *) funcctx->user_fctx;
- Assert(srfstate != NULL);
+
+ srfstate = pcache->srfstate;
}
if (srfstate == NULL || srfstate->iter == NULL)
{
if (srfstate->iter == NULL)
{
- /* first time -- do checks and setup */
- ReturnSetInfo *rsi = (ReturnSetInfo *) fcinfo->resultinfo;
-
- if (!rsi || !IsA(rsi, ReturnSetInfo) ||
- (rsi->allowedModes & SFRM_ValuePerCall) == 0)
- {
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("unsupported set function return mode"),
- errdetail("PL/Python set-returning functions only support returning one value per call.")));
- }
- rsi->returnMode = SFRM_ValuePerCall;
-
- /* Make iterator out of returned object */
+ /* First time -- make iterator out of returned object */
srfstate->iter = PyObject_GetIter(plrv);
Py_DECREF(plrv);
Py_XDECREF(plrv);
/*
- * If there was an error within a SRF, the iterator might not have
- * been exhausted yet. Clear it so the next invocation of the
- * function will start the iteration again. (This code is probably
- * unnecessary now; plpython_srf_cleanup_callback should take care of
- * cleanup. But it doesn't hurt anything to do it here.)
+ * If we are erroring out of a SRF, clean up its state immediately.
+ * ShutdownPLyFunction will not be called on abort, and the
+ * memory-context callback only fires when the FmgrInfo's context is
+ * torn down. Releasing the Python references promptly avoids leaking
+ * them if teardown is delayed, and clearing pcache->srfstate ensures
+ * that if we reuse the pcache we won't mistake this for an iteration
+ * still in progress.
*/
- if (srfstate)
- {
- Py_XDECREF(srfstate->iter);
- srfstate->iter = NULL;
- /* And drop any saved args; we won't need them */
- if (srfstate->savedargs)
- PLy_function_drop_args(srfstate->savedargs);
- srfstate->savedargs = NULL;
- }
+ PLy_function_cleanup_srfstate(pcache);
PG_RE_THROW();
}
if (srfstate)
{
- /* We're in a SRF, exit appropriately */
+ /* We're in a SRF, signal done-or-not via rsi->isDone */
if (srfstate->iter == NULL)
{
- /* Iterator exhausted, so we're done */
- SRF_RETURN_DONE(funcctx);
+ /*
+ * Iterator exhausted. Unregister the shutdown callback since
+ * we're done normally, then clean up srfstate. (srfstate->iter
+ * is already NULL here, so the cleanup just frees the struct.)
+ */
+ if (pcache->shutdown_reg)
+ {
+ UnregisterExprContextCallback(rsi->econtext,
+ ShutdownPLyFunction,
+ PointerGetDatum(pcache));
+ pcache->shutdown_reg = false;
+ }
+ PLy_function_cleanup_srfstate(pcache);
+
+ rsi->isDone = ExprEndResult;
+ fcinfo->isnull = true;
+ return (Datum) 0;
}
- else if (fcinfo->isnull)
- SRF_RETURN_NEXT_NULL(funcctx);
else
- SRF_RETURN_NEXT(funcctx, rv);
+ {
+ rsi->isDone = ExprMultipleResult;
+ return rv;
+ }
}
/* Plain function, just return the Datum value (possibly null) */
return rv;
}
+/*
+ * ExprContext shutdown callback, invoked when the expression context that
+ * ran a SRF is rescanned or freed at end of query. This handles in-query
+ * cancellation, e.g. a LIMIT that stops fetching before the iterator is
+ * exhausted, or a rescan of the owning plan node.
+ *
+ * NB: this is not called during an error abort (see comments for
+ * PLy_function_cleanup_srfstate).
+ */
+static void
+ShutdownPLyFunction(Datum arg)
+{
+ PLyProcedureCache *pcache = (PLyProcedureCache *) DatumGetPointer(arg);
+
+ /* execUtils.c will deregister the callback after we return */
+ pcache->shutdown_reg = false;
+
+ PLy_function_cleanup_srfstate(pcache);
+}
+
+/*
+ * Release the Python references held by an in-progress set-returning
+ * function, and free the SRF state. This is a no-op if there is no active
+ * SRF state, so it's safe to call more than once.
+ *
+ * The Python iterator and the saved argument values own reference counts on
+ * Python objects, which are not released by transaction abort the way SQL
+ * resources are. We must therefore make sure this runs in every exit path.
+ * There are four ways for a set-returning function to terminate:
+ * 1. Normal completion of the iterator. Then this is called from
+ * PLy_exec_function's normal exit path.
+ * 2. Error thrown from within execution of the SRF. Then this is called
+ * from PLy_exec_function's PG_CATCH stanza.
+ * 3. Early termination of the calling query, for example due to LIMIT,
+ * or to a rescan of the calling plan node. Then this is called via the
+ * ExprContext shutdown callback ShutdownPLyFunction.
+ * 4. Error thrown from elsewhere in the query. Then this is called during
+ * (sub)transaction abort via the memory-context reset callback
+ * RemovePLyProcedureCache.
+ * (Some code paths hit more than one of these calls, which is why this
+ * must tolerate the cleanup having been done already.)
+ *
+ * This argument presumes that the FmgrInfo the SRF is called from is in a
+ * memory context that will be cleaned up by query abort. Postgres does use
+ * some longer-lived FmgrInfos, for instance those in relcache and typcache
+ * entries. But we never call SRFs via those.
+ */
+void
+PLy_function_cleanup_srfstate(PLyProcedureCache *pcache)
+{
+ PLySRFState *srfstate = pcache->srfstate;
+
+ if (srfstate != NULL)
+ {
+ /* Release refcount on the iter, if we still have one */
+ Py_XDECREF(srfstate->iter);
+ srfstate->iter = NULL;
+
+ /* And drop any saved args; we won't need them */
+ if (srfstate->savedargs)
+ PLy_function_drop_args(srfstate->savedargs);
+ srfstate->savedargs = NULL;
+
+ pfree(srfstate);
+ pcache->srfstate = NULL;
+ }
+}
+
/*
* trigger subhandler
*
}
}
-/*
- * Memory context deletion callback for cleaning up a PLySRFState.
- * We need this in case execution of the SRF is terminated early,
- * due to error or the caller simply not running it to completion.
- */
-static void
-plpython_srf_cleanup_callback(void *arg)
-{
- PLySRFState *srfstate = (PLySRFState *) arg;
-
- /* Release refcount on the iter, if we still have one */
- Py_XDECREF(srfstate->iter);
- srfstate->iter = NULL;
- /* And drop any saved args; we won't need them */
- if (srfstate->savedargs)
- PLy_function_drop_args(srfstate->savedargs);
- srfstate->savedargs = NULL;
-}
-
static void
plpython_return_error_callback(void *arg)
{
#include "plpy_procedure.h"
-extern Datum PLy_exec_function(FunctionCallInfo fcinfo, PLyProcedure *proc);
+extern Datum PLy_exec_function(FunctionCallInfo fcinfo, PLyProcedureCache *pcache);
extern HeapTuple PLy_exec_trigger(FunctionCallInfo fcinfo, PLyProcedure *proc);
extern void PLy_exec_event_trigger(FunctionCallInfo fcinfo, PLyProcedure *proc);
+extern void PLy_function_cleanup_srfstate(PLyProcedureCache *pcache);
#endif /* PLPY_EXEC_H */
#include "postgres.h"
-#include "access/htup_details.h"
#include "catalog/pg_proc.h"
-#include "catalog/pg_type.h"
#include "commands/event_trigger.h"
#include "commands/trigger.h"
#include "executor/spi.h"
#include "plpy_exec.h"
#include "plpy_main.h"
#include "plpy_plpymodule.h"
-#include "plpy_procedure.h"
#include "plpy_subxactobject.h"
#include "plpy_util.h"
#include "utils/guc.h"
#include "utils/memutils.h"
-#include "utils/rel.h"
#include "utils/syscache.h"
/*
Py_DECREF(main_mod);
- init_procedure_caches();
-
explicit_subtransactions = NIL;
PLy_execution_contexts = NULL;
Datum
plpython3_validator(PG_FUNCTION_ARGS)
{
+ LOCAL_FCINFO(fake_fcinfo, 0);
Oid funcoid = PG_GETARG_OID(0);
HeapTuple tuple;
Form_pg_proc procStruct;
PLyTrigType is_trigger;
+ TriggerData trigdata;
+ EventTriggerData etrigdata;
+ FmgrInfo flinfo;
+ PLyProcedureCache *pcache;
if (!CheckFunctionValidatorAccess(fcinfo->flinfo->fn_oid, funcoid))
PG_RETURN_VOID();
ReleaseSysCache(tuple);
- /* We can't validate triggers against any particular table ... */
- (void) PLy_procedure_get(funcoid, InvalidOid, is_trigger);
+ /*
+ * Set up a fake flinfo/fcinfo with just enough info to satisfy
+ * PLy_procedure_get(). That function derives the call context (plain
+ * function, DML trigger, or event trigger) from the fcinfo, so we have to
+ * construct matching context here.
+ */
+ MemSet(fake_fcinfo, 0, SizeForFunctionCallInfo(0));
+ MemSet(&flinfo, 0, sizeof(flinfo));
+ fake_fcinfo->flinfo = &flinfo;
+ flinfo.fn_oid = funcoid;
+ flinfo.fn_mcxt = CurrentMemoryContext;
+
+ if (is_trigger == PLPY_TRIGGER)
+ {
+ MemSet(&trigdata, 0, sizeof(trigdata));
+ trigdata.type = T_TriggerData;
+ /* We can't validate triggers against any particular table ... */
+ fake_fcinfo->context = (Node *) &trigdata;
+ }
+ else if (is_trigger == PLPY_EVENT_TRIGGER)
+ {
+ MemSet(&etrigdata, 0, sizeof(etrigdata));
+ etrigdata.type = T_EventTriggerData;
+ fake_fcinfo->context = (Node *) &etrigdata;
+ }
+
+ pcache = PLy_procedure_get(fake_fcinfo, true);
+
+ /*
+ * Release the reference count that PLy_procedure_get acquired; the
+ * PLyProcedure object remains valid for possible future use. (We could
+ * leave this to be done when the calling memory context is cleaned up,
+ * but it seems neater to do it right away. Note we mustn't release the
+ * pcache object, since the memory-context reset callback has a reference
+ * to it.)
+ */
+ Assert(pcache->proc->cfunc.use_count > 0);
+ pcache->proc->cfunc.use_count--;
+ pcache->proc = NULL;
PG_RETURN_VOID();
}
PG_TRY();
{
- Oid funcoid = fcinfo->flinfo->fn_oid;
- PLyProcedure *proc;
+ PLyProcedureCache *pcache;
/*
* Setup error traceback support for ereport(). Note that the PG_TRY
plerrcontext.previous = error_context_stack;
error_context_stack = &plerrcontext;
+ /*
+ * Look up (and if necessary compile) the procedure. This can throw
+ * an error, so it must happen inside the PG_TRY so that the execution
+ * context gets popped on the way out.
+ */
+ pcache = PLy_procedure_get(fcinfo, false);
+ exec_ctx->curr_proc = pcache->proc;
+
if (CALLED_AS_TRIGGER(fcinfo))
{
- Relation tgrel = ((TriggerData *) fcinfo->context)->tg_relation;
HeapTuple trv;
- proc = PLy_procedure_get(funcoid, RelationGetRelid(tgrel), PLPY_TRIGGER);
- exec_ctx->curr_proc = proc;
- trv = PLy_exec_trigger(fcinfo, proc);
+ trv = PLy_exec_trigger(fcinfo, pcache->proc);
retval = PointerGetDatum(trv);
}
else if (CALLED_AS_EVENT_TRIGGER(fcinfo))
{
- proc = PLy_procedure_get(funcoid, InvalidOid, PLPY_EVENT_TRIGGER);
- exec_ctx->curr_proc = proc;
- PLy_exec_event_trigger(fcinfo, proc);
+ PLy_exec_event_trigger(fcinfo, pcache->proc);
retval = (Datum) 0;
}
else
- {
- proc = PLy_procedure_get(funcoid, InvalidOid, PLPY_NOT_TRIGGER);
- exec_ctx->curr_proc = proc;
- retval = PLy_exec_function(fcinfo, proc);
- }
+ retval = PLy_exec_function(fcinfo, pcache);
}
PG_CATCH();
{
+ /* Destroy the execution context */
PLy_pop_execution_context();
PyErr_Clear();
+
PG_RE_THROW();
}
PG_END_TRY();
InlineCodeBlock *codeblock = (InlineCodeBlock *) DatumGetPointer(PG_GETARG_DATUM(0));
FmgrInfo flinfo;
PLyProcedure proc;
+ PLyProcedureCache pcache;
PLyExecutionContext *exec_ctx;
ErrorContextCallback plerrcontext;
*/
proc.result.typoid = VOIDOID;
+ /* Set up a minimal PLyProcedureCache for the inline block */
+ MemSet(&pcache, 0, sizeof(PLyProcedureCache));
+ pcache.proc = &proc;
+ pcache.fcontext = CurrentMemoryContext;
+
/*
* Push execution context onto stack. It is important that this get
* popped again, so avoid putting anything that could throw error between
PLy_procedure_compile(&proc, codeblock->source_text);
exec_ctx->curr_proc = &proc;
- PLy_exec_function(fake_fcinfo, &proc);
+ PLy_exec_function(fake_fcinfo, &pcache);
}
PG_CATCH();
{
PG_RETURN_VOID();
}
+/*
+ * Determine whether a function is a (DML or event) trigger from its pg_proc
+ * result type. This is used by the validator, which has no call context to
+ * inspect; the call handler instead relies on the fcinfo's call context.
+ */
static PLyTrigType
PLy_procedure_is_trigger(Form_pg_proc procStruct)
{
#include "postgres.h"
-#include "access/htup_details.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_type.h"
+#include "commands/event_trigger.h"
+#include "commands/trigger.h"
#include "funcapi.h"
#include "plpy_elog.h"
+#include "plpy_exec.h"
#include "plpy_main.h"
#include "plpy_procedure.h"
#include "plpy_util.h"
#include "utils/builtins.h"
-#include "utils/hsearch.h"
-#include "utils/memutils.h"
+#include "utils/funccache.h"
#include "utils/syscache.h"
-static HTAB *PLy_procedure_cache = NULL;
-
-static PLyProcedure *PLy_procedure_create(HeapTuple procTup, Oid fn_oid, PLyTrigType is_trigger);
-static bool PLy_procedure_valid(PLyProcedure *proc, HeapTuple procTup);
+static void PLy_procedure_create(PLyProcedure *proc,
+ HeapTuple procTup,
+ Oid fn_oid,
+ PLyTrigType is_trigger);
static char *PLy_procedure_munge_source(const char *name, const char *src);
+static void PLy_compile_callback(FunctionCallInfo fcinfo,
+ HeapTuple procTup,
+ const CachedFunctionHashKey *hashkey,
+ CachedFunction *cfunc,
+ bool forValidator);
+static void PLy_delete_callback(CachedFunction *cfunc);
+static void RemovePLyProcedureCache(void *arg);
-void
-init_procedure_caches(void)
-{
- HASHCTL hash_ctl;
-
- hash_ctl.keysize = sizeof(PLyProcedureKey);
- hash_ctl.entrysize = sizeof(PLyProcedureEntry);
- PLy_procedure_cache = hash_create("PL/Python procedures", 32, &hash_ctl,
- HASH_ELEM | HASH_BLOBS);
-}
-
/*
* PLy_procedure_name: get the name of the specified procedure.
*
}
/*
- * PLy_procedure_get: returns a cached PLyProcedure, or creates, stores and
- * returns a new PLyProcedure.
+ * PLy_procedure_get: returns a PLyProcedureCache struct for the function,
+ * making it valid if necessary.
*
- * fn_oid is the OID of the function requested
- * fn_rel is InvalidOid or the relation this function triggers on
- * is_trigger denotes whether the function is a trigger function
+ * The PLyProcedureCache contains a pointer to the long-lived PLyProcedure
+ * (managed by funccache.c) and execution-specific state like SRF state.
*
- * The reason that both fn_rel and is_trigger need to be passed is that when
- * trigger functions get validated we don't know which relation(s) they'll
- * be used with, so no sensible fn_rel can be passed. Also, in that case
- * we can't make a cache entry because we can't construct the right cache key.
- * To forestall leakage of the PLyProcedure in such cases, delete it after
- * construction and return NULL. That's okay because the only caller that
- * would pass that set of values is plpython3_validator, which ignores our
- * result anyway.
+ * For SRFs, if we are resuming execution (srfstate->iter != NULL), we skip
+ * revalidation and continue using the same PLyProcedure to ensure consistent
+ * behavior throughout the SRF execution.
*/
-PLyProcedure *
-PLy_procedure_get(Oid fn_oid, Oid fn_rel, PLyTrigType is_trigger)
+PLyProcedureCache *
+PLy_procedure_get(FunctionCallInfo fcinfo, bool forValidator)
{
- bool use_cache;
- HeapTuple procTup;
- PLyProcedureKey key;
- PLyProcedureEntry *volatile entry = NULL;
- PLyProcedure *volatile proc = NULL;
- bool found = false;
-
- if (is_trigger == PLPY_TRIGGER && fn_rel == InvalidOid)
- use_cache = false;
- else
- use_cache = true;
+ FmgrInfo *finfo = fcinfo->flinfo;
+ PLyProcedureCache *pcache;
+ PLyProcedure *proc;
+
+ /*
+ * If this is the first execution for this FmgrInfo, set up a cache struct
+ * (initially containing null pointers). The cache must live as long as
+ * the FmgrInfo, so it goes in fn_mcxt. Also set up a memory context
+ * callback that will be invoked when fn_mcxt is reset/deleted.
+ */
+ pcache = finfo->fn_extra;
+ if (pcache == NULL)
+ {
+ pcache = (PLyProcedureCache *)
+ MemoryContextAllocZero(finfo->fn_mcxt, sizeof(PLyProcedureCache));
+
+ pcache->fcontext = finfo->fn_mcxt;
+ pcache->mcb.func = RemovePLyProcedureCache;
+ pcache->mcb.arg = pcache;
- procTup = SearchSysCache1(PROCOID, ObjectIdGetDatum(fn_oid));
- if (!HeapTupleIsValid(procTup))
- elog(ERROR, "cache lookup failed for function %u", fn_oid);
+ MemoryContextRegisterResetCallback(finfo->fn_mcxt, &pcache->mcb);
+
+ finfo->fn_extra = pcache;
+ }
/*
- * Look for the function in the cache, unless we don't have the necessary
- * information (e.g. during validation). In that case we just don't cache
- * anything.
+ * If we are resuming execution of a set-returning function, just keep
+ * using the same cache. We do not ask funccache.c to re-validate the
+ * PLyProcedure: we want to run to completion using the function's initial
+ * definition.
+ *
+ * A live iterator (srfstate->iter != NULL) reliably means a genuine
+ * resume: when an iteration ends for any reason, srfstate->iter is reset
+ * to NULL (see comments for PLy_function_cleanup_srfstate).
*/
- if (use_cache)
+ if (pcache->srfstate != NULL && pcache->srfstate->iter != NULL)
{
- key.fn_oid = fn_oid;
- key.fn_rel = fn_rel;
- entry = hash_search(PLy_procedure_cache, &key, HASH_ENTER, &found);
- proc = entry->proc;
+ Assert(pcache->proc != NULL);
+ return pcache;
}
- PG_TRY();
+ /*
+ * Look up, or re-validate, the long-lived hash entry. Like SQL-language
+ * functions, make the hash key depend on the result of
+ * get_call_result_type() when that's composite, so that we can safely
+ * assume that we'll build a new hash entry if the composite rowtype
+ * changes.
+ */
+ proc = (PLyProcedure *)
+ cached_function_compile(fcinfo,
+ (CachedFunction *) pcache->proc,
+ PLy_compile_callback,
+ PLy_delete_callback,
+ sizeof(PLyProcedure),
+ true,
+ forValidator);
+
+ /*
+ * Install the hash pointer in the PLyProcedureCache, and increment its
+ * use count to reflect that. If cached_function_compile gave us back a
+ * different hash entry than we were using before, we must decrement that
+ * one's use count.
+ */
+ if (proc != pcache->proc)
{
- if (!found)
+ if (pcache->proc != NULL)
{
- /* Haven't found it, create a new procedure */
- proc = PLy_procedure_create(procTup, fn_oid, is_trigger);
- if (use_cache)
- entry->proc = proc;
- else
- {
- /* Delete the proc, otherwise it's a memory leak */
- PLy_procedure_delete(proc);
- proc = NULL;
- }
+ Assert(pcache->proc->cfunc.use_count > 0);
+ pcache->proc->cfunc.use_count--;
}
- else if (!PLy_procedure_valid(proc, procTup))
- {
- /* Found it, but it's invalid, free and reuse the cache entry */
- entry->proc = NULL;
- if (proc)
- PLy_procedure_delete(proc);
- proc = PLy_procedure_create(procTup, fn_oid, is_trigger);
- entry->proc = proc;
- }
- /* Found it and it's valid, it's fine to use it */
+ pcache->proc = proc;
+ proc->cfunc.use_count++;
}
- PG_CATCH();
- {
- /* Do not leave an uninitialized entry in the cache */
- if (use_cache)
- hash_search(PLy_procedure_cache, &key, HASH_REMOVE, NULL);
- PG_RE_THROW();
- }
- PG_END_TRY();
-
- ReleaseSysCache(procTup);
- return proc;
+ return pcache;
}
/*
- * Create a new PLyProcedure structure
+ * Create (well, fill in) a new PLyProcedure structure
*/
-static PLyProcedure *
-PLy_procedure_create(HeapTuple procTup, Oid fn_oid, PLyTrigType is_trigger)
+static void
+PLy_procedure_create(PLyProcedure *proc,
+ HeapTuple procTup,
+ Oid fn_oid,
+ PLyTrigType is_trigger)
{
char procName[NAMEDATALEN + 256];
Form_pg_proc procStruct;
- PLyProcedure *volatile proc;
MemoryContext cxt;
MemoryContext oldcxt;
int rv;
oldcxt = MemoryContextSwitchTo(cxt);
- proc = palloc0_object(PLyProcedure);
proc->mcxt = cxt;
PG_TRY();
proc->proname = pstrdup(NameStr(procStruct->proname));
MemoryContextSetIdentifier(cxt, proc->proname);
proc->pyname = pstrdup(procName);
- proc->fn_xmin = HeapTupleHeaderGetRawXmin(procTup->t_data);
- proc->fn_tid = procTup->t_self;
proc->fn_readonly = (procStruct->provolatile != PROVOLATILE_VOLATILE);
proc->is_setof = procStruct->proretset;
proc->is_procedure = (procStruct->prokind == PROKIND_PROCEDURE);
PG_END_TRY();
MemoryContextSwitchTo(oldcxt);
- return proc;
}
/*
MemoryContextDelete(proc->mcxt);
}
-/*
- * Decide whether a cached PLyProcedure struct is still valid
- */
-static bool
-PLy_procedure_valid(PLyProcedure *proc, HeapTuple procTup)
-{
- if (proc == NULL)
- return false;
-
- /* If the pg_proc tuple has changed, it's not valid */
- if (!(proc->fn_xmin == HeapTupleHeaderGetRawXmin(procTup->t_data) &&
- ItemPointerEquals(&proc->fn_tid, &procTup->t_self)))
- return false;
-
- return true;
-}
-
static char *
PLy_procedure_munge_source(const char *name, const char *src)
{
return mrc;
}
+
+/*
+ * Compile callback for funccache.c.
+ *
+ * cached_function_compile() calls this when it needs to (re)compile the
+ * long-lived PLyProcedure for a function. The CachedFunction handed to us is
+ * pre-zeroed workspace of size sizeof(PLyProcedure); we just have to fill in
+ * the PL/Python-specific fields.
+ */
+static void
+PLy_compile_callback(FunctionCallInfo fcinfo,
+ HeapTuple procTup,
+ const CachedFunctionHashKey *hashkey,
+ CachedFunction *cfunc,
+ bool forValidator)
+{
+ PLyProcedure *proc = (PLyProcedure *) cfunc;
+ Oid fn_oid = fcinfo->flinfo->fn_oid;
+ PLyTrigType is_trigger;
+
+ /*
+ * Derive the trigger type from the call context, matching what
+ * plpython3_call_handler dispatches on.
+ */
+ if (CALLED_AS_TRIGGER(fcinfo))
+ is_trigger = PLPY_TRIGGER;
+ else if (CALLED_AS_EVENT_TRIGGER(fcinfo))
+ is_trigger = PLPY_EVENT_TRIGGER;
+ else
+ is_trigger = PLPY_NOT_TRIGGER;
+
+ PLy_procedure_create(proc, procTup, fn_oid, is_trigger);
+}
+
+/*
+ * Deletion callback for funccache.c.
+ *
+ * cached_function_compile() calls this when it discards a cache entry, which
+ * only happens once the entry's use count has dropped to zero. We must free
+ * the subsidiary data but not the CachedFunction struct itself.
+ */
+static void
+PLy_delete_callback(CachedFunction *cfunc)
+{
+ PLyProcedure *proc = (PLyProcedure *) cfunc;
+
+ Assert(proc->cfunc.use_count == 0);
+ Assert(proc->calldepth == 0);
+
+ PLy_procedure_delete(proc);
+}
+
+/*
+ * MemoryContext callback function
+ *
+ * We register this in the memory context that contains a PLyProcedureCache
+ * struct (that is, the FmgrInfo's fn_mcxt). When the memory context is reset
+ * or deleted, we release the reference count (if any) that the cache holds on
+ * the long-lived hash entry. Note that this will happen even during error
+ * aborts.
+ *
+ * This is also our opportunity to release the Python references held by an
+ * interrupted set-returning function. ShutdownPLyFunction() handles that for
+ * routine in-query cancellation cases, but it does not run during an error
+ * abort; this callback does, so it is the backstop that prevents leaking the
+ * SRF's iterator and saved arguments when a query errors out mid-iteration.
+ */
+static void
+RemovePLyProcedureCache(void *arg)
+{
+ PLyProcedureCache *pcache = (PLyProcedureCache *) arg;
+
+ /* Release any Python state left behind by an interrupted SRF */
+ PLy_function_cleanup_srfstate(pcache);
+
+ /* Release reference count on PLyProcedure */
+ if (pcache->proc != NULL)
+ {
+ Assert(pcache->proc->cfunc.use_count > 0);
+ pcache->proc->cfunc.use_count--;
+ pcache->proc = NULL;
+ }
+
+ /* We needn't free the pcache object itself, context cleanup does that */
+}
#define PLPY_PROCEDURE_H
#include "plpy_typeio.h"
-
-
-extern void init_procedure_caches(void);
+#include "utils/funccache.h"
/*
PyObject *namedargs[FLEXIBLE_ARRAY_MEMBER]; /* named args */
} PLySavedArgs;
-/* cached procedure data */
+/* saved state for a set-returning function */
+typedef struct PLySRFState
+{
+ PyObject *iter; /* Python iterator producing results */
+ PLySavedArgs *savedargs; /* function argument values */
+} PLySRFState;
+
+/*
+ * Long-lived data for a PL/Python function.
+ *
+ * This struct is managed by funccache.c and can be shared across multiple
+ * executions of the same function. It must contain no execution-specific
+ * state. The CachedFunction struct must be first.
+ */
typedef struct PLyProcedure
{
+ CachedFunction cfunc; /* fields managed by funccache.c */
+
MemoryContext mcxt; /* context holding this PLyProcedure and its
* subsidiary data */
char *proname; /* SQL name of procedure */
char *pyname; /* Python name of procedure */
- TransactionId fn_xmin;
- ItemPointerData fn_tid;
bool fn_readonly;
bool is_setof; /* true, if function returns result set */
bool is_procedure;
PLySavedArgs *argstack; /* stack of outer-level call arguments */
} PLyProcedure;
-/* the procedure cache key */
-typedef struct PLyProcedureKey
+/*
+ * Per-call-site cache for a PL/Python function.
+ *
+ * This struct is stored in fn_extra and holds execution-specific state,
+ * including a pointer to the long-lived PLyProcedure. The use_count in
+ * the PLyProcedure is incremented while we hold a reference.
+ */
+typedef struct PLyProcedureCache
{
- Oid fn_oid; /* function OID */
- Oid fn_rel; /* triggered-on relation or InvalidOid */
-} PLyProcedureKey;
+ PLyProcedure *proc; /* long-lived hash entry */
+ MemoryContext fcontext; /* fn_mcxt - context holding this struct */
+ PLySRFState *srfstate; /* SRF execution state, NULL if not in SRF */
+ bool shutdown_reg; /* true if registered shutdown callback */
-/* the procedure cache entry */
-typedef struct PLyProcedureEntry
-{
- PLyProcedureKey key; /* hash key */
- PLyProcedure *proc;
-} PLyProcedureEntry;
+ /* Callback to release resources when fcontext is reset or deleted */
+ MemoryContextCallback mcb;
+} PLyProcedureCache;
/* PLyProcedure manipulation */
extern char *PLy_procedure_name(PLyProcedure *proc);
-extern PLyProcedure *PLy_procedure_get(Oid fn_oid, Oid fn_rel, PLyTrigType is_trigger);
+extern PLyProcedureCache *PLy_procedure_get(FunctionCallInfo fcinfo, bool forValidator);
extern void PLy_procedure_compile(PLyProcedure *proc, const char *src);
extern void PLy_procedure_delete(PLyProcedure *proc);
SELECT get_user_records2();
SELECT * FROM get_user_records2();
+
+-- Test partial execution of a set-returning function
+SELECT get_user_records2() LIMIT 2;
+SELECT * FROM get_user_records2() LIMIT 2;
+
+-- A set-returning function that is invalidated mid-iteration must run to
+-- completion using its original definition (bug #19480).
+CREATE OR REPLACE FUNCTION self_invalidating_srf(x int) RETURNS SETOF int AS $$
+for i in range(3):
+ if i == 1:
+ plpy.execute("CREATE OR REPLACE FUNCTION self_invalidating_srf(x int) "
+ "RETURNS SETOF int LANGUAGE plpython3u AS 'return [-1]'")
+ yield x + i
+$$ LANGUAGE plpython3u;
+
+SELECT self_invalidating_srf(10); -- expect 10,11,12 (original definition)
+SELECT self_invalidating_srf(10); -- expect -1 (replacement now in effect)
PLyObject_AsString_t
PLyPlanObject
PLyProcedure
-PLyProcedureEntry
-PLyProcedureKey
+PLyProcedureCache
PLyResultObject
PLySRFState
PLySavedArgs