]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
Arrays are toastable. (At least if you initdb, which I didn't force.)
authorTom Lane <tgl@sss.pgh.pa.us>
Sat, 22 Jul 2000 03:34:43 +0000 (03:34 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Sat, 22 Jul 2000 03:34:43 +0000 (03:34 +0000)
Remove a bunch of crufty code for large-object-based arrays, which is
superseded by TOAST and likely hasn't worked in a long time anyway.
Clean up array code a little, and in particular eliminate its habit
of scribbling on the input array (ie, modifying the input tuple :-().

contrib/array/array_iterator.c
src/backend/commands/define.c
src/backend/executor/execQual.c
src/backend/utils/adt/Makefile
src/backend/utils/adt/arrayfuncs.c
src/backend/utils/adt/arrayutils.c
src/backend/utils/adt/chunk.c [deleted file]
src/include/catalog/pg_type.h
src/include/utils/array.h

index 447d5c6443eddd7b6616f2289937bda81c9840b5..c480f7dfc70982ddb95c4db4125cd1043bc38a8b 100644 (file)
@@ -47,15 +47,18 @@ array_iterator(Oid elemtype, Oid proc, int and, ArrayType *array, Datum value)
        FmgrInfo        finfo;
 
        /* Sanity checks */
-       if ((array == (ArrayType *) NULL)
-               || (ARR_IS_LO(array) == true))
+       if (array == (ArrayType *) NULL)
        {
                /* elog(NOTICE, "array_iterator: array is null"); */
                return (0);
        }
+
+       /* detoast input if necessary */
+       array = DatumGetArrayTypeP(PointerGetDatum(array));
+
        ndim = ARR_NDIM(array);
        dim = ARR_DIMS(array);
-       nitems = getNitems(ndim, dim);
+       nitems = ArrayGetNItems(ndim, dim);
        if (nitems == 0)
        {
                /* elog(NOTICE, "array_iterator: nitems = 0"); */
index b90ef61a3b0dc49f1bcd4a02ccd9b8f80ae409e9..cf31e5edb1ff199fbce4525069d14941cda40edf 100644 (file)
@@ -10,7 +10,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/commands/define.c,v 1.45 2000/07/17 03:04:44 tgl Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/commands/define.c,v 1.46 2000/07/22 03:34:26 tgl Exp $
  *
  * DESCRIPTION
  *       The "DefineFoo" routines take the parse tree and pick out the
@@ -691,10 +691,10 @@ DefineType(char *typeName, List *parameters)
                           "array_in",          /* receive procedure */
                           "array_out",         /* send procedure */
                           typeName,            /* element type name */
-                          defaultValue,        /* default type value */
+                          NULL,                        /* never a default type value */
                           false,                       /* never passed by value */
-                          alignment,
-                          'p');                        /* ARRAY doesn't support TOAST yet */
+                          alignment,           /* NB: must be 'i' or 'd' for arrays... */
+                          'x');                        /* ARRAY is always toastable */
 
        pfree(shadow_type);
 }
index 48cd8aa169ce44db8f69c60471b83144837a77c5..1aeb07a7a9ffda1f4620cf408e11dbce69526f6c 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/executor/execQual.c,v 1.74 2000/07/17 03:04:51 tgl Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/executor/execQual.c,v 1.75 2000/07/22 03:34:27 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -67,9 +67,15 @@ static Datum ExecMakeFunctionResult(Node *node, List *arguments,
 /*
  *       ExecEvalArrayRef
  *
- *        This function takes an ArrayRef and returns a Const Node if it
- *        is an array reference or returns the changed Array Node if it is
- *                an array assignment.
+ *        This function takes an ArrayRef and returns the extracted Datum
+ *        if it's a simple reference, or the modified array value if it's
+ *        an array assignment (read array element insertion).
+ *
+ * NOTE: we deliberately refrain from applying DatumGetArrayTypeP() here,
+ * even though that might seem natural, because this code needs to support
+ * both varlena arrays and fixed-length array types.  DatumGetArrayTypeP()
+ * only works for the varlena kind.  The routines we call in arrayfuncs.c
+ * have to know the difference (that's what they need refattrlength for).
  */
 static Datum
 ExecEvalArrayRef(ArrayRef *arrayRef,
@@ -77,7 +83,8 @@ ExecEvalArrayRef(ArrayRef *arrayRef,
                                 bool *isNull,
                                 bool *isDone)
 {
-       ArrayType  *array_scanner;
+       ArrayType  *array_source;
+       ArrayType  *resultArray;
        List       *elt;
        int                     i = 0,
                                j = 0;
@@ -90,7 +97,7 @@ ExecEvalArrayRef(ArrayRef *arrayRef,
 
        if (arrayRef->refexpr != NULL)
        {
-               array_scanner = (ArrayType *)
+               array_source = (ArrayType *)
                        DatumGetPointer(ExecEvalExpr(arrayRef->refexpr,
                                                                                 econtext,
                                                                                 isNull,
@@ -110,7 +117,7 @@ ExecEvalArrayRef(ArrayRef *arrayRef,
                 * the INSERT column list. This is a kluge, but it's not real
                 * clear what the semantics ought to be...
                 */
-               array_scanner = NULL;
+               array_source = NULL;
        }
 
        foreach(elt, arrayRef->refupperindexpr)
@@ -162,43 +169,45 @@ ExecEvalArrayRef(ArrayRef *arrayRef,
                if (*isNull)
                        return (Datum) NULL;
 
-               if (array_scanner == NULL)
+               if (array_source == NULL)
                        return sourceData;      /* XXX do something else? */
 
-               /*
-                * XXX shouldn't we copy the array value before modifying it??
-                *
-                * Or perhaps these array routines should deliver a modified copy
-                * instead of changing the source in-place.
-                */
                if (lIndex == NULL)
-                       return PointerGetDatum(array_set(array_scanner, i,
-                                                                                        upper.indx,
-                                                                                        sourceData,
-                                                                                        arrayRef->refelembyval,
-                                                                                        arrayRef->refelemlength,
-                                                                                        arrayRef->refattrlength,
-                                                                                        isNull));
-               return PointerGetDatum(array_assgn(array_scanner, i,
-                                                                                  upper.indx, lower.indx,
-                                                                                  (ArrayType *) DatumGetPointer(sourceData),
-                                                                                  arrayRef->refelembyval,
-                                                                                  arrayRef->refelemlength,
-                                                                                  isNull));
+                       resultArray = array_set(array_source, i,
+                                                                       upper.indx,
+                                                                       sourceData,
+                                                                       arrayRef->refelembyval,
+                                                                       arrayRef->refelemlength,
+                                                                       arrayRef->refattrlength,
+                                                                       isNull);
+               else
+                       resultArray = array_set_slice(array_source, i,
+                                                                                 upper.indx, lower.indx,
+                                                                                 (ArrayType *) DatumGetPointer(sourceData),
+                                                                                 arrayRef->refelembyval,
+                                                                                 arrayRef->refelemlength,
+                                                                                 arrayRef->refattrlength,
+                                                                                 isNull);
+               return PointerGetDatum(resultArray);
        }
 
        if (lIndex == NULL)
-               return array_ref(array_scanner, i,
+               return array_ref(array_source, i,
                                                 upper.indx,
                                                 arrayRef->refelembyval,
                                                 arrayRef->refelemlength,
                                                 arrayRef->refattrlength,
                                                 isNull);
-       return PointerGetDatum(array_clip(array_scanner, i,
+       else
+       {
+               resultArray = array_get_slice(array_source, i,
                                                                          upper.indx, lower.indx,
                                                                          arrayRef->refelembyval,
                                                                          arrayRef->refelemlength,
-                                                                         isNull));
+                                                                         arrayRef->refattrlength,
+                                                                         isNull);
+               return PointerGetDatum(resultArray);
+       }
 }
 
 
index ba39ab03003e29d16604c310f1da6d1ef5c0e81c..0f42d93aadbde2c50896d5c5abb0fd8729ee21a0 100644 (file)
@@ -1,7 +1,7 @@
 #
 # Makefile for utils/adt
 #
-# $Header: /cvsroot/pgsql/src/backend/utils/adt/Makefile,v 1.39 2000/07/13 16:07:14 petere Exp $
+# $Header: /cvsroot/pgsql/src/backend/utils/adt/Makefile,v 1.40 2000/07/22 03:34:43 tgl Exp $
 #
 
 subdir = src/backend/utils/adt
@@ -15,7 +15,7 @@ CFLAGS+= -mieee
 endif
 endif
 
-OBJS = acl.o arrayfuncs.o arrayutils.o bool.o cash.o char.o chunk.o \
+OBJS = acl.o arrayfuncs.o arrayutils.o bool.o cash.o char.o \
        date.o datetime.o datum.o filename.o float.o format_type.o \
        geo_ops.o geo_selfuncs.o int.o int8.o like.o lztext.o \
        misc.o nabstime.o name.o not_in.o numeric.o numutils.o \
index 907082a7268ba2b85682ca17e0183c1f80b8326d..b4bc71f349adc7883c39025ebd8556ddbf912bd3 100644 (file)
@@ -1,14 +1,14 @@
 /*-------------------------------------------------------------------------
  *
  * arrayfuncs.c
- *       Special functions for arrays.
+ *       Support functions for arrays.
  *
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/utils/adt/arrayfuncs.c,v 1.61 2000/07/17 03:05:17 tgl Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/utils/adt/arrayfuncs.c,v 1.62 2000/07/22 03:34:43 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 
 #include "catalog/catalog.h"
 #include "catalog/pg_type.h"
-#include "libpq/be-fsstubs.h"
-#include "libpq/libpq-fs.h"
-#include "storage/fd.h"
 #include "utils/array.h"
 #include "utils/memutils.h"
 #include "utils/syscache.h"
 
 #define ASSGN   "="
 
+#define RETURN_NULL(type)  do { *isNull = true; return (type) 0; } while (0)
+
+
 /*
  * An array has the following internal structure:
  *       <nbytes>              - total number of bytes
  *       <dim>                 - size of each array axis
  *       <dim_lower>   - lower boundary of each dimension
  *       <actual data> - whatever is the stored data
+ * The actual data starts on a MAXALIGN boundary.
  */
 
-static int     _ArrayCount(char *str, int *dim, int typdelim);
+static int     ArrayCount(char *str, int *dim, int typdelim);
 static Datum *ReadArrayStr(char *arrayStr, int nitems, int ndim, int *dim,
                          FmgrInfo *inputproc, Oid typelem, int32 typmod,
                          char typdelim, int typlen, bool typbyval,
                          char typalign, int *nbytes);
-#ifdef LOARRAY
-static char *_ReadLOArray(char *str, int *nbytes, int *fd, bool *chunkFlag,
-                        int ndim, int *dim, int baseSize);
-#endif
 static void CopyArrayEls(char *p, Datum *values, int nitems,
                                                 bool typbyval, int typlen, char typalign,
                                                 bool freedata);
 static void system_cache_lookup(Oid element_type, bool input, int *typlen,
                                 bool *typbyval, char *typdelim, Oid *typelem, Oid *proc,
                                        char *typalign);
-static Datum _ArrayCast(char *value, bool byval, int len);
-
-#ifdef LOARRAY
-static char *_AdvanceBy1word(char *str, char **word);
-
-#endif
-static void _ArrayRange(int *st, int *endp, int bsize, char *destPtr,
-                       ArrayType *array, int from);
-static int     _ArrayClipCount(int *stI, int *endpI, ArrayType *array);
-static void _LOArrayRange(int *st, int *endp, int bsize, int srcfd,
-                         int destfd, ArrayType *array, int isSrcLO, bool *isNull);
-static void _ReadArray(int *st, int *endp, int bsize, int srcfd, int destfd,
-                  ArrayType *array, int isDestLO, bool *isNull);
+static Datum ArrayCast(char *value, bool byval, int len);
+static void ArrayClipCopy(int *st, int *endp, int bsize, char *destPtr,
+                                                 ArrayType *array, bool from);
+static int     ArrayClipCount(int *st, int *endp, ArrayType *array);
 static int     ArrayCastAndSet(Datum src, bool typbyval, int typlen, char *dest);
-static int     SanityCheckInput(int ndim, int n, int *dim, int *lb, int *indx);
+static bool SanityCheckInput(int ndim, int n, int *dim, int *lb, int *indx);
 static int     array_read(char *destptr, int eltsize, int nitems, char *srcptr);
 static char *array_seek(char *ptr, int eltsize, int nitems);
 
+
 /*---------------------------------------------------------------------
  * array_in :
  *               converts an array from the external format in "string" to
@@ -153,8 +142,9 @@ array_in(PG_FUNCTION_ARGS)
        {
                if (*p == '{')
                {
-                       ndim = _ArrayCount(p, dim, typdelim);
-                       for (i = 0; i < ndim; lBound[i++] = 1);
+                       ndim = ArrayCount(p, dim, typdelim);
+                       for (i = 0; i < ndim; i++)
+                               lBound[i] = 1;
                }
                else
                        elog(ERROR, "array_in: Need to specify dimension");
@@ -179,71 +169,46 @@ array_in(PG_FUNCTION_ARGS)
        printf(") for %s\n", string);
 #endif
 
-       nitems = getNitems(ndim, dim);
+       nitems = ArrayGetNItems(ndim, dim);
        if (nitems == 0)
        {
                retval = (ArrayType *) palloc(sizeof(ArrayType));
                MemSet(retval, 0, sizeof(ArrayType));
-               *(int32 *) retval = sizeof(ArrayType);
+               retval->size = sizeof(ArrayType);
                PG_RETURN_ARRAYTYPE_P(retval);
        }
 
-       if (*p == '{')
-       {
-               /* array not a large object */
-               dataPtr = ReadArrayStr(p, nitems, ndim, dim, &inputproc, typelem,
-                                                          typmod, typdelim, typlen, typbyval, typalign,
-                                                          &nbytes);
-               nbytes += ARR_OVERHEAD(ndim);
-               retval = (ArrayType *) palloc(nbytes);
-               MemSet(retval, 0, nbytes);
-               retval->size = nbytes;
-               retval->ndim = ndim;
-               SET_LO_FLAG(false, retval);
-               memcpy((char *) ARR_DIMS(retval), (char *) dim,
-                          ndim * sizeof(int));
-               memcpy((char *) ARR_LBOUND(retval), (char *) lBound,
-                          ndim * sizeof(int));
-
-               CopyArrayEls(ARR_DATA_PTR(retval), dataPtr, nitems,
-                                        typbyval, typlen, typalign, true);
-               pfree(dataPtr);
-       }
-       else
-       {
-#ifdef LOARRAY
-               int                     dummy,
-                                       bytes;
-               bool            chunked = false;
-
-               dataPtr = _ReadLOArray(p, &bytes, &dummy, &chunked, ndim,
-                                                          dim, typlen);
-               nbytes = bytes + ARR_OVERHEAD(ndim);
-               retval = (ArrayType *) palloc(nbytes);
-               MemSet(retval, 0, nbytes);
-               retval->size = nbytes;
-               retval->ndim = ndim;
-               SET_LO_FLAG(true, retval);
-               SET_CHUNK_FLAG(chunked, retval);
-               memmove((char *) ARR_DIMS(retval), (char *) dim, ndim * sizeof(int));
-               memmove((char *) ARR_LBOUND(retval), (char *) lBound, ndim * sizeof(int));
-               memmove(ARR_DATA_PTR(retval), dataPtr, bytes);
-#endif
-               elog(ERROR, "large object arrays not supported");
-               PG_RETURN_NULL();
-       }
+       if (*p != '{')
+               elog(ERROR, "array_in: missing left brace");
+
+       dataPtr = ReadArrayStr(p, nitems, ndim, dim, &inputproc, typelem,
+                                                  typmod, typdelim, typlen, typbyval, typalign,
+                                                  &nbytes);
+       nbytes += ARR_OVERHEAD(ndim);
+       retval = (ArrayType *) palloc(nbytes);
+       MemSet(retval, 0, nbytes);
+       retval->size = nbytes;
+       retval->ndim = ndim;
+       memcpy((char *) ARR_DIMS(retval), (char *) dim,
+                  ndim * sizeof(int));
+       memcpy((char *) ARR_LBOUND(retval), (char *) lBound,
+                  ndim * sizeof(int));
+
+       CopyArrayEls(ARR_DATA_PTR(retval), dataPtr, nitems,
+                                typbyval, typlen, typalign, true);
+       pfree(dataPtr);
        pfree(string_save);
        PG_RETURN_ARRAYTYPE_P(retval);
 }
 
 /*-----------------------------------------------------------------------------
- * _ArrayCount
+ * ArrayCount
  *      Counts the number of dimensions and the *dim array for an array string.
  *              The syntax for array input is C-like nested curly braces
  *-----------------------------------------------------------------------------
  */
 static int
-_ArrayCount(char *str, int *dim, int typdelim)
+ArrayCount(char *str, int *dim, int typdelim)
 {
        int                     nest_level = 0,
                                i;
@@ -411,7 +376,7 @@ ReadArrayStr(char *arrayStr,
                                        if (!scanning_string)
                                        {
                                                if (i == -1)
-                                                       i = tuple2linear(ndim, indx, prod);
+                                                       i = ArrayGetOffset0(ndim, indx, prod);
                                                nest_level--;
                                                if (nest_level == 0)
                                                        eoArray = done = true;
@@ -426,7 +391,7 @@ ReadArrayStr(char *arrayStr,
                                        if (*q == typdelim && !scanning_string)
                                        {
                                                if (i == -1)
-                                                       i = tuple2linear(ndim, indx, prod);
+                                                       i = ArrayGetOffset0(ndim, indx, prod);
                                                done = true;
                                                indx[ndim - 1]++;
                                        }
@@ -491,79 +456,6 @@ ReadArrayStr(char *arrayStr,
 }
 
 
-/*----------------------------------------------------------------------------
- * Read data about an array to be stored as a large object
- *----------------------------------------------------------------------------
- */
-#ifdef LOARRAY
-static char *
-_ReadLOArray(char *str,
-                        int *nbytes,
-                        int *fd,
-                        bool *chunkFlag,
-                        int ndim,
-                        int *dim,
-                        int baseSize)
-{
-       char       *inputfile,
-                          *accessfile = NULL,
-                          *chunkfile = NULL;
-       char       *retStr,
-                          *_AdvanceBy1word();
-       Oid                     lobjId;
-
-       str = _AdvanceBy1word(str, &inputfile);
-
-       while (str != NULL)
-       {
-               char       *word;
-
-               str = _AdvanceBy1word(str, &word);
-
-               if (!strcmp(word, "-chunk"))
-               {
-                       if (str == NULL)
-                               elog(ERROR, "array_in: access pattern file required");
-                       str = _AdvanceBy1word(str, &accessfile);
-               }
-               else if (!strcmp(word, "-noreorg"))
-               {
-                       if (str == NULL)
-                               elog(ERROR, "array_in: chunk file required");
-                       str = _AdvanceBy1word(str, &chunkfile);
-               }
-               else
-                       elog(ERROR, "usage: <input file> -chunk DEFAULT/<access pattern file> -invert/-native [-noreorg <chunk file>]");
-       }
-
-       if (inputfile == NULL)
-               elog(ERROR, "array_in: missing file name");
-       lobjId = DatumGetObjectId(DirectFunctionCall1(lo_creat,
-                                                                                                 Int32GetDatum(0)));
-       *fd = DatumGetInt32(DirectFunctionCall2(lo_open,
-                                                                                       ObjectIdGetDatum(lobjId),
-                                                                                       Int32GetDatum(INV_READ)));
-       if (*fd < 0)
-               elog(ERROR, "Large object create failed");
-       retStr = inputfile;
-       *nbytes = strlen(retStr) + 2;
-
-       if (accessfile)
-       {
-               FILE       *afd;
-
-               if ((afd = AllocateFile(accessfile, PG_BINARY_R)) == NULL)
-                       elog(ERROR, "unable to open access pattern file");
-               *chunkFlag = true;
-               retStr = _ChunkArray(*fd, afd, ndim, dim, baseSize, nbytes,
-                                                        chunkfile);
-               FreeFile(afd);
-       }
-       return retStr;
-}
-
-#endif
-
 /*----------
  * Copy data into an array object from a temporary array of Datums.
  *
@@ -634,34 +526,13 @@ array_out(PG_FUNCTION_ARGS)
        int                     ndim,
                           *dim;
 
-       if (ARR_IS_LO(v) == true)
-       {
-               text       *p;
-               int                     plen,
-                                       nbytes;
-
-               p = (text *) DatumGetPointer(DirectFunctionCall1(array_dims,
-                                                                                                                PointerGetDatum(v)));
-               plen = VARSIZE(p) - VARHDRSZ;
-
-               /* get a wide string to print to */
-               nbytes = strlen(ARR_DATA_PTR(v)) + strlen(ASSGN) + plen + 1;
-               retval = (char *) palloc(nbytes);
-
-               memcpy(retval, VARDATA(p), plen);
-               strcpy(retval + plen, ASSGN);
-               strcat(retval, ARR_DATA_PTR(v));
-               pfree(p);
-               PG_RETURN_CSTRING(retval);
-       }
-
        system_cache_lookup(element_type, false, &typlen, &typbyval,
                                                &typdelim, &typelem, &typoutput, &typalign);
        fmgr_info(typoutput, &outputproc);
        sprintf(delim, "%c", typdelim);
        ndim = ARR_NDIM(v);
        dim = ARR_DIMS(v);
-       nitems = getNitems(ndim, dim);
+       nitems = ArrayGetNItems(ndim, dim);
 
        if (nitems == 0)
        {
@@ -814,7 +685,6 @@ array_dims(PG_FUNCTION_ARGS)
         */
 
        result = (text *) palloc(nbytes + VARHDRSZ);
-       MemSet(result, 0, nbytes + VARHDRSZ);
        p = VARDATA(result);
 
        dimv = ARR_DIMS(v);
@@ -846,143 +716,87 @@ array_ref(ArrayType *array,
                  int arraylen,
                  bool *isNull)
 {
-       int                     i,
-                               ndim,
+       int                     ndim,
                           *dim,
                           *lb,
-                               offset,
-                               nbytes;
-       struct varlena *v = NULL;
-       Datum           result;
-       char       *retval;
+                               offset;
+       char       *retptr;
 
        if (array == (ArrayType *) NULL)
                RETURN_NULL(Datum);
+
        if (arraylen > 0)
        {
-
                /*
-                * fixed length arrays -- these are assumed to be 1-d
+                * fixed-length arrays -- these are assumed to be 1-d, 0-based
                 */
-               if (indx[0] * elmlen > arraylen)
-                       elog(ERROR, "array_ref: array bound exceeded");
-               retval = (char *) array + indx[0] * elmlen;
-               return _ArrayCast(retval, elmbyval, elmlen);
+               if (nSubscripts != 1)
+                       RETURN_NULL(Datum);
+               if (indx[0] < 0 || indx[0] * elmlen >= arraylen)
+                       RETURN_NULL(Datum);
+               retptr = (char *) array + indx[0] * elmlen;
+               return ArrayCast(retptr, elmbyval, elmlen);
        }
+
+       /* detoast input if necessary */
+       array = DatumGetArrayTypeP(PointerGetDatum(array));
+
+       ndim = ARR_NDIM(array);
        dim = ARR_DIMS(array);
        lb = ARR_LBOUND(array);
-       ndim = ARR_NDIM(array);
-       nbytes = (*(int32 *) array) - ARR_OVERHEAD(ndim);
 
        if (!SanityCheckInput(ndim, nSubscripts, dim, lb, indx))
                RETURN_NULL(Datum);
 
-       offset = GetOffset(nSubscripts, dim, lb, indx);
-
-       if (ARR_IS_LO(array))
-       {
-               char       *lo_name;
-               int                     fd = 0;
-
-               /* We are assuming fixed element lengths here */
-               offset *= elmlen;
-               lo_name = (char *) ARR_DATA_PTR(array);
-#ifdef LOARRAY
-               if ((fd = LOopen(lo_name, ARR_IS_INV(array) ? INV_READ : O_RDONLY)) < 0)
-                       RETURN_NULL(Datum);
-#endif
-               if (ARR_IS_CHUNKED(array))
-                       v = _ReadChunkArray1El(indx, elmlen, fd, array, isNull);
-               else
-               {
-                       if (DatumGetInt32(DirectFunctionCall3(lo_lseek,
-                                                         Int32GetDatum(fd),
-                                                         Int32GetDatum(offset),
-                                                         Int32GetDatum(SEEK_SET))) < 0)
-                               RETURN_NULL(Datum);
-#ifdef LOARRAY
-                       v = (struct varlena *)
-                               DatumGetPointer(DirectFunctionCall2(loread,
-                                                                                                       Int32GetDatum(fd),
-                                                                                                       Int32GetDatum(elmlen)));
-#endif
-               }
-               if (*isNull)
-                       RETURN_NULL(Datum);
-               if (VARSIZE(v) - VARHDRSZ < elmlen)
-                       RETURN_NULL(Datum);
-               DirectFunctionCall1(lo_close, Int32GetDatum(fd));
-               result = _ArrayCast((char *) VARDATA(v), elmbyval, elmlen);
-               if (! elmbyval)
-               {                                               /* not by value */
-                       char       *tempdata = palloc(elmlen);
-
-                       memcpy(tempdata, DatumGetPointer(result), elmlen);
-                       result = PointerGetDatum(tempdata);
-               }
-               pfree(v);
-               return result;
-       }
+       offset = ArrayGetOffset(nSubscripts, dim, lb, indx);
 
-       if (elmlen > 0)
-       {
-               offset = offset * elmlen;
-               /* off the end of the array */
-               if (nbytes - offset < 1)
-                       RETURN_NULL(Datum);
-               retval = ARR_DATA_PTR(array) + offset;
-               return _ArrayCast(retval, elmbyval, elmlen);
-       }
-       else
-       {
-               int                     bytes = nbytes;
+       retptr = array_seek(ARR_DATA_PTR(array), elmlen, offset);
 
-               retval = ARR_DATA_PTR(array);
-               i = 0;
-               while (bytes > 0)
-               {
-                       if (i == offset)
-                               return PointerGetDatum(retval);
-                       bytes -= INTALIGN(*(int32 *) retval);
-                       retval += INTALIGN(*(int32 *) retval);
-                       i++;
-               }
-               RETURN_NULL(Datum);
-       }
+       return ArrayCast(retptr, elmbyval, elmlen);
 }
 
 /*-----------------------------------------------------------------------------
- * array_clip :
- *               This routine takes an array and a range of indices (upperIndex and
+ * array_get_slice :
+ *                This routine takes an array and a range of indices (upperIndex and
  *                lowerIndx), creates a new array structure for the referred elements
  *                and returns a pointer to it.
  *-----------------------------------------------------------------------------
  */
 ArrayType *
-array_clip(ArrayType *array,
-                  int nSubscripts,
-                  int *upperIndx,
-                  int *lowerIndx,
-                  bool elmbyval,
-                  int elmlen,
-                  bool *isNull)
+array_get_slice(ArrayType *array,
+                               int nSubscripts,
+                               int *upperIndx,
+                               int *lowerIndx,
+                               bool elmbyval,
+                               int elmlen,
+                               int arraylen,
+                               bool *isNull)
 {
        int                     i,
                                ndim,
                           *dim,
-                          *lb,
-                               nbytes;
+                          *lb;
        ArrayType  *newArr;
        int                     bytes,
                                span[MAXDIM];
 
-       /* timer_start(); */
        if (array == (ArrayType *) NULL)
                RETURN_NULL(ArrayType *);
+
+       if (arraylen > 0)
+       {
+               /*
+                * fixed-length arrays -- no can do slice...
+                */
+               elog(ERROR, "Slices of fixed-length arrays not implemented");
+       }
+
+       /* detoast input if necessary */
+       array = DatumGetArrayTypeP(PointerGetDatum(array));
+
+       ndim = ARR_NDIM(array);
        dim = ARR_DIMS(array);
        lb = ARR_LBOUND(array);
-       ndim = ARR_NDIM(array);
-       nbytes = (*(int32 *) array) - ARR_OVERHEAD(ndim);
 
        if (!SanityCheckInput(ndim, nSubscripts, dim, lb, upperIndx) ||
                !SanityCheckInput(ndim, nSubscripts, dim, lb, lowerIndx))
@@ -990,116 +804,40 @@ array_clip(ArrayType *array,
 
        for (i = 0; i < nSubscripts; i++)
                if (lowerIndx[i] > upperIndx[i])
-                       elog(ERROR, "lowerIndex cannot be larger than upperIndx");
-       mda_get_range(nSubscripts, span, lowerIndx, upperIndx);
-
-       if (ARR_IS_LO(array))
-       {
-#ifdef LOARRAY
-               char       *lo_name;
-
-#endif
-               char       *newname = NULL;
-               int                     fd = 0,
-                                       newfd = 0,
-                                       isDestLO = true,
-                                       rsize;
-
-               if (elmlen < 0)
-                       elog(ERROR, "array_clip: array of variable length objects not implemented");
-#ifdef LOARRAY
-               lo_name = (char *) ARR_DATA_PTR(array);
-               if ((fd = LOopen(lo_name, ARR_IS_INV(array) ? INV_READ : O_RDONLY)) < 0)
                        RETURN_NULL(ArrayType *);
-               newname = _array_newLO(&newfd, Unix);
-#endif
-               bytes = strlen(newname) + 1 + ARR_OVERHEAD(nSubscripts);
-               newArr = (ArrayType *) palloc(bytes);
-               newArr->size = bytes;
-               newArr->ndim = array->ndim;
-               newArr->flags = array->flags;
-               memcpy(ARR_DIMS(newArr), span, nSubscripts * sizeof(int));
-               memcpy(ARR_LBOUND(newArr), lowerIndx, nSubscripts * sizeof(int));
-               strcpy(ARR_DATA_PTR(newArr), newname);
-
-               rsize = compute_size(lowerIndx, upperIndx, nSubscripts, elmlen);
-               if (rsize < BLCKSZ)
-               {
-                       char       *buff;
 
-                       rsize += VARHDRSZ;
-                       buff = palloc(rsize);
-                       if (buff)
-                               isDestLO = false;
-                       if (ARR_IS_CHUNKED(array))
-                       {
-                               _ReadChunkArray(lowerIndx, upperIndx, elmlen, fd, &(buff[VARHDRSZ]),
-                                                               array, 0, isNull);
-                       }
-                       else
-                       {
-                               _ReadArray(lowerIndx, upperIndx, elmlen, fd, (int) &(buff[VARHDRSZ]),
-                                                  array,
-                                                  0, isNull);
-                       }
-                       memmove(buff, &rsize, VARHDRSZ);
-#ifdef LOARRAY
-                       if (!*isNull)
-                               bytes = DatumGetInt32(DirectFunctionCall2(lowrite,
-                                                                         Int32GetDatum(newfd),
-                                                                         PointerGetDatum(buff)));
-#endif
-                       pfree(buff);
-               }
-               if (isDestLO)
-               {
-                       if (ARR_IS_CHUNKED(array))
-                       {
-                               _ReadChunkArray(lowerIndx, upperIndx, elmlen, fd, (char *) newfd, array,
-                                                               1, isNull);
-                       }
-                       else
-                               _ReadArray(lowerIndx, upperIndx, elmlen, fd, newfd, array, 1, isNull);
-               }
-#ifdef LOARRAY
-               LOclose(fd);
-               LOclose(newfd);
-#endif
-               if (*isNull)
-               {
-                       pfree(newArr);
-                       newArr = NULL;
-               }
-               /* timer_end(); */
-               return newArr;
-       }
+       mda_get_range(nSubscripts, span, lowerIndx, upperIndx);
 
        if (elmlen > 0)
-       {
-               bytes = getNitems(nSubscripts, span);
-               bytes = bytes * elmlen + ARR_OVERHEAD(nSubscripts);
-       }
+               bytes = ArrayGetNItems(nSubscripts, span) * elmlen;
        else
-       {
-               bytes = _ArrayClipCount(lowerIndx, upperIndx, array);
-               bytes += ARR_OVERHEAD(nSubscripts);
-       }
+               bytes = ArrayClipCount(lowerIndx, upperIndx, array);
+       bytes += ARR_OVERHEAD(nSubscripts);
+
        newArr = (ArrayType *) palloc(bytes);
        newArr->size = bytes;
        newArr->ndim = array->ndim;
        newArr->flags = array->flags;
        memcpy(ARR_DIMS(newArr), span, nSubscripts * sizeof(int));
        memcpy(ARR_LBOUND(newArr), lowerIndx, nSubscripts * sizeof(int));
-       _ArrayRange(lowerIndx, upperIndx, elmlen, ARR_DATA_PTR(newArr), array, 1);
+       ArrayClipCopy(lowerIndx, upperIndx, elmlen, ARR_DATA_PTR(newArr),
+                                 array, true);
+
        return newArr;
 }
 
 /*-----------------------------------------------------------------------------
- * array_set  :
+ * array_set :
  *               This routine sets the value of an array location (specified by
  *               an index array) to a new value specified by "dataValue".
  * result :
- *               returns a pointer to the modified array.
+ *               A new array is returned, just like the old except for the one
+ *               modified entry.
+ *
+ * NOTE: For assignments, we throw an error for silly subscripts etc,
+ * rather than returning a NULL as the fetch operations do.  The reasoning
+ * is that returning a NULL would cause the user's whole array to be replaced
+ * with NULL, which will probably not make him happy.
  *-----------------------------------------------------------------------------
  */
 ArrayType *
@@ -1115,194 +853,154 @@ array_set(ArrayType *array,
        int                     ndim,
                           *dim,
                           *lb,
-                               offset,
-                               nbytes;
-       char       *pos;
+                               offset;
+       ArrayType  *newarray;
+       char       *elt_ptr;
+       int                     oldsize,
+                               newsize,
+                               oldlen,
+                               newlen,
+                               lth0,
+                               lth1,
+                               lth2;
 
        if (array == (ArrayType *) NULL)
                RETURN_NULL(ArrayType *);
+
        if (arraylen > 0)
        {
-
                /*
-                * fixed length arrays -- these are assumed to be 1-d
+                * fixed-length arrays -- these are assumed to be 1-d, 0-based
                 */
-               if (indx[0] * elmlen > arraylen)
-                       elog(ERROR, "array_ref: array bound exceeded");
-               pos = (char *) array + indx[0] * elmlen;
-               ArrayCastAndSet(dataValue, elmbyval, elmlen, pos);
-               return array;
+               if (nSubscripts != 1)
+                       elog(ERROR, "Invalid array subscripts");
+               if (indx[0] < 0 || indx[0] * elmlen >= arraylen)
+                       elog(ERROR, "Invalid array subscripts");
+               newarray = (ArrayType *) palloc(arraylen);
+               memcpy(newarray, array, arraylen);
+               elt_ptr = (char *) newarray + indx[0] * elmlen;
+               ArrayCastAndSet(dataValue, elmbyval, elmlen, elt_ptr);
+               return newarray;
        }
+
+       /* detoast input if necessary */
+       array = DatumGetArrayTypeP(PointerGetDatum(array));
+
+       ndim = ARR_NDIM(array);
        dim = ARR_DIMS(array);
        lb = ARR_LBOUND(array);
-       ndim = ARR_NDIM(array);
-       nbytes = (*(int32 *) array) - ARR_OVERHEAD(ndim);
 
        if (!SanityCheckInput(ndim, nSubscripts, dim, lb, indx))
-       {
-               elog(ERROR, "array_set: array bound exceeded");
-               return array;
-       }
-       offset = GetOffset(nSubscripts, dim, lb, indx);
+               elog(ERROR, "Invalid array subscripts");
 
-       if (ARR_IS_LO(array))
-       {
-               int                     fd = 0;
-               struct varlena *v;
+       offset = ArrayGetOffset(nSubscripts, dim, lb, indx);
 
-               /* We are assuming fixed element lengths here */
-               offset *= elmlen;
-#ifdef LOARRAY
-               char       *lo_name;
+       elt_ptr = array_seek(ARR_DATA_PTR(array), elmlen, offset);
 
-               lo_name = ARR_DATA_PTR(array);
-               if ((fd = LOopen(lo_name, ARR_IS_INV(array) ? INV_WRITE : O_WRONLY)) < 0)
-                       return array;
-#endif
-               if (DatumGetInt32(DirectFunctionCall3(lo_lseek,
-                                                         Int32GetDatum(fd),
-                                                         Int32GetDatum(offset),
-                                                         Int32GetDatum(SEEK_SET))) < 0)
-                       return array;
-               v = (struct varlena *) palloc(elmlen + VARHDRSZ);
-               VARATT_SIZEP(v) = elmlen + VARHDRSZ;
-               ArrayCastAndSet(dataValue, elmbyval, elmlen, VARDATA(v));
-#ifdef LOARRAY
-               if (DatumGetInt32(DirectFunctionCall2(lowrite,
-                                                                                         Int32GetDatum(fd),
-                                                                                         PointerGetDatum(v)))
-                       != elmlen)
-                       RETURN_NULL(ArrayType *);
-#endif
-               pfree(v);
-               DirectFunctionCall1(lo_close, Int32GetDatum(fd));
-               return array;
-       }
        if (elmlen > 0)
        {
-               offset = offset * elmlen;
-               /* off the end of the array */
-               if (nbytes - offset < 1)
-                       return array;
-               pos = ARR_DATA_PTR(array) + offset;
+               oldlen = newlen = elmlen;
        }
        else
        {
-               ArrayType  *newarray;
-               char       *elt_ptr;
-               int                     oldsize,
-                                       newsize,
-                                       oldlen,
-                                       newlen,
-                                       lth0,
-                                       lth1,
-                                       lth2;
-
-               elt_ptr = array_seek(ARR_DATA_PTR(array), -1, offset);
+               /* varlena type */
                oldlen = INTALIGN(*(int32 *) elt_ptr);
                newlen = INTALIGN(*(int32 *) DatumGetPointer(dataValue));
-
-               if (oldlen == newlen)
-               {
-                       /* new element with same size, overwrite old data */
-                       ArrayCastAndSet(dataValue, elmbyval, elmlen, elt_ptr);
-                       return array;
-               }
-
-               /* new element with different size, reallocate the array */
-               oldsize = array->size;
-               lth0 = ARR_OVERHEAD(nSubscripts);
-               lth1 = (int) (elt_ptr - ARR_DATA_PTR(array));
-               lth2 = (int) (oldsize - lth0 - lth1 - oldlen);
-               newsize = lth0 + lth1 + newlen + lth2;
-
-               newarray = (ArrayType *) palloc(newsize);
-               memmove((char *) newarray, (char *) array, lth0 + lth1);
-               newarray->size = newsize;
-               newlen = ArrayCastAndSet(dataValue, elmbyval, elmlen,
-                                                                (char *) newarray + lth0 + lth1);
-               memmove((char *) newarray + lth0 + lth1 + newlen,
-                               (char *) array + lth0 + lth1 + oldlen, lth2);
-
-               /* ??? who should free this storage ??? */
-               return newarray;
        }
-       ArrayCastAndSet(dataValue, elmbyval, elmlen, pos);
-       return array;
+
+       oldsize = ARR_SIZE(array);
+       lth0 = ARR_OVERHEAD(ndim);
+       lth1 = (int) (elt_ptr - ARR_DATA_PTR(array));
+       lth2 = (int) (oldsize - lth0 - lth1 - oldlen);
+       newsize = lth0 + lth1 + newlen + lth2;
+
+       newarray = (ArrayType *) palloc(newsize);
+       memcpy((char *) newarray, (char *) array, lth0 + lth1);
+       memcpy((char *) newarray + lth0 + lth1 + newlen,
+                  (char *) array + lth0 + lth1 + oldlen, lth2);
+       newarray->size = newsize;
+       newlen = ArrayCastAndSet(dataValue, elmbyval, elmlen,
+                                                        (char *) newarray + lth0 + lth1);
+
+       return newarray;
 }
 
 /*----------------------------------------------------------------------------
- * array_assgn :
+ * array_set_slice :
  *               This routine sets the value of a range of array locations (specified
  *               by upper and lower index values ) to new values passed as
  *               another array
  * result :
- *               returns a pointer to the modified array.
+ *               A new array is returned, just like the old except for the
+ *               modified range.
+ *
+ * NOTE: For assignments, we throw an error for silly subscripts etc,
+ * rather than returning a NULL as the fetch operations do.  The reasoning
+ * is that returning a NULL would cause the user's whole array to be replaced
+ * with NULL, which will probably not make him happy.
  *----------------------------------------------------------------------------
  */
 ArrayType *
-array_assgn(ArrayType *array,
-                       int nSubscripts,
-                       int *upperIndx,
-                       int *lowerIndx,
-                       ArrayType *newArr,
-                       bool elmbyval,
-                       int elmlen,
-                       bool *isNull)
+array_set_slice(ArrayType *array,
+                               int nSubscripts,
+                               int *upperIndx,
+                               int *lowerIndx,
+                               ArrayType *srcArray,
+                               bool elmbyval,
+                               int elmlen,
+                               int arraylen,
+                               bool *isNull)
 {
        int                     i,
                                ndim,
                           *dim,
                           *lb;
+       int                     span[MAXDIM];
 
        if (array == (ArrayType *) NULL)
                RETURN_NULL(ArrayType *);
+       if (srcArray == (ArrayType *) NULL)
+               RETURN_NULL(ArrayType *);
+
+       if (arraylen > 0)
+       {
+               /*
+                * fixed-length arrays -- no can do slice...
+                */
+               elog(ERROR, "Updates on slices of fixed-length arrays not implemented");
+       }
+
+       /* detoast array, making sure we get an overwritable copy */
+       array = DatumGetArrayTypePCopy(PointerGetDatum(array));
+
+       /* detoast source array if necessary */
+       srcArray = DatumGetArrayTypeP(PointerGetDatum(srcArray));
+
        if (elmlen < 0)
-               elog(ERROR, "array_assgn: updates on arrays of variable length elements not implemented");
+               elog(ERROR, "Updates on slices of arrays of variable length elements not implemented");
 
+       ndim = ARR_NDIM(array);
        dim = ARR_DIMS(array);
        lb = ARR_LBOUND(array);
-       ndim = ARR_NDIM(array);
 
        if (!SanityCheckInput(ndim, nSubscripts, dim, lb, upperIndx) ||
                !SanityCheckInput(ndim, nSubscripts, dim, lb, lowerIndx))
-               RETURN_NULL(ArrayType *);
+               elog(ERROR, "Invalid array subscripts");
 
        for (i = 0; i < nSubscripts; i++)
                if (lowerIndx[i] > upperIndx[i])
-                       elog(ERROR, "lowerIndex larger than upperIndx");
+               elog(ERROR, "Invalid array subscripts");
 
-       if (ARR_IS_LO(array))
-       {
-               int                     fd = 0,
-                                       newfd = 0;
+       /* make sure source array has enough entries */
+       mda_get_range(ndim, span, lowerIndx, upperIndx);
 
-#ifdef LOARRAY
-               char       *lo_name;
+       if (ArrayGetNItems(ndim, span) >
+               ArrayGetNItems(ARR_NDIM(srcArray), ARR_DIMS(srcArray)))
+               elog(ERROR, "Source array too small");
+
+       ArrayClipCopy(lowerIndx, upperIndx, elmlen, ARR_DATA_PTR(srcArray),
+                                 array, false);
 
-               lo_name = (char *) ARR_DATA_PTR(array);
-               if ((fd = LOopen(lo_name, ARR_IS_INV(array) ? INV_WRITE : O_WRONLY)) < 0)
-                       return array;
-#endif
-               if (ARR_IS_LO(newArr))
-               {
-#ifdef LOARRAY
-                       lo_name = (char *) ARR_DATA_PTR(newArr);
-                       if ((newfd = LOopen(lo_name, ARR_IS_INV(newArr) ? INV_READ : O_RDONLY)) < 0)
-                               return array;
-#endif
-                       _LOArrayRange(lowerIndx, upperIndx, elmlen, fd, newfd, array, 1, isNull);
-                       DirectFunctionCall1(lo_close, Int32GetDatum(newfd));
-               }
-               else
-               {
-                       _LOArrayRange(lowerIndx, upperIndx, elmlen, fd, (int) ARR_DATA_PTR(newArr),
-                                                 array, 0, isNull);
-               }
-               DirectFunctionCall1(lo_close, Int32GetDatum(fd));
-               return array;
-       }
-       _ArrayRange(lowerIndx, upperIndx, elmlen, ARR_DATA_PTR(newArr), array, 0);
        return array;
 }
 
@@ -1337,7 +1035,7 @@ array_map(FunctionCallInfo fcinfo, Oid inpType, Oid retType)
        ArrayType  *v;
        ArrayType  *result;
        Datum      *values;
-       char       *elt;
+       Datum           elt;
        int                *dim;
        int                     ndim;
        int                     nitems;
@@ -1360,13 +1058,9 @@ array_map(FunctionCallInfo fcinfo, Oid inpType, Oid retType)
                elog(ERROR, "array_map: null input array");
        v = PG_GETARG_ARRAYTYPE_P(0);
 
-       /* Large objects not yet supported */
-       if (ARR_IS_LO(v) == true)
-               elog(ERROR, "array_map: large objects not supported");
-
        ndim = ARR_NDIM(v);
        dim = ARR_DIMS(v);
-       nitems = getNitems(ndim, dim);
+       nitems = ArrayGetNItems(ndim, dim);
 
        /* Check for empty array */
        if (nitems <= 0)
@@ -1380,7 +1074,6 @@ array_map(FunctionCallInfo fcinfo, Oid inpType, Oid retType)
 
        /* Allocate temporary array for new values */
        values = (Datum *) palloc(nitems * sizeof(Datum));
-       MemSet(values, 0, nitems * sizeof(Datum));
 
        /* Loop over source data */
        s = (char *) ARR_DATA_PTR(v);
@@ -1392,22 +1085,25 @@ array_map(FunctionCallInfo fcinfo, Oid inpType, Oid retType)
                        switch (inp_typlen)
                        {
                                case 1:
-                                       elt = (char *) ((int) (*(char *) s));
+                                       elt = CharGetDatum(*s);
                                        break;
                                case 2:
-                                       elt = (char *) ((int) (*(int16 *) s));
+                                       elt = Int16GetDatum(*(int16 *) s);
                                        break;
-                               case 3:
                                case 4:
+                                       elt = Int32GetDatum(*(int32 *) s);
+                                       break;
                                default:
-                                       elt = (char *) (*(int32 *) s);
+                                       elog(ERROR, "array_map: unsupported byval length %d",
+                                                inp_typlen);
+                                       elt = 0;        /* keep compiler quiet */
                                        break;
                        }
                        s += inp_typlen;
                }
                else
                {
-                       elt = s;
+                       elt = PointerGetDatum(s);
                        if (inp_typlen > 0)
                                s += inp_typlen;
                        else
@@ -1421,7 +1117,7 @@ array_map(FunctionCallInfo fcinfo, Oid inpType, Oid retType)
                 * whether fn() is strict.  Would need to do more work here
                 * to support arrays containing nulls, too.
                 */
-               fcinfo->arg[0] = (Datum) elt;
+               fcinfo->arg[0] = elt;
                fcinfo->argnull[0] = false;
                fcinfo->isnull = false;
                values[i] = FunctionCallInvoke(fcinfo);
@@ -1524,7 +1220,7 @@ deconstruct_array(ArrayType *array,
        char       *p;
        int                     i;
 
-       nelems = getNitems(ARR_NDIM(array), ARR_DIMS(array));
+       nelems = ArrayGetNItems(ARR_NDIM(array), ARR_DIMS(array));
        if (nelems <= 0)
        {
                *elemsp = NULL;
@@ -1570,6 +1266,8 @@ deconstruct_array(ArrayType *array,
  *               compares two arrays for equality
  * result :
  *               returns true if the arrays are equal, false otherwise.
+ *
+ * XXX bitwise equality is pretty bogus ...
  *-----------------------------------------------------------------------------
  */
 Datum
@@ -1577,17 +1275,25 @@ array_eq(PG_FUNCTION_ARGS)
 {
        ArrayType  *array1 = PG_GETARG_ARRAYTYPE_P(0);
        ArrayType  *array2 = PG_GETARG_ARRAYTYPE_P(1);
+       bool            result = true;
+
+       if (ARR_SIZE(array1) != ARR_SIZE(array2))
+               result = false;
+       else if (memcmp(array1, array2, ARR_SIZE(array1)) != 0)
+               result = false;
 
-       if (*(int32 *) array1 != *(int32 *) array2)
-               PG_RETURN_BOOL(false);
-       if (memcmp(array1, array2, *(int32 *) array1) != 0)
-               PG_RETURN_BOOL(false);
-       PG_RETURN_BOOL(true);
+       /* Avoid leaking memory when handed toasted input. */
+       PG_FREE_IF_COPY(array1, 0);
+       PG_FREE_IF_COPY(array2, 1);
+
+       PG_RETURN_BOOL(result);
 }
 
+
 /***************************************************************************/
 /******************|             Support  Routines                       |*****************/
 /***************************************************************************/
+
 static void
 system_cache_lookup(Oid element_type,
                                        bool input,
@@ -1620,28 +1326,26 @@ system_cache_lookup(Oid element_type,
                *proc = typeStruct->typoutput;
 }
 
+/* Fetch array value at pointer, converted correctly to a Datum */
 static Datum
-_ArrayCast(char *value, bool byval, int len)
+ArrayCast(char *value, bool byval, int len)
 {
-       if (byval)
+       if (! byval)
+               return PointerGetDatum(value);
+
+       switch (len)
        {
-               switch (len)
-               {
-                               case 1:
-                               return (Datum) *value;
-                       case 2:
-                               return (Datum) *(int16 *) value;
-                       case 3:
-                       case 4:
-                               return (Datum) *(int32 *) value;
-                       default:
-                               elog(ERROR, "array_ref: byval and elt len > 4!");
-                               break;
-               }
+               case 1:
+                       return CharGetDatum(*value);
+               case 2:
+                       return Int16GetDatum(*(int16 *) value);
+               case 4:
+                       return Int32GetDatum(*(int32 *) value);
+               default:
+                       elog(ERROR, "ArrayCast: unsupported byval length %d", len);
+                       break;
        }
-       else
-               return (Datum) value;
-       return 0;
+       return 0;                                       /* keep compiler quiet */
 }
 
 /*
@@ -1673,7 +1377,8 @@ ArrayCastAndSet(Datum src,
                                        *(int32 *) dest = DatumGetInt32(src);
                                        break;
                                default:
-                                       elog(ERROR, "ArrayCastAndSet: unexpected typlen");
+                                       elog(ERROR, "ArrayCastAndSet: unsupported byval length %d",
+                                                typlen);
                                        break;
                        }
                        /* For by-val types, assume no alignment padding is needed */
@@ -1693,55 +1398,32 @@ ArrayCastAndSet(Datum src,
                /* XXX WRONG: should use MAXALIGN or type's alignment requirement */
                inc = INTALIGN(VARSIZE(DatumGetPointer(src)));
        }
-       return inc;
-}
 
-#ifdef LOARRAY
-static char *
-_AdvanceBy1word(char *str, char **word)
-{
-       char       *retstr,
-                          *space;
-
-       *word = NULL;
-       if (str == NULL)
-               return str;
-       while (isspace(*str))
-               str++;
-       *word = str;
-       if ((space = (char *) strchr(str, ' ')) != (char *) NULL)
-       {
-               retstr = space + 1;
-               *space = '\0';
-       }
-       else
-               retstr = NULL;
-       return retstr;
+       return inc;
 }
 
-#endif
-
-static int
+/* Do Sanity check on input subscripting info */
+static bool
 SanityCheckInput(int ndim, int n, int *dim, int *lb, int *indx)
 {
        int                     i;
 
-       /* Do Sanity check on input */
-       if (n != ndim)
-               return 0;
+       if (n != ndim || ndim <= 0 || ndim > MAXDIM)
+               return false;
        for (i = 0; i < ndim; i++)
                if ((lb[i] > indx[i]) || (indx[i] >= (dim[i] + lb[i])))
-                       return 0;
-       return 1;
+                       return false;
+       return true;
 }
 
+/* Copy an array slice into or out of an array */
 static void
-_ArrayRange(int *st,
-                       int *endp,
-                       int bsize,
-                       char *destPtr,
-                       ArrayType *array,
-                       int from)
+ArrayClipCopy(int *st,
+                         int *endp,
+                         int bsize,
+                         char *destPtr,
+                         ArrayType *array,
+                         bool from)
 {
        int                     n,
                           *dim,
@@ -1759,16 +1441,14 @@ _ArrayRange(int *st,
        n = ARR_NDIM(array);
        dim = ARR_DIMS(array);
        lb = ARR_LBOUND(array);
-       srcPtr = ARR_DATA_PTR(array);
-       for (i = 0; i < n; st[i] -= lb[i], endp[i] -= lb[i], i++);
+       st_pos = ArrayGetOffset(n, dim, lb, st);
+       srcPtr = array_seek(ARR_DATA_PTR(array), bsize, st_pos);
        mda_get_prod(n, dim, prod);
-       st_pos = tuple2linear(n, st, prod);
-       srcPtr = array_seek(srcPtr, bsize, st_pos);
        mda_get_range(n, span, st, endp);
        mda_get_offset_values(n, dist, prod, span);
-       for (i = 0; i < n; indx[i++] = 0);
-       i = j = n - 1;
-       inc = bsize;
+       for (i = 0; i < n; i++)
+               indx[i] = 0;
+       j = n - 1;
        do
        {
                srcPtr = array_seek(srcPtr, bsize, dist[j]);
@@ -1778,11 +1458,12 @@ _ArrayRange(int *st,
                        inc = array_read(srcPtr, bsize, 1, destPtr);
                destPtr += inc;
                srcPtr += inc;
-       } while ((j = next_tuple(i + 1, indx, span)) != -1);
+       } while ((j = mda_next_tuple(n, indx, span)) != -1);
 }
 
+/* Compute space needed for an array slice of varlena items */
 static int
-_ArrayClipCount(int *stI, int *endpI, ArrayType *array)
+ArrayClipCount(int *st, int *endp, ArrayType *array)
 {
        int                     n,
                           *dim,
@@ -1794,34 +1475,32 @@ _ArrayClipCount(int *stI, int *endpI, ArrayType *array)
                                indx[MAXDIM];
        int                     i,
                                j,
-                               inc,
-                               st[MAXDIM],
-                               endp[MAXDIM];
+                               inc;
        int                     count = 0;
        char       *ptr;
 
        n = ARR_NDIM(array);
        dim = ARR_DIMS(array);
        lb = ARR_LBOUND(array);
-       ptr = ARR_DATA_PTR(array);
-       for (i = 0; i < n; st[i] = stI[i] - lb[i], endp[i] = endpI[i] - lb[i], i++);
+       st_pos = ArrayGetOffset(n, dim, lb, st);
+       ptr = array_seek(ARR_DATA_PTR(array), -1, st_pos);
        mda_get_prod(n, dim, prod);
-       st_pos = tuple2linear(n, st, prod);
-       ptr = array_seek(ptr, -1, st_pos);
        mda_get_range(n, span, st, endp);
        mda_get_offset_values(n, dist, prod, span);
-       for (i = 0; i < n; indx[i++] = 0);
-       i = j = n - 1;
+       for (i = 0; i < n; i++)
+               indx[i] = 0;
+       j = n - 1;
        do
        {
                ptr = array_seek(ptr, -1, dist[j]);
                inc = INTALIGN(*(int32 *) ptr);
                ptr += inc;
                count += inc;
-       } while ((j = next_tuple(i + 1, indx, span)) != -1);
+       } while ((j = mda_next_tuple(n, indx, span)) != -1);
        return count;
 }
 
+/* Advance over nitems array elements */
 static char *
 array_seek(char *ptr, int eltsize, int nitems)
 {
@@ -1834,6 +1513,7 @@ array_seek(char *ptr, int eltsize, int nitems)
        return ptr;
 }
 
+/* Copy nitems array elements from srcptr to destptr */
 static int
 array_read(char *destptr, int eltsize, int nitems, char *srcptr)
 {
@@ -1846,7 +1526,8 @@ array_read(char *destptr, int eltsize, int nitems, char *srcptr)
                memmove(destptr, srcptr, eltsize * nitems);
                return eltsize * nitems;
        }
-       for (i = inc = 0; i < nitems; i++)
+       inc = 0;
+       for (i = 0; i < nitems; i++)
        {
                tmp = (INTALIGN(*(int32 *) srcptr));
                memmove(destptr, srcptr, tmp);
@@ -1856,199 +1537,3 @@ array_read(char *destptr, int eltsize, int nitems, char *srcptr)
        }
        return inc;
 }
-
-static void
-_LOArrayRange(int *st,
-                         int *endp,
-                         int bsize,
-                         int srcfd,
-                         int destfd,
-                         ArrayType *array,
-                         int isSrcLO,
-                         bool *isNull)
-{
-       int                     n,
-                          *dim,
-                               st_pos,
-                               prod[MAXDIM];
-       int                     span[MAXDIM],
-                               dist[MAXDIM],
-                               indx[MAXDIM];
-       int                     i,
-                               j,
-                               inc,
-                               tmp,
-                          *lb,
-                               offset;
-
-       n = ARR_NDIM(array);
-       dim = ARR_DIMS(array);
-       lb = ARR_LBOUND(array);
-       for (i = 0; i < n; st[i] -= lb[i], endp[i] -= lb[i], i++);
-
-       mda_get_prod(n, dim, prod);
-       st_pos = tuple2linear(n, st, prod);
-       offset = st_pos * bsize;
-       if (DatumGetInt32(DirectFunctionCall3(lo_lseek,
-                                                         Int32GetDatum(srcfd),
-                                                         Int32GetDatum(offset),
-                                                         Int32GetDatum(SEEK_SET))) < 0)
-               return;
-       mda_get_range(n, span, st, endp);
-       mda_get_offset_values(n, dist, prod, span);
-       for (i = 0; i < n; indx[i++] = 0);
-       for (i = n - 1, inc = bsize; i >= 0; inc *= span[i--])
-               if (dist[i])
-                       break;
-       j = n - 1;
-       do
-       {
-               offset += (dist[j] * bsize);
-               if (DatumGetInt32(DirectFunctionCall3(lo_lseek,
-                                                         Int32GetDatum(srcfd),
-                                                         Int32GetDatum(offset),
-                                                         Int32GetDatum(SEEK_SET))) < 0)
-                       return;
-               tmp = _LOtransfer((char **) &srcfd, inc, 1, (char **) &destfd, isSrcLO, 1);
-               if (tmp < inc)
-                       return;
-               offset += inc;
-       } while ((j = next_tuple(i + 1, indx, span)) != -1);
-}
-
-
-static void
-_ReadArray(int *st,
-                  int *endp,
-                  int bsize,
-                  int srcfd,
-                  int destfd,
-                  ArrayType *array,
-                  int isDestLO,
-                  bool *isNull)
-{
-       int                     n,
-                          *dim,
-                               st_pos,
-                               prod[MAXDIM];
-       int                     span[MAXDIM],
-                               dist[MAXDIM],
-                               indx[MAXDIM];
-       int                     i,
-                               j,
-                               inc,
-                               tmp,
-                          *lb,
-                               offset;
-
-       n = ARR_NDIM(array);
-       dim = ARR_DIMS(array);
-       lb = ARR_LBOUND(array);
-       for (i = 0; i < n; st[i] -= lb[i], endp[i] -= lb[i], i++);
-
-       mda_get_prod(n, dim, prod);
-       st_pos = tuple2linear(n, st, prod);
-       offset = st_pos * bsize;
-       if (DatumGetInt32(DirectFunctionCall3(lo_lseek,
-                                                         Int32GetDatum(srcfd),
-                                                         Int32GetDatum(offset),
-                                                         Int32GetDatum(SEEK_SET))) < 0)
-               return;
-       mda_get_range(n, span, st, endp);
-       mda_get_offset_values(n, dist, prod, span);
-       for (i = 0; i < n; indx[i++] = 0);
-       for (i = n - 1, inc = bsize; i >= 0; inc *= span[i--])
-               if (dist[i])
-                       break;
-       j = n - 1;
-       do
-       {
-               offset += (dist[j] * bsize);
-               if (DatumGetInt32(DirectFunctionCall3(lo_lseek,
-                                                         Int32GetDatum(srcfd),
-                                                         Int32GetDatum(offset),
-                                                         Int32GetDatum(SEEK_SET))) < 0)
-                       return;
-               tmp = _LOtransfer((char **) &destfd, inc, 1, (char **) &srcfd, 1, isDestLO);
-               if (tmp < inc)
-                       return;
-               offset += inc;
-       } while ((j = next_tuple(i + 1, indx, span)) != -1);
-}
-
-
-int
-_LOtransfer(char **destfd,
-                       int size,
-                       int nitems,
-                       char **srcfd,
-                       int isSrcLO,
-                       int isDestLO)
-{
-#define MAX_READ (512 * 1024)
-#if !defined(min)
-#define min(a, b) (a < b ? a : b)
-#endif
-       struct varlena *v = NULL;
-       int                     tmp,
-                               inc,
-                               resid;
-
-       inc = nitems * size;
-       if (isSrcLO && isDestLO && inc > 0)
-               for (tmp = 0, resid = inc;
-                        resid > 0 && (inc = min(resid, MAX_READ)) > 0; resid -= inc)
-               {
-#ifdef LOARRAY
-                       v = (struct varlena *)
-                               DatumGetPointer(DirectFunctionCall2(loread,
-                                                               Int32GetDatum((int32) *srcfd),
-                                                               Int32GetDatum(inc)));
-                       if (VARSIZE(v) - VARHDRSZ < inc)
-                       {
-                               pfree(v);
-                               return -1;
-                       }
-                       tmp += DatumGetInt32(DirectFunctionCall2(lowrite,
-                                                                Int32GetDatum((int32) *destfd),
-                                                                PointerGetDatum(v)));
-#endif
-                       pfree(v);
-
-               }
-       else if (!isSrcLO && isDestLO)
-       {
-               tmp = lo_write((int) *destfd, *srcfd, inc);
-               *srcfd = *srcfd + tmp;
-       }
-       else if (isSrcLO && !isDestLO)
-       {
-               tmp = lo_read((int) *srcfd, *destfd, inc);
-               *destfd = *destfd + tmp;
-       }
-       else
-       {
-               memmove(*destfd, *srcfd, inc);
-               tmp = inc;
-               *srcfd += inc;
-               *destfd += inc;
-       }
-       return tmp;
-#undef MAX_READ
-}
-
-char *
-_array_newLO(int *fd, int flag)
-{
-       char       *p;
-       char            saveName[NAME_LEN];
-
-       p = (char *) palloc(NAME_LEN);
-       sprintf(p, "/Arry.%u", newoid());
-       strcpy(saveName, p);
-#ifdef LOARRAY
-       if ((*fd = LOcreat(saveName, 0600, flag)) < 0)
-               elog(ERROR, "Large object create failed");
-#endif
-       return p;
-}
index 3293cecc96060a9b4c0191b325d3b4bdf670bab9..d103d8b885bbe89cb672b8f397ba527970071b2a 100644 (file)
@@ -8,61 +8,62 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/utils/adt/arrayutils.c,v 1.10 2000/01/26 05:57:12 momjian Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/utils/adt/arrayutils.c,v 1.11 2000/07/22 03:34:43 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 
-#define WEAK_C_OPTIMIZER
-
 #include "postgres.h"
+
 #include "utils/array.h"
+
+
+/* Convert subscript list into linear element number (from 0) */
 int
-GetOffset(int n, int *dim, int *lb, int *indx)
+ArrayGetOffset(int n, int *dim, int *lb, int *indx)
 {
        int                     i,
-                               scale,
-                               offset;
+                               scale = 1,
+                               offset = 0;
 
-       for (i = n - 1, scale = 1, offset = 0; i >= 0; scale *= dim[i--])
+       for (i = n - 1; i >= 0; i--)
+       {
                offset += (indx[i] - lb[i]) * scale;
+               scale *= dim[i];
+       }
        return offset;
 }
 
+/* Same, but subscripts are assumed 0-based, and use a scale array
+ * instead of raw dimension data (see mda_get_prod to create scale array)
+ */
 int
-getNitems(int n, int *a)
+ArrayGetOffset0(int n, int *tup, int *scale)
 {
        int                     i,
-                               ret;
+                               lin = 0;
 
-       for (i = 0, ret = 1; i < n; ret *= a[i++]);
-       if (n == 0)
-               ret = 0;
-       return ret;
+       for (i = 0; i < n; i++)
+               lin += tup[i] * scale[i];
+       return lin;
 }
 
+/* Convert array dimensions into number of elements */
 int
-compute_size(int *st, int *endp, int n, int base)
+ArrayGetNItems(int n, int *a)
 {
        int                     i,
                                ret;
 
-       for (i = 0, ret = base; i < n; i++)
-               ret *= (endp[i] - st[i] + 1);
+       if (n <= 0)
+               return 0;
+       ret = 1;
+       for (i = 0; i < n; i++)
+               ret *= a[i];
        return ret;
 }
 
-void
-mda_get_offset_values(int n, int *dist, int *PC, int *span)
-{
-       int                     i,
-                               j;
-
-       for (j = n - 2, dist[n - 1] = 0; j >= 0; j--)
-               for (i = j + 1, dist[j] = PC[j] - 1; i < n;
-                        dist[j] -= (span[i] - 1) * PC[i], i++);
-}
-
+/* Compute ranges (sub-array dimensions) for an array slice */
 void
 mda_get_range(int n, int *span, int *st, int *endp)
 {
@@ -72,56 +73,59 @@ mda_get_range(int n, int *span, int *st, int *endp)
                span[i] = endp[i] - st[i] + 1;
 }
 
+/* Compute products of array dimensions, ie, scale factors for subscripts */
 void
-mda_get_prod(int n, int *range, int *P)
+mda_get_prod(int n, int *range, int *prod)
 {
        int                     i;
 
-       for (i = n - 2, P[n - 1] = 1; i >= 0; i--)
-               P[i] = P[i + 1] * range[i + 1];
-}
-
-int
-tuple2linear(int n, int *tup, int *scale)
-{
-       int                     i,
-                               lin;
-
-       for (i = lin = 0; i < n; i++)
-               lin += tup[i] * scale[i];
-       return lin;
+       prod[n - 1] = 1;
+       for (i = n - 2; i >= 0; i--)
+               prod[i] = prod[i + 1] * range[i + 1];
 }
 
+/* From products of whole-array dimensions and spans of a sub-array,
+ * compute offset distances needed to step through subarray within array
+ */
 void
-array2chunk_coord(int n, int *C, int *a_coord, int *c_coord)
+mda_get_offset_values(int n, int *dist, int *prod, int *span)
 {
-       int                     i;
+       int                     i,
+                               j;
 
-       for (i = 0; i < n; i++)
-               c_coord[i] = a_coord[i] / C[i];
+       dist[n - 1] = 0;
+       for (j = n - 2; j >= 0; j--)
+       {
+               dist[j] = prod[j] - 1;
+               for (i = j + 1; i < n; i++)
+                       dist[j] -= (span[i] - 1) * prod[i];
+       }
 }
 
 /*-----------------------------------------------------------------------------
   generates the tuple that is lexicographically one greater than the current
   n-tuple in "curr", with the restriction that the i-th element of "curr" is
   less than the i-th element of "span".
-  RETURNS      0       if no next tuple exists
-  1   otherwise
-  -----------------------------------------------------------------------------*/
+  Returns -1 if no next tuple exists, else the subscript position (0..n-1)
+  corresponding to the dimension to advance along.
+  -----------------------------------------------------------------------------
+*/
 int
-next_tuple(int n, int *curr, int *span)
+mda_next_tuple(int n, int *curr, int *span)
 {
        int                     i;
 
-       if (!n)
+       if (n <= 0)
                return -1;
+
        curr[n - 1] = (curr[n - 1] + 1) % span[n - 1];
-       for (i = n - 1; i * (!curr[i]); i--)
+       for (i = n - 1; i && curr[i] == 0; i--)
                curr[i - 1] = (curr[i - 1] + 1) % span[i - 1];
 
        if (i)
                return i;
        if (curr[0])
                return 0;
+
        return -1;
 }
diff --git a/src/backend/utils/adt/chunk.c b/src/backend/utils/adt/chunk.c
deleted file mode 100644 (file)
index 5577676..0000000
+++ /dev/null
@@ -1,712 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * chunk.c
- *
- * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
- * Portions Copyright (c) 1994, Regents of the University of California
- *
- *
- * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/utils/adt/Attic/chunk.c,v 1.28 2000/06/18 22:44:13 tgl Exp $
- *
- *-------------------------------------------------------------------------
- */
-#include <ctype.h>
-#include <sys/types.h>
-#include <fcntl.h>
-
-#include "postgres.h"
-
-#include "catalog/pg_type.h"
-#include "fmgr.h"
-#include "libpq/be-fsstubs.h"
-#include "libpq/libpq-fs.h"
-#include "utils/array.h"
-#include "utils/memutils.h"
-
-#define INFTY 500000000
-#define MANY  10000
-#define MAXPAT 20
-#define quot_ceil(x,y) (((x)+(y)-1)/(y))
-#if !defined(min)
-#define min(x,y)               (((x) < (y))? (x) : (y))
-#endif
-#if !defined(max)
-#define max(x,y)               (((x) > (y))? (x) : (y))
-#endif
-
-static CHUNK_INFO cInfo;
-
-/* non-export function prototypes */
-static int _FindBestChunk(int size, int *dmax, int *dbest, int dim,
-                          int A[MAXPAT][MAXDIM + 1], int N);
-static int     get_next(int *d, int k, int C, int *dmax);
-static void initialize_info(CHUNK_INFO *A, int ndim, int *dim, int *chunk);
-
-#ifdef LOARRAY
-static void _ConvertToChunkFile(int n, int baseSize, int *dim, int *C,
-                                       int srcfd, int destfd);
-static void read_chunk(int *chunk_no, int *C, char *a_chunk, int srcfd,
-                  int n, int baseSize, int *PX, int *dist);
-static int     write_chunk(struct varlena * a_chunk, int ofile);
-static int     seek_and_read(int pos, int size, char *buff, int fp, int from);
-
-#endif
-static int GetChunkSize(FILE *fd, int ndim, int dim[MAXDIM], int baseSize,
-                        int d[MAXDIM]);
-
-/*------------------------------------------------------------------------
- * _ChunkArray ---
- *        converts an input array to chunked format using the information
- *        provided by the access pattern.
- * Results:
- *        creates a new file that stores the chunked array and returns
- *        information about the chunked file
- *-----------------------------------------------------------------------
- */
-char *
-_ChunkArray(int fd,
-                       FILE *afd,
-                       int ndim,
-                       int *dim,
-                       int baseSize,
-                       int *nbytes,
-                       char *chunkfile)
-{
-#ifdef LOARRAY
-       int                     cfd = 0;
-
-#endif
-       int                     chunk[MAXDIM],
-                               csize;
-       bool            reorgFlag;
-
-       if (chunkfile == NULL)
-               reorgFlag = true;
-       else
-               reorgFlag = false;
-
-#ifdef LOARRAY
-       if (reorgFlag)
-               /* create new LO for chunked file */
-               chunkfile = _array_newLO(&cfd, fileFlag);
-       else
-               cfd = LOopen(chunkfile, O_RDONLY);
-       if (cfd < 0)
-               elog(ERROR, "Unable to open chunk file");
-#endif
-
-       strcpy(cInfo.lo_name, chunkfile);
-
-       /* find chunk size */
-       csize = GetChunkSize(afd, ndim, dim, baseSize, chunk);
-
-#ifdef LOARRAY
-       if (reorgFlag)
-               /* copy data from input file to chunked file */
-               _ConvertToChunkFile(ndim, baseSize, dim, chunk, fd, cfd);
-#endif
-
-       initialize_info(&cInfo, ndim, dim, chunk);
-       *nbytes = sizeof(CHUNK_INFO);
-       return (char *) &cInfo;
-}
-
-/*--------------------------------------------------------------------------
- * GetChunkSize
- *               given an access pattern and array dimensionality etc, this program
- *             returns the dimensions of the chunk in "d"
- *-----------------------------------------------------------------------
- */
-static int
-GetChunkSize(FILE *fd,
-                        int ndim,
-                        int dim[MAXDIM],
-                        int baseSize,
-                        int d[MAXDIM])
-{
-       int                     N,
-                               i,
-                               j,
-                               csize;
-       int                     A[MAXPAT][MAXDIM + 1],
-                               dmax[MAXDIM];
-
-       /*
-        * ----------- read input ------------
-        */
-       fscanf(fd, "%d", &N);
-       if (N > MAXPAT)
-               elog(ERROR, "array_in: too many access pattern elements");
-       for (i = 0; i < N; i++)
-               for (j = 0; j < ndim + 1; j++)
-                       if (fscanf(fd, "%d ", &(A[i][j])) == EOF)
-                               elog(ERROR, "array_in: bad access pattern input");
-
-       /*
-        * estimate chunk size
-        */
-       for (i = 0; i < ndim; i++)
-               for (j = 0, dmax[i] = 1; j < N; j++)
-                       if (dmax[i] < A[j][i])
-                               dmax[i] = A[j][i];
-       csize = BLCKSZ / baseSize;
-
-       _FindBestChunk(csize, dmax, d, ndim, A, N);
-
-       return csize;
-}
-
-/*-------------------------------------------------------------------------
- * _FindBestChunk
- *               This routine does most of the number crunching to compute the
- *               optimal chunk shape.
- * Called by GetChunkSize
- *------------------------------------------------------------------------
- */
-static int
-_FindBestChunk(int size,
-                          int *dmax,
-                          int *dbest,
-                          int dim,
-                          int A[MAXPAT][MAXDIM + 1],
-                          int N)
-{
-       int                     d[MAXDIM];
-       int                     tc,
-                               mintc = INFTY;
-
-       d[0] = 0;
-       mintc = INFTY;
-       while (get_next(d, dim, size, dmax))
-       {
-
-               /*
-                * compute the number of page fetches for a given chunk size (*d)
-                * and access pattern (**A)
-                */
-               int                     i,
-                                       j,
-                                       nc;
-
-               for (i = 0, tc = 0; i < N; i++)
-               {
-                       for (j = 0, nc = 1; j < dim; j++)
-                               nc *= quot_ceil(A[i][j], d[j]);
-                       nc *= A[i][dim];
-                       tc += nc;
-               }
-
-               /*
-                * tc holds the total number of page fetches
-                */
-               if (mintc >= tc)
-               {
-                       mintc = tc;
-                       for (j = 0; j < dim; dbest[j] = d[j], j++)
-                               ;
-               }
-       }
-       return mintc;
-}
-
-/*----------------------------------------------------------------------
- * get_next
- *      Called by _GetBestChunk to get the next tuple in the lexicographic order
- *---------------------------------------------------------------------
- */
-static int
-get_next(int *d, int k, int C, int *dmax)
-{
-       int                     i,
-                               j,
-                               temp;
-
-       if (!d[0])
-       {
-               temp = C;
-               for (j = k - 1; j >= 0; j--)
-               {
-                       d[j] = min(temp, dmax[j]);
-                       temp = max(1, temp / d[j]);
-               }
-               return 1;
-       }
-
-       for (j = 0, temp = 1; j < k; j++)
-               temp *= d[j];
-
-       for (i = k - 1; i >= 0; i--)
-       {
-               temp = temp / d[i];
-               if (((temp * (d[i] + 1)) < C) && (d[i] + 1 <= dmax[i]))
-                       break;
-       }
-       if (i < 0)
-               return 0;
-
-       d[i]++;
-       j = C / temp;
-       d[i] = min(dmax[i], j / (j / d[i]));
-       temp = temp * d[i];
-       temp = C / temp;
-
-       for (j = k - 1; j > i; j--)
-       {
-               d[j] = min(temp, dmax[j]);
-               temp = max(1, temp / d[j]);
-       }
-       return 1;
-}
-
-#ifdef LOARRAY
-static char a_chunk[BLCKSZ + VARHDRSZ]; /* VARHDRSZ since a_chunk is in
-                                                                                * varlena format */
-
-#endif
-
-static void
-initialize_info(CHUNK_INFO *A, int ndim, int *dim, int *chunk)
-{
-       int                     i;
-
-       for (i = 0; i < ndim; i++)
-               A->C[i] = chunk[i];
-}
-
-/*--------------------------------------------------------------------------
- * Procedure reorganize_data():
- *       This procedure reads the input multidimensional array that is organised
- *       in the order specified by array "X" and breaks it up into chunks of
- *       dimensions specified in "C".
- *
- *       This is a very slow process, since reading and writing of LARGE files
- *       may be involved.
- *
- *-------------------------------------------------------------------------
- */
-#ifdef LOARRAY
-static void
-_ConvertToChunkFile(int n,
-                                       int baseSize,
-                                       int *dim,
-                                       int *C,
-                                       int srcfd,
-                                       int destfd)
-{
-       int                     max_chunks[MAXDIM],
-                               chunk_no[MAXDIM];
-       int                     PX[MAXDIM],
-                               dist[MAXDIM];
-       int                     csize = 1,
-                               i,
-                               temp;
-
-       for (i = 0; i < n; chunk_no[i++] = 0)
-       {
-               max_chunks[i] = dim[i] / C[i];
-               csize *= C[i];
-       }
-       csize *= baseSize;
-       temp = csize + VARHDRSZ;
-       memmove(a_chunk, &temp, VARHDRSZ);
-
-       mda_get_prod(n, dim, PX);
-       mda_get_offset_values(n, dist, PX, C);
-       for (i = 0; i < n; dist[i] *= baseSize, i++)
-               ;
-       do
-       {
-               read_chunk(chunk_no, C, &(a_chunk[VARHDRSZ]), srcfd, n, baseSize, PX, dist);
-               write_chunk((struct varlena *) a_chunk, destfd);
-       } while (next_tuple(n, chunk_no, max_chunks) != -1);
-}
-
-/*--------------------------------------------------------------------------
- * read_chunk
- *       reads a chunk from the input files into a_chunk, the position of the
- *       chunk is specified by chunk_no
- *--------------------------------------------------------------------------
- */
-static void
-read_chunk(int *chunk_no,
-                  int *C,
-                  char *a_chunk,
-                  int srcfd,
-                  int n,
-                  int baseSize,
-                  int *PX,
-                  int *dist)
-{
-       int                     i,
-                               j,
-                               cp,
-                               unit_transfer;
-       int                     start_pos,
-                               pos[MAXDIM];
-       int                     indx[MAXDIM];
-       int                     fpOff;
-
-       for (i = start_pos = 0; i < n; i++)
-       {
-               pos[i] = chunk_no[i] * C[i];
-               start_pos += pos[i] * PX[i];
-       }
-       start_pos *= baseSize;
-
-       /* Read a block of dimesion C starting at co-ordinates pos */
-       unit_transfer = C[n - 1] * baseSize;
-
-       for (i = 0; i < n; indx[i++] = 0)
-               ;
-       fpOff = start_pos;
-       seek_and_read(fpOff, unit_transfer, a_chunk, srcfd, SEEK_SET);
-       fpOff += unit_transfer;
-       cp = unit_transfer;
-
-       while ((j = next_tuple(n - 1, indx, C)) != -1)
-       {
-               fpOff += dist[j];
-               seek_and_read(fpOff, unit_transfer, &(a_chunk[cp]), srcfd, SEEK_SET);
-               cp += unit_transfer;
-               fpOff += unit_transfer;
-       }
-}
-
-/*--------------------------------------------------------------------------
- * write_chunk()
- *       writes a chunk of size csize into the output file
- *--------------------------------------------------------------------------
- */
-static int
-write_chunk(struct varlena * a_chunk, int ofile)
-{
-       int                     got_n = 0;
-
-#ifdef LOARRAY
-       got_n = DatumGetInt32(DirectFunctionCall2(lowrite,
-                                                                                         Int32GetDatum(ofile),
-                                                                                         PointerGetDatum(a_chunk)));
-#endif
-       return got_n;
-}
-
-/*--------------------------------------------------------------------------
- * seek_and_read()
- *       seeks to the asked location in the input file and reads the
- *       appropriate number of blocks
- *      Called By: read_chunk()
- *--------------------------------------------------------------------------
- */
-static int
-seek_and_read(int pos, int size, char *buff, int fp, int from)
-{
-       struct varlena *v;
-
-       /* Assuming only one file */
-       if (DatumGetInt32(DirectFunctionCall3(lo_lseek,
-                                                                                 Int32GetDatum(fp),
-                                                                                 Int32GetDatum(pos),
-                                                                                 Int32GetDatum(from))) < 0)
-               elog(ERROR, "File seek error");
-#ifdef LOARRAY
-       v = (struct varlena *)
-               DatumGetPointer(DirectFunctionCall2(loread,
-                                                                                       Int32GetDatum(fp),
-                                                                                       Int32GetDatum(size)));
-#endif
-       if (VARSIZE(v) - VARHDRSZ < size)
-               elog(ERROR, "File read error");
-       memmove(buff, VARDATA(v), size);
-       pfree(v);
-       return 1;
-
-}
-
-#endif  /* LOARRAY */
-
-/*----------------------------------------------------------------------------
- * _ReadChunkArray
- *               returns the subarray specified bu the range indices "st" and "endp"
- *               from the chunked array stored in file "fp"
- *---------------------------------------------------------------------------
- */
-int
-_ReadChunkArray(int *st,
-                               int *endp,
-                               int bsize,
-                               int fp,
-                               char *destfp,
-                               ArrayType *array,
-                               int isDestLO,
-                               bool *isNull)
-{
-       int                     i,
-                               j,
-                               jj;
-       int                     n,
-                               temp,
-                               words_read;
-       int                     chunk_span[MAXDIM],
-                               chunk_off[MAXDIM];
-       int                     chunk_st[MAXDIM],
-                               chunk_end[MAXDIM];
-       int                     block_seek;
-
-       int                     bptr,
-                          *C,
-                               csize,
-                          *dim,
-                          *lb;
-       int                     range_st[MAXDIM],
-                               range_end[MAXDIM],
-                               range[MAXDIM],
-                               array_span[MAXDIM];
-       int                     PA[MAXDIM],
-                               PCHUNK[MAXDIM],
-                               PC[MAXDIM];
-       int                     to_read;
-       int                     cdist[MAXDIM],
-                               adist[MAXDIM];
-       int                     dist[MAXDIM],
-                               temp_seek;
-
-       int                     srcOff;                 /* Needed since LO don't understand
-                                                                * SEEK_CUR */
-       char       *baseDestFp = (char *) destfp;
-
-       CHUNK_INFO *A = (CHUNK_INFO *) ARR_DATA_PTR(array);
-
-       n = ARR_NDIM(array);
-       dim = ARR_DIMS(array);
-       lb = ARR_LBOUND(array);
-       C = A->C;
-
-       csize = C[n - 1];
-       PC[n - 1] = 1;
-       temp = dim[n - 1] / C[n - 1];
-       for (i = n - 2; i >= 0; i--)
-       {
-               PC[i] = PC[i + 1] * temp;
-               temp = dim[i] / C[i];
-               csize *= C[i];
-       }
-
-       for (i = 0; i < n; st[i] -= lb[i], endp[i] -= lb[i], i++)
-               ;
-       mda_get_prod(n, C, PCHUNK);
-       mda_get_range(n, array_span, st, endp);
-       mda_get_prod(n, array_span, PA);
-
-       array2chunk_coord(n, C, st, chunk_st);
-       array2chunk_coord(n, C, endp, chunk_end);
-       mda_get_range(n, chunk_span, chunk_st, chunk_end);
-       mda_get_offset_values(n, dist, PC, chunk_span);
-
-       for (i = 0; i < n; i++)
-       {
-               range_st[i] = st[i];
-               range_end[i] = min(chunk_st[i] * C[i] + C[i] - 1, endp[i]);
-       }
-
-       for (i = j = 0; i < n; i++)
-               j += chunk_st[i] * PC[i];
-       temp_seek = srcOff = j * csize * bsize;
-       if (DatumGetInt32(DirectFunctionCall3(lo_lseek,
-                                                                                 Int32GetDatum(fp),
-                                                                                 Int32GetDatum(srcOff),
-                                                                                 Int32GetDatum(SEEK_SET))) < 0)
-               RETURN_NULL(int);
-
-       jj = n - 1;
-       for (i = 0; i < n; chunk_off[i++] = 0)
-               ;
-       words_read = 0;
-       temp_seek = 0;
-       do
-       {
-               /* Write chunk (chunk_st) to output buffer */
-               mda_get_range(n, array_span, range_st, range_end);
-               mda_get_offset_values(n, adist, PA, array_span);
-               mda_get_offset_values(n, cdist, PCHUNK, array_span);
-               for (i = 0; i < n; range[i] = range_st[i] - st[i], i++);
-               bptr = tuple2linear(n, range, PA);
-               for (i = 0; i < n; range[i++] = 0);
-               j = n - 1;
-               bptr *= bsize;
-               if (isDestLO)
-               {
-                       if (DatumGetInt32(DirectFunctionCall3(lo_lseek,
-                                                         Int32GetDatum((int32) destfp),
-                                                         Int32GetDatum(bptr),
-                                                         Int32GetDatum(SEEK_SET))) < 0)
-                               RETURN_NULL(int);
-               }
-               else
-                       destfp = baseDestFp + bptr;
-               for (i = 0, block_seek = 0; i < n; i++)
-                       block_seek += (range_st[i] - (chunk_st[i] + chunk_off[i])
-                                                  * C[i]) * PCHUNK[i];
-               if (dist[jj] + block_seek + temp_seek)
-               {
-                       temp = (dist[jj] * csize + block_seek + temp_seek) * bsize;
-                       srcOff += temp;
-                       if (DatumGetInt32(DirectFunctionCall3(lo_lseek,
-                                                         Int32GetDatum(fp),
-                                                         Int32GetDatum(srcOff),
-                                                         Int32GetDatum(SEEK_SET))) < 0)
-                               RETURN_NULL(int);
-               }
-               for (i = n - 1, to_read = bsize; i >= 0;
-                        to_read *= min(C[i], array_span[i]), i--)
-                       if (cdist[i] || adist[i])
-                               break;
-               do
-               {
-                       if (cdist[j])
-                       {
-                               srcOff += (cdist[j] * bsize);
-                               if (DatumGetInt32(DirectFunctionCall3(lo_lseek,
-                                                                 Int32GetDatum(fp),
-                                                                 Int32GetDatum(srcOff),
-                                                                 Int32GetDatum(SEEK_SET))) < 0)
-                                       RETURN_NULL(int);
-                       }
-                       block_seek += cdist[j];
-                       bptr += adist[j] * bsize;
-                       if (isDestLO)
-                       {
-                               if (DatumGetInt32(DirectFunctionCall3(lo_lseek,
-                                                                 Int32GetDatum((int32) destfp),
-                                                                 Int32GetDatum(bptr),
-                                                                 Int32GetDatum(SEEK_SET))) < 0)
-                                       RETURN_NULL(int);
-                       }
-                       else
-                               destfp = baseDestFp + bptr;
-                       temp = _LOtransfer((char **) &destfp, to_read, 1, (char **) &fp, 1, isDestLO);
-                       if (temp < to_read)
-                               RETURN_NULL(int);
-                       srcOff += to_read;
-                       words_read += to_read;
-                       bptr += to_read;
-                       block_seek += (to_read / bsize);
-
-                       /*
-                        * compute next tuple in *range
-                        */
-                       {
-                               int                     x;
-
-                               if (!(i + 1))
-                                       j = -1;
-                               else
-                               {
-                                       range[i] = (range[i] + 1) % array_span[i];
-                                       for (x = i; x * (!range[x]); x--)
-                                               range[x - 1] = (range[x - 1] + 1) % array_span[x - 1];
-                                       if (x)
-                                               j = x;
-                                       else
-                                       {
-                                               if (range[0])
-                                                       j = 0;
-                                               else
-                                                       j = -1;
-                                       }
-                               }
-                       }
-
-                       /*
-                        * end of compute next tuple -- j is set to -1 if tuple
-                        * generation is over
-                        */
-               } while (j != -1);
-
-               block_seek = csize - block_seek;
-               temp_seek = block_seek;
-               jj = next_tuple(n, chunk_off, chunk_span);
-               if (jj == -1)
-                       break;
-               range_st[jj] = (chunk_st[jj] + chunk_off[jj]) * C[jj];
-               range_end[jj] = min(range_st[jj] + C[jj] - 1, endp[jj]);
-
-               for (i = jj + 1; i < n; i++)
-               {
-                       range_st[i] = st[i];
-                       range_end[i] = min((chunk_st[i] + chunk_off[i]) * C[i] + C[i] - 1, endp[i]);
-               }
-       } while (jj != -1);
-       return words_read;
-}
-
-/*------------------------------------------------------------------------
- * _ReadChunkArray1El
- *              returns one element of the chunked array as specified by the index "st"
- *              the chunked file descriptor is "fp"
- *-------------------------------------------------------------------------
- */
-struct varlena *
-_ReadChunkArray1El(int *st,
-                                  int bsize,
-                                  int fp,
-                                  ArrayType *array,
-                                  bool *isNull)
-{
-       int                     i,
-                               j,
-                               n,
-                               temp,
-                               srcOff;
-       int                     chunk_st[MAXDIM];
-
-       int                *C,
-                               csize,
-                          *dim,
-                          *lb;
-       int                     PCHUNK[MAXDIM],
-                               PC[MAXDIM];
-
-       CHUNK_INFO *A = (CHUNK_INFO *) ARR_DATA_PTR(array);
-
-       n = ARR_NDIM(array);
-       lb = ARR_LBOUND(array);
-       C = A->C;
-       dim = ARR_DIMS(array);
-
-       csize = C[n - 1];
-       PC[n - 1] = 1;
-       temp = dim[n - 1] / C[n - 1];
-       for (i = n - 2; i >= 0; i--)
-       {
-               PC[i] = PC[i + 1] * temp;
-               temp = dim[i] / C[i];
-               csize *= C[i];
-       }
-
-       for (i = 0; i < n; st[i] -= lb[i], i++);
-       mda_get_prod(n, C, PCHUNK);
-
-       array2chunk_coord(n, C, st, chunk_st);
-
-       for (i = j = 0; i < n; i++)
-               j += chunk_st[i] * PC[i];
-       srcOff = j * csize;
-
-       for (i = 0; i < n; i++)
-               srcOff += (st[i] - chunk_st[i] * C[i]) * PCHUNK[i];
-
-       srcOff *= bsize;
-       if (DatumGetInt32(DirectFunctionCall3(lo_lseek,
-                                         Int32GetDatum(fp),
-                                         Int32GetDatum(srcOff),
-                                         Int32GetDatum(SEEK_SET))) < 0)
-               RETURN_NULL(struct varlena *);
-#ifdef LOARRAY
-       return (struct varlena *)
-               DatumGetPointer(DirectFunctionCall2(loread,
-                                                                                       Int32GetDatum(fp),
-                                                                                       Int32GetDatum(bsize)));
-#endif
-       return (struct varlena *) 0;
-}
index c77c3c40943e20aa9e362f47bee2aa1099100906..d14f9287e24c39e2fc00d70f0f18acfd2095fae3 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: pg_type.h,v 1.92 2000/07/07 19:24:41 petere Exp $
+ * $Id: pg_type.h,v 1.93 2000/07/22 03:34:28 tgl Exp $
  *
  * NOTES
  *       the genbki.sh script reads this file and generates .bki
@@ -76,11 +76,11 @@ CATALOG(pg_type) BOOTSTRAP
         * If typelem is not 0 then it identifies another row in pg_type.
         * The current type can then be subscripted like an array yielding
         * values of type typelem. A non-zero typelem does not guarantee
-        * this type to be an array type; ordinary fixed-length types can
-        * also be subscripted (e.g., oidvector). Variable-length types
-        * can *not* be turned into pseudo-arrays like that. Hence, the
-        * way to determine whether a type is an array type is typelem !=
-        * 0 and typlen < 0.
+        * this type to be a "real" array type; some ordinary fixed-length
+        * types can also be subscripted (e.g., oidvector). Variable-length
+        * types can *not* be turned into pseudo-arrays like that. Hence,
+        * the way to determine whether a type is a "true" array type is
+        * typelem != 0 and typlen < 0.
         */
        Oid                     typelem;
        regproc         typinput;
@@ -282,7 +282,7 @@ DESCR("filename used in system tables");
 DATA(insert OID = 628 (  line     PGUID 32  48 f b t \054 0 701 line_in line_out line_in line_out d p _null_ ));
 DESCR("geometric line '(pt1,pt2)'");
 #define LINEOID                        628
-DATA(insert OID = 629 (  _line    PGUID  -1 -1 f b t \054 0 628 array_in array_out array_in array_out d p _null_ ));
+DATA(insert OID = 629 (  _line    PGUID  -1 -1 f b t \054 0 628 array_in array_out array_in array_out d x _null_ ));
 DESCR("");
 
 /* OIDS 700 - 799 */
@@ -309,11 +309,11 @@ DESCR("");
 DATA(insert OID = 718 (  circle    PGUID  24 47 f b t \054 0   0 circle_in circle_out circle_in circle_out d p _null_ ));
 DESCR("geometric circle '(center,radius)'");
 #define CIRCLEOID              718
-DATA(insert OID = 719 (  _circle   PGUID  -1 -1 f b t \054 0  718 array_in array_out array_in array_out d p _null_ ));
+DATA(insert OID = 719 (  _circle   PGUID  -1 -1 f b t \054 0  718 array_in array_out array_in array_out d x _null_ ));
 DATA(insert OID = 790 (  money    PGUID   4 24 f b t \054 0    0 cash_in cash_out cash_in cash_out i p _null_ ));
 DESCR("$d,ddd.cc, money");
 #define CASHOID 790
-DATA(insert OID = 791 (  _money    PGUID  -1 -1 f b t \054 0  790 array_in array_out array_in array_out i p _null_ ));
+DATA(insert OID = 791 (  _money    PGUID  -1 -1 f b t \054 0  790 array_in array_out array_in array_out i x _null_ ));
 
 /* OIDS 800 - 899 */
 DATA(insert OID = 829 ( macaddr    PGUID  6 -1 f b t \054 0 0 macaddr_in macaddr_out macaddr_in macaddr_out i p _null_ ));
@@ -328,34 +328,34 @@ DESCR("network IP address/netmask, network address");
 /* OIDS 900 - 999 */
 
 /* OIDS 1000 - 1099 */
-DATA(insert OID = 1000 (  _bool                 PGUID -1  -1 f b t \054 0      16 array_in array_out array_in array_out i p _null_ ));
-DATA(insert OID = 1001 (  _bytea        PGUID -1  -1 f b t \054 0      17 array_in array_out array_in array_out i p _null_ ));
-DATA(insert OID = 1002 (  _char                 PGUID -1  -1 f b t \054 0      18 array_in array_out array_in array_out i p _null_ ));
-DATA(insert OID = 1003 (  _name                 PGUID -1  -1 f b t \054 0      19 array_in array_out array_in array_out i p _null_ ));
-DATA(insert OID = 1005 (  _int2                 PGUID -1  -1 f b t \054 0      21 array_in array_out array_in array_out i p _null_ ));
-DATA(insert OID = 1006 (  _int2vector PGUID -1 -1 f b t \054 0 22 array_in array_out array_in array_out i p _null_ ));
-DATA(insert OID = 1007 (  _int4                 PGUID -1  -1 f b t \054 0      23 array_in array_out array_in array_out i p _null_ ));
-DATA(insert OID = 1008 (  _regproc      PGUID -1  -1 f b t \054 0      24 array_in array_out array_in array_out i p _null_ ));
-DATA(insert OID = 1009 (  _text                 PGUID -1  -1 f b t \054 0      25 array_in array_out array_in array_out i p _null_ ));
-DATA(insert OID = 1028 (  _oid          PGUID -1  -1 f b t \054 0      26 array_in array_out array_in array_out i p _null_ ));
-DATA(insert OID = 1010 (  _tid          PGUID -1  -1 f b t \054 0      27 array_in array_out array_in array_out i p _null_ ));
-DATA(insert OID = 1011 (  _xid          PGUID -1  -1 f b t \054 0      28 array_in array_out array_in array_out i p _null_ ));
-DATA(insert OID = 1012 (  _cid          PGUID -1  -1 f b t \054 0      29 array_in array_out array_in array_out i p _null_ ));
-DATA(insert OID = 1013 (  _oidvector PGUID -1  -1 f b t \054 0 30 array_in array_out array_in array_out i p _null_ ));
-DATA(insert OID = 1014 (  _bpchar       PGUID -1  -1 f b t \054 0 1042 array_in array_out array_in array_out i p _null_ ));
-DATA(insert OID = 1015 (  _varchar      PGUID -1  -1 f b t \054 0 1043 array_in array_out array_in array_out i p _null_ ));
-DATA(insert OID = 1016 (  _int8                 PGUID -1  -1 f b t \054 0      20 array_in array_out array_in array_out d p _null_ ));
-DATA(insert OID = 1017 (  _point        PGUID -1  -1 f b t \054 0 600 array_in array_out array_in array_out d p _null_ ));
-DATA(insert OID = 1018 (  _lseg                 PGUID -1  -1 f b t \054 0 601 array_in array_out array_in array_out d p _null_ ));
-DATA(insert OID = 1019 (  _path                 PGUID -1  -1 f b t \054 0 602 array_in array_out array_in array_out d p _null_ ));
-DATA(insert OID = 1020 (  _box          PGUID -1  -1 f b t \073 0 603 array_in array_out array_in array_out d p _null_ ));
-DATA(insert OID = 1021 (  _float4       PGUID -1  -1 f b t \054 0 700 array_in array_out array_in array_out i p _null_ ));
-DATA(insert OID = 1022 (  _float8       PGUID -1  -1 f b t \054 0 701 array_in array_out array_in array_out d p _null_ ));
-DATA(insert OID = 1023 (  _abstime      PGUID -1  -1 f b t \054 0 702 array_in array_out array_in array_out i p _null_ ));
-DATA(insert OID = 1024 (  _reltime      PGUID -1  -1 f b t \054 0 703 array_in array_out array_in array_out i p _null_ ));
-DATA(insert OID = 1025 (  _tinterval PGUID -1  -1 f b t \054 0 704 array_in array_out array_in array_out i p _null_ ));
-DATA(insert OID = 1026 (  _filename  PGUID -1  -1 f b t \054 0 605 array_in array_out array_in array_out i p _null_ ));
-DATA(insert OID = 1027 (  _polygon      PGUID -1  -1 f b t \054 0 604 array_in array_out array_in array_out d p _null_ ));
+DATA(insert OID = 1000 (  _bool                 PGUID -1  -1 f b t \054 0      16 array_in array_out array_in array_out i x _null_ ));
+DATA(insert OID = 1001 (  _bytea        PGUID -1  -1 f b t \054 0      17 array_in array_out array_in array_out i x _null_ ));
+DATA(insert OID = 1002 (  _char                 PGUID -1  -1 f b t \054 0      18 array_in array_out array_in array_out i x _null_ ));
+DATA(insert OID = 1003 (  _name                 PGUID -1  -1 f b t \054 0      19 array_in array_out array_in array_out i x _null_ ));
+DATA(insert OID = 1005 (  _int2                 PGUID -1  -1 f b t \054 0      21 array_in array_out array_in array_out i x _null_ ));
+DATA(insert OID = 1006 (  _int2vector PGUID -1 -1 f b t \054 0 22 array_in array_out array_in array_out i x _null_ ));
+DATA(insert OID = 1007 (  _int4                 PGUID -1  -1 f b t \054 0      23 array_in array_out array_in array_out i x _null_ ));
+DATA(insert OID = 1008 (  _regproc      PGUID -1  -1 f b t \054 0      24 array_in array_out array_in array_out i x _null_ ));
+DATA(insert OID = 1009 (  _text                 PGUID -1  -1 f b t \054 0      25 array_in array_out array_in array_out i x _null_ ));
+DATA(insert OID = 1028 (  _oid          PGUID -1  -1 f b t \054 0      26 array_in array_out array_in array_out i x _null_ ));
+DATA(insert OID = 1010 (  _tid          PGUID -1  -1 f b t \054 0      27 array_in array_out array_in array_out i x _null_ ));
+DATA(insert OID = 1011 (  _xid          PGUID -1  -1 f b t \054 0      28 array_in array_out array_in array_out i x _null_ ));
+DATA(insert OID = 1012 (  _cid          PGUID -1  -1 f b t \054 0      29 array_in array_out array_in array_out i x _null_ ));
+DATA(insert OID = 1013 (  _oidvector PGUID -1  -1 f b t \054 0 30 array_in array_out array_in array_out i x _null_ ));
+DATA(insert OID = 1014 (  _bpchar       PGUID -1  -1 f b t \054 0 1042 array_in array_out array_in array_out i x _null_ ));
+DATA(insert OID = 1015 (  _varchar      PGUID -1  -1 f b t \054 0 1043 array_in array_out array_in array_out i x _null_ ));
+DATA(insert OID = 1016 (  _int8                 PGUID -1  -1 f b t \054 0      20 array_in array_out array_in array_out d x _null_ ));
+DATA(insert OID = 1017 (  _point        PGUID -1  -1 f b t \054 0 600 array_in array_out array_in array_out d x _null_ ));
+DATA(insert OID = 1018 (  _lseg                 PGUID -1  -1 f b t \054 0 601 array_in array_out array_in array_out d x _null_ ));
+DATA(insert OID = 1019 (  _path                 PGUID -1  -1 f b t \054 0 602 array_in array_out array_in array_out d x _null_ ));
+DATA(insert OID = 1020 (  _box          PGUID -1  -1 f b t \073 0 603 array_in array_out array_in array_out d x _null_ ));
+DATA(insert OID = 1021 (  _float4       PGUID -1  -1 f b t \054 0 700 array_in array_out array_in array_out i x _null_ ));
+DATA(insert OID = 1022 (  _float8       PGUID -1  -1 f b t \054 0 701 array_in array_out array_in array_out d x _null_ ));
+DATA(insert OID = 1023 (  _abstime      PGUID -1  -1 f b t \054 0 702 array_in array_out array_in array_out i x _null_ ));
+DATA(insert OID = 1024 (  _reltime      PGUID -1  -1 f b t \054 0 703 array_in array_out array_in array_out i x _null_ ));
+DATA(insert OID = 1025 (  _tinterval PGUID -1  -1 f b t \054 0 704 array_in array_out array_in array_out i x _null_ ));
+DATA(insert OID = 1026 (  _filename  PGUID -1  -1 f b t \054 0 605 array_in array_out array_in array_out i x _null_ ));
+DATA(insert OID = 1027 (  _polygon      PGUID -1  -1 f b t \054 0 604 array_in array_out array_in array_out d x _null_ ));
 /*
  *     Note: the size of aclitem needs to match sizeof(AclItem) in acl.h.
  *     Thanks to some padding, this will be 8 on all platforms.
@@ -364,10 +364,10 @@ DATA(insert OID = 1027 (  _polygon         PGUID -1  -1 f b t \054 0 604 array_in array
 #define ACLITEMSIZE 8
 DATA(insert OID = 1033 (  aclitem       PGUID 8   -1 f b t \054 0 0 aclitemin aclitemout aclitemin aclitemout i p _null_ ));
 DESCR("access control list");
-DATA(insert OID = 1034 (  _aclitem      PGUID -1 -1 f b t \054 0 1033 array_in array_out array_in array_out i p _null_ ));
-DATA(insert OID = 1040 (  _macaddr      PGUID -1 -1 f b t \054 0  829 array_in array_out array_in array_out i p _null_ ));
-DATA(insert OID = 1041 (  _inet    PGUID -1 -1 f b t \054 0  869 array_in array_out array_in array_out i p _null_ ));
-DATA(insert OID = 651  (  _cidr    PGUID -1 -1 f b t \054 0  650 array_in array_out array_in array_out i p _null_ ));
+DATA(insert OID = 1034 (  _aclitem      PGUID -1 -1 f b t \054 0 1033 array_in array_out array_in array_out i x _null_ ));
+DATA(insert OID = 1040 (  _macaddr      PGUID -1 -1 f b t \054 0  829 array_in array_out array_in array_out i x _null_ ));
+DATA(insert OID = 1041 (  _inet    PGUID -1 -1 f b t \054 0  869 array_in array_out array_in array_out i x _null_ ));
+DATA(insert OID = 651  (  _cidr    PGUID -1 -1 f b t \054 0  650 array_in array_out array_in array_out i x _null_ ));
 DATA(insert OID = 1042 ( bpchar                 PGUID -1  -1 f b t \054 0      0 bpcharin bpcharout bpcharin bpcharout i p _null_ ));
 DESCR("char(length), blank-padded string, fixed storage length");
 #define BPCHAROID              1042
@@ -383,33 +383,33 @@ DESCR("hh:mm:ss, ANSI SQL time");
 #define TIMEOID                        1083
 
 /* OIDS 1100 - 1199 */
-DATA(insert OID = 1182 ( _date          PGUID  -1 -1 f b t \054 0      1082 array_in array_out array_in array_out i p _null_ ));
-DATA(insert OID = 1183 ( _time          PGUID  -1 -1 f b t \054 0      1083 array_in array_out array_in array_out d p _null_ ));
+DATA(insert OID = 1182 ( _date          PGUID  -1 -1 f b t \054 0      1082 array_in array_out array_in array_out i x _null_ ));
+DATA(insert OID = 1183 ( _time          PGUID  -1 -1 f b t \054 0      1083 array_in array_out array_in array_out d x _null_ ));
 DATA(insert OID = 1184 ( timestamp      PGUID  8  47 f b t \054 0      0 timestamp_in timestamp_out timestamp_in timestamp_out d p _null_ ));
 DESCR("date and time");
 #define TIMESTAMPOID   1184
-DATA(insert OID = 1185 ( _timestamp  PGUID     -1 -1 f b t \054 0      1184 array_in array_out array_in array_out d p _null_ ));
+DATA(insert OID = 1185 ( _timestamp  PGUID     -1 -1 f b t \054 0      1184 array_in array_out array_in array_out d x _null_ ));
 DATA(insert OID = 1186 ( interval       PGUID 12  47 f b t \054 0      0 interval_in interval_out interval_in interval_out d p _null_ ));
 DESCR("@ <number> <units>, time interval");
 #define INTERVALOID            1186
-DATA(insert OID = 1187 ( _interval      PGUID  -1 -1 f b t \054 0      1186 array_in array_out array_in array_out d p _null_ ));
+DATA(insert OID = 1187 ( _interval      PGUID  -1 -1 f b t \054 0      1186 array_in array_out array_in array_out d x _null_ ));
 
 /* OIDS 1200 - 1299 */
-DATA(insert OID = 1231 (  _numeric      PGUID -1  -1 f b t \054 0      1700 array_in array_out array_in array_out i p _null_ ));
+DATA(insert OID = 1231 (  _numeric      PGUID -1  -1 f b t \054 0      1700 array_in array_out array_in array_out i x _null_ ));
 DATA(insert OID = 1266 ( timetz                 PGUID 12  22 f b t \054 0      0 timetz_in timetz_out timetz_in timetz_out d p _null_ ));
 DESCR("hh:mm:ss, ANSI SQL time");
 #define TIMETZOID              1266
-DATA(insert OID = 1270 ( _timetz        PGUID  -1 -1 f b t \054 0      1266 array_in array_out array_in array_out d p _null_ ));
+DATA(insert OID = 1270 ( _timetz        PGUID  -1 -1 f b t \054 0      1266 array_in array_out array_in array_out d x _null_ ));
 
 /* OIDS 1500 - 1599 */
 DATA(insert OID = 1560 ( bit            PGUID -1  -1 f b t \054 0      0 zpbit_in zpbit_out zpbit_in zpbit_out i p _null_ ));
 DESCR("fixed-length bit string");
 #define ZPBITOID        1560
-DATA(insert OID = 1561 ( _bit           PGUID  -1 -1 f b t \054 0      1560 array_in array_out array_in array_out i p _null_ ));
+DATA(insert OID = 1561 ( _bit           PGUID  -1 -1 f b t \054 0      1560 array_in array_out array_in array_out i x _null_ ));
 DATA(insert OID = 1562 ( varbit                 PGUID -1  -1 f b t \054 0      0 varbit_in varbit_out varbit_in varbit_out i p _null_ ));
 DESCR("fixed-length bit string");
 #define VARBITOID        1562
-DATA(insert OID = 1563 ( _varbit        PGUID  -1 -1 f b t \054 0      1562 array_in array_out array_in array_out i p _null_ ));
+DATA(insert OID = 1563 ( _varbit        PGUID  -1 -1 f b t \054 0      1562 array_in array_out array_in array_out i x _null_ ));
 
 /* OIDS 1600 - 1699 */
 DATA(insert OID = 1625 ( lztext                 PGUID -1  -1 f b t \054 0      0 lztextin lztextout lztextin lztextout i x _null_ ));
index 4d915e0665fef49eb8172ddd92a3f889364fd309..590e79f06d4e419454544624ee9c8e73e4b3c39c 100644 (file)
@@ -5,18 +5,18 @@
  *       following files:
  *                             utils/adt/arrayfuncs.c
  *                             utils/adt/arrayutils.c
- *                             utils/adt/chunk.c
  *
  *
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: array.h,v 1.26 2000/07/17 03:05:32 tgl Exp $
+ * $Id: array.h,v 1.27 2000/07/22 03:34:35 tgl Exp $
  *
  * NOTES
- *       XXX the data array should be MAXALIGN'd -- notice that the array
- *       allocation code does not allocate the extra space required for this,
- *       even though the array-packing code does the MAXALIGNs.
+ *       XXX the data array should be MAXALIGN'd -- currently we only INTALIGN
+ *       which is NOT good enough for, eg, arrays of Interval.  Changing this
+ *       will break existing user tables so hold off until we have some other
+ *       reason to break user tables (like WAL).
  *
  *-------------------------------------------------------------------------
  */
@@ -34,6 +34,7 @@ typedef struct
        int32           size;                   /* total array size (varlena requirement) */
        int                     ndim;                   /* # of dimensions */
        int                     flags;                  /* implementation flags */
+       /* flags field is currently unused, always zero. */
 } ArrayType;
 
 /*
@@ -46,35 +47,8 @@ typedef struct
 #define PG_RETURN_ARRAYTYPE_P(x)      PG_RETURN_POINTER(x)
 
 /*
- * bitmask of ArrayType flags field:
- * 1st bit - large object flag
- * 2nd bit - chunk flag (array is chunked if set)
- * 3rd,4th,&5th bit - large object type (used only if bit 1 is set)
- */
-#define ARR_LOB_FLAG   (0x1)
-#define ARR_CHK_FLAG   (0x2)
-#define ARR_OBJ_MASK   (0x1c)
-
-#define ARR_SIZE(a)                            (((ArrayType *) a)->size)
-#define ARR_NDIM(a)                            (((ArrayType *) a)->ndim)
-#define ARR_FLAGS(a)                   (((ArrayType *) a)->flags)
-
-#define ARR_IS_LO(a) \
-               (((ArrayType *) a)->flags & ARR_LOB_FLAG)
-#define SET_LO_FLAG(f,a) \
-               (((ArrayType *) a)->flags |= ((f) ? ARR_LOB_FLAG : 0x0))
-
-#define ARR_IS_CHUNKED(a) \
-               (((ArrayType *) a)->flags & ARR_CHK_FLAG)
-#define SET_CHUNK_FLAG(f,a) \
-               (((ArrayType *) a)->flags |= ((f) ? ARR_CHK_FLAG : 0x0))
-
-#define ARR_OBJ_TYPE(a) \
-               ((ARR_FLAGS(a) & ARR_OBJ_MASK) >> 2)
-#define SET_OBJ_TYPE(f,a) \
-               ((ARR_FLAGS(a)&= ~ARR_OBJ_MASK), (ARR_FLAGS(a)|=((f<<2)&ARR_OBJ_MASK)))
-
-/*
+ * Access macros for array header fields.
+ *
  * ARR_DIMS returns a pointer to an array of array dimensions (number of
  * elements along the various array axes).
  *
@@ -85,39 +59,27 @@ typedef struct
  *
  * Unlike C, the default lower bound is 1.
  */
+#define ARR_SIZE(a)                            (((ArrayType *) (a))->size)
+#define ARR_NDIM(a)                            (((ArrayType *) (a))->ndim)
+
 #define ARR_DIMS(a) \
-               ((int *) (((char *) a) + sizeof(ArrayType)))
+               ((int *) (((char *) (a)) + sizeof(ArrayType)))
 #define ARR_LBOUND(a) \
-               ((int *) (((char *) a) + sizeof(ArrayType) + \
-                                 (sizeof(int) * (((ArrayType *) a)->ndim))))
-
-/*
- * Returns a pointer to the actual array data.
- */
-#define ARR_DATA_PTR(a) \
-               (((char *) a) + \
-                MAXALIGN(sizeof(ArrayType) + 2 * (sizeof(int) * (a)->ndim)))
+               ((int *) (((char *) (a)) + sizeof(ArrayType) + \
+                                 (sizeof(int) * ARR_NDIM(a))))
 
 /*
  * The total array header size for an array of dimension n (in bytes).
  */
 #define ARR_OVERHEAD(n) \
-               (MAXALIGN(sizeof(ArrayType) + 2 * (n) * sizeof(int)))
+               (MAXALIGN(sizeof(ArrayType) + 2 * sizeof(int) * (n)))
 
-/*------------------------------------------------------------------------
- * Miscellaneous helper definitions and routines for arrayfuncs.c
- *------------------------------------------------------------------------
+/*
+ * Returns a pointer to the actual array data.
  */
+#define ARR_DATA_PTR(a) \
+               (((char *) (a)) + ARR_OVERHEAD(ARR_NDIM(a)))
 
-#define RETURN_NULL(type)  do { *isNull = true; return (type) 0; } while (0)
-
-#define NAME_LEN       30
-
-typedef struct
-{
-       char            lo_name[NAME_LEN];
-       int                     C[MAXDIM];
-} CHUNK_INFO;
 
 /*
  * prototypes for functions defined in arrayfuncs.c
@@ -134,13 +96,16 @@ extern ArrayType *array_set(ArrayType *array, int nSubscripts, int *indx,
                                                        Datum dataValue,
                                                        bool elmbyval, int elmlen,
                                                        int arraylen, bool *isNull);
-extern ArrayType *array_clip(ArrayType *array, int nSubscripts,
-                                                        int *upperIndx, int *lowerIndx,
-                                                        bool elmbyval, int elmlen, bool *isNull);
-extern ArrayType *array_assgn(ArrayType *array, int nSubscripts,
-                                                         int *upperIndx, int *lowerIndx,
-                                                         ArrayType *newArr,
-                                                         bool elmbyval, int elmlen, bool *isNull);
+extern ArrayType *array_get_slice(ArrayType *array, int nSubscripts,
+                                                                 int *upperIndx, int *lowerIndx,
+                                                                 bool elmbyval, int elmlen,
+                                                                 int arraylen, bool *isNull);
+extern ArrayType *array_set_slice(ArrayType *array, int nSubscripts,
+                                                                 int *upperIndx, int *lowerIndx,
+                                                                 ArrayType *srcArray,
+                                                                 bool elmbyval, int elmlen,
+                                                                 int arraylen, bool *isNull);
+
 extern Datum array_map(FunctionCallInfo fcinfo, Oid inpType, Oid retType);
 
 extern ArrayType *construct_array(Datum *elems, int nelems,
@@ -149,35 +114,18 @@ extern void deconstruct_array(ArrayType *array,
                                                          bool elmbyval, int elmlen, char elmalign,
                                                          Datum **elemsp, int *nelemsp);
 
-extern int _LOtransfer(char **destfd, int size, int nitems, char **srcfd,
-                       int isSrcLO, int isDestLO);
-extern char *_array_newLO(int *fd, int flag);
-
 
 /*
  * prototypes for functions defined in arrayutils.c
- * [these names seem to be too generic. Add prefix for arrays? -- AY]
  */
 
-extern int     GetOffset(int n, int *dim, int *lb, int *indx);
-extern int     getNitems(int n, int *a);
-extern int     compute_size(int *st, int *endp, int n, int base);
-extern void mda_get_offset_values(int n, int *dist, int *PC, int *span);
+extern int     ArrayGetOffset(int n, int *dim, int *lb, int *indx);
+extern int     ArrayGetOffset0(int n, int *tup, int *scale);
+extern int     ArrayGetNItems(int n, int *a);
 extern void mda_get_range(int n, int *span, int *st, int *endp);
-extern void mda_get_prod(int n, int *range, int *P);
-extern int     tuple2linear(int n, int *tup, int *scale);
-extern void array2chunk_coord(int n, int *C, int *a_coord, int *c_coord);
-extern int     next_tuple(int n, int *curr, int *span);
-
-/*
- * prototypes for functions defined in chunk.c
- */
-extern char *_ChunkArray(int fd, FILE *afd, int ndim, int *dim, int baseSize,
-                       int *nbytes, char *chunkfile);
-extern int _ReadChunkArray(int *st, int *endp, int bsize, int fp,
-                        char *destfp, ArrayType *array, int isDestLO, bool *isNull);
-extern struct varlena *_ReadChunkArray1El(int *st, int bsize, int fp,
-                                  ArrayType *array, bool *isNull);
+extern void mda_get_prod(int n, int *range, int *prod);
+extern void mda_get_offset_values(int n, int *dist, int *prod, int *span);
+extern int     mda_next_tuple(int n, int *curr, int *span);
 
 
 #endif  /* ARRAY_H */