LCOV - code coverage report
Current view: top level - src/backend/commands - copyfrom.c (source / functions) Hit Total Coverage
Test: PostgreSQL 16beta1 Lines: 481 537 89.6 %
Date: 2023-05-30 18:12:27 Functions: 16 17 94.1 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * copyfrom.c
       4             :  *      COPY <table> FROM file/program/client
       5             :  *
       6             :  * This file contains routines needed to efficiently load tuples into a
       7             :  * table.  That includes looking up the correct partition, firing triggers,
       8             :  * calling the table AM function to insert the data, and updating indexes.
       9             :  * Reading data from the input file or client and parsing it into Datums
      10             :  * is handled in copyfromparse.c.
      11             :  *
      12             :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
      13             :  * Portions Copyright (c) 1994, Regents of the University of California
      14             :  *
      15             :  *
      16             :  * IDENTIFICATION
      17             :  *    src/backend/commands/copyfrom.c
      18             :  *
      19             :  *-------------------------------------------------------------------------
      20             :  */
      21             : #include "postgres.h"
      22             : 
      23             : #include <ctype.h>
      24             : #include <unistd.h>
      25             : #include <sys/stat.h>
      26             : 
      27             : #include "access/heapam.h"
      28             : #include "access/htup_details.h"
      29             : #include "access/tableam.h"
      30             : #include "access/xact.h"
      31             : #include "access/xlog.h"
      32             : #include "catalog/namespace.h"
      33             : #include "commands/copy.h"
      34             : #include "commands/copyfrom_internal.h"
      35             : #include "commands/progress.h"
      36             : #include "commands/trigger.h"
      37             : #include "executor/execPartition.h"
      38             : #include "executor/executor.h"
      39             : #include "executor/nodeModifyTable.h"
      40             : #include "executor/tuptable.h"
      41             : #include "foreign/fdwapi.h"
      42             : #include "libpq/libpq.h"
      43             : #include "libpq/pqformat.h"
      44             : #include "miscadmin.h"
      45             : #include "optimizer/optimizer.h"
      46             : #include "pgstat.h"
      47             : #include "rewrite/rewriteHandler.h"
      48             : #include "storage/fd.h"
      49             : #include "tcop/tcopprot.h"
      50             : #include "utils/lsyscache.h"
      51             : #include "utils/memutils.h"
      52             : #include "utils/portal.h"
      53             : #include "utils/rel.h"
      54             : #include "utils/snapmgr.h"
      55             : 
      56             : /*
      57             :  * No more than this many tuples per CopyMultiInsertBuffer
      58             :  *
      59             :  * Caution: Don't make this too big, as we could end up with this many
      60             :  * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's
      61             :  * multiInsertBuffers list.  Increasing this can cause quadratic growth in
      62             :  * memory requirements during copies into partitioned tables with a large
      63             :  * number of partitions.
      64             :  */
      65             : #define MAX_BUFFERED_TUPLES     1000
      66             : 
      67             : /*
      68             :  * Flush buffers if there are >= this many bytes, as counted by the input
      69             :  * size, of tuples stored.
      70             :  */
      71             : #define MAX_BUFFERED_BYTES      65535
      72             : 
      73             : /* Trim the list of buffers back down to this number after flushing */
      74             : #define MAX_PARTITION_BUFFERS   32
      75             : 
      76             : /* Stores multi-insert data related to a single relation in CopyFrom. */
      77             : typedef struct CopyMultiInsertBuffer
      78             : {
      79             :     TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
      80             :     ResultRelInfo *resultRelInfo;   /* ResultRelInfo for 'relid' */
      81             :     BulkInsertState bistate;    /* BulkInsertState for this rel if plain
      82             :                                  * table; NULL if foreign table */
      83             :     int         nused;          /* number of 'slots' containing tuples */
      84             :     uint64      linenos[MAX_BUFFERED_TUPLES];   /* Line # of tuple in copy
      85             :                                                  * stream */
      86             : } CopyMultiInsertBuffer;
      87             : 
      88             : /*
      89             :  * Stores one or many CopyMultiInsertBuffers and details about the size and
      90             :  * number of tuples which are stored in them.  This allows multiple buffers to
      91             :  * exist at once when COPYing into a partitioned table.
      92             :  */
      93             : typedef struct CopyMultiInsertInfo
      94             : {
      95             :     List       *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */
      96             :     int         bufferedTuples; /* number of tuples buffered over all buffers */
      97             :     int         bufferedBytes;  /* number of bytes from all buffered tuples */
      98             :     CopyFromState cstate;       /* Copy state for this CopyMultiInsertInfo */
      99             :     EState     *estate;         /* Executor state used for COPY */
     100             :     CommandId   mycid;          /* Command Id used for COPY */
     101             :     int         ti_options;     /* table insert options */
     102             : } CopyMultiInsertInfo;
     103             : 
     104             : 
     105             : /* non-export function prototypes */
     106             : static char *limit_printout_length(const char *str);
     107             : 
     108             : static void ClosePipeFromProgram(CopyFromState cstate);
     109             : 
     110             : /*
     111             :  * error context callback for COPY FROM
     112             :  *
     113             :  * The argument for the error context must be CopyFromState.
     114             :  */
     115             : void
     116         214 : CopyFromErrorCallback(void *arg)
     117             : {
     118         214 :     CopyFromState cstate = (CopyFromState) arg;
     119             : 
     120         214 :     if (cstate->relname_only)
     121             :     {
     122           2 :         errcontext("COPY %s",
     123             :                    cstate->cur_relname);
     124           2 :         return;
     125             :     }
     126         212 :     if (cstate->opts.binary)
     127             :     {
     128             :         /* can't usefully display the data */
     129           2 :         if (cstate->cur_attname)
     130           2 :             errcontext("COPY %s, line %llu, column %s",
     131             :                        cstate->cur_relname,
     132           2 :                        (unsigned long long) cstate->cur_lineno,
     133             :                        cstate->cur_attname);
     134             :         else
     135           0 :             errcontext("COPY %s, line %llu",
     136             :                        cstate->cur_relname,
     137           0 :                        (unsigned long long) cstate->cur_lineno);
     138             :     }
     139             :     else
     140             :     {
     141         210 :         if (cstate->cur_attname && cstate->cur_attval)
     142          20 :         {
     143             :             /* error is relevant to a particular column */
     144             :             char       *attval;
     145             : 
     146          20 :             attval = limit_printout_length(cstate->cur_attval);
     147          20 :             errcontext("COPY %s, line %llu, column %s: \"%s\"",
     148             :                        cstate->cur_relname,
     149          20 :                        (unsigned long long) cstate->cur_lineno,
     150             :                        cstate->cur_attname,
     151             :                        attval);
     152          20 :             pfree(attval);
     153             :         }
     154         190 :         else if (cstate->cur_attname)
     155             :         {
     156             :             /* error is relevant to a particular column, value is NULL */
     157           6 :             errcontext("COPY %s, line %llu, column %s: null input",
     158             :                        cstate->cur_relname,
     159           6 :                        (unsigned long long) cstate->cur_lineno,
     160             :                        cstate->cur_attname);
     161             :         }
     162             :         else
     163             :         {
     164             :             /*
     165             :              * Error is relevant to a particular line.
     166             :              *
     167             :              * If line_buf still contains the correct line, print it.
     168             :              */
     169         184 :             if (cstate->line_buf_valid)
     170             :             {
     171             :                 char       *lineval;
     172             : 
     173         172 :                 lineval = limit_printout_length(cstate->line_buf.data);
     174         172 :                 errcontext("COPY %s, line %llu: \"%s\"",
     175             :                            cstate->cur_relname,
     176         172 :                            (unsigned long long) cstate->cur_lineno, lineval);
     177         172 :                 pfree(lineval);
     178             :             }
     179             :             else
     180             :             {
     181          12 :                 errcontext("COPY %s, line %llu",
     182             :                            cstate->cur_relname,
     183          12 :                            (unsigned long long) cstate->cur_lineno);
     184             :             }
     185             :         }
     186             :     }
     187             : }
     188             : 
     189             : /*
     190             :  * Make sure we don't print an unreasonable amount of COPY data in a message.
     191             :  *
     192             :  * Returns a pstrdup'd copy of the input.
     193             :  */
     194             : static char *
     195         192 : limit_printout_length(const char *str)
     196             : {
     197             : #define MAX_COPY_DATA_DISPLAY 100
     198             : 
     199         192 :     int         slen = strlen(str);
     200             :     int         len;
     201             :     char       *res;
     202             : 
     203             :     /* Fast path if definitely okay */
     204         192 :     if (slen <= MAX_COPY_DATA_DISPLAY)
     205         192 :         return pstrdup(str);
     206             : 
     207             :     /* Apply encoding-dependent truncation */
     208           0 :     len = pg_mbcliplen(str, slen, MAX_COPY_DATA_DISPLAY);
     209             : 
     210             :     /*
     211             :      * Truncate, and add "..." to show we truncated the input.
     212             :      */
     213           0 :     res = (char *) palloc(len + 4);
     214           0 :     memcpy(res, str, len);
     215           0 :     strcpy(res + len, "...");
     216             : 
     217           0 :     return res;
     218             : }
     219             : 
     220             : /*
     221             :  * Allocate memory and initialize a new CopyMultiInsertBuffer for this
     222             :  * ResultRelInfo.
     223             :  */
     224             : static CopyMultiInsertBuffer *
     225        1818 : CopyMultiInsertBufferInit(ResultRelInfo *rri)
     226             : {
     227             :     CopyMultiInsertBuffer *buffer;
     228             : 
     229        1818 :     buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
     230        1818 :     memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
     231        1818 :     buffer->resultRelInfo = rri;
     232        1818 :     buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL;
     233        1818 :     buffer->nused = 0;
     234             : 
     235        1818 :     return buffer;
     236             : }
     237             : 
     238             : /*
     239             :  * Make a new buffer for this ResultRelInfo.
     240             :  */
     241             : static inline void
     242        1818 : CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
     243             :                                ResultRelInfo *rri)
     244             : {
     245             :     CopyMultiInsertBuffer *buffer;
     246             : 
     247        1818 :     buffer = CopyMultiInsertBufferInit(rri);
     248             : 
     249             :     /* Setup back-link so we can easily find this buffer again */
     250        1818 :     rri->ri_CopyMultiInsertBuffer = buffer;
     251             :     /* Record that we're tracking this buffer */
     252        1818 :     miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
     253        1818 : }
     254             : 
     255             : /*
     256             :  * Initialize an already allocated CopyMultiInsertInfo.
     257             :  *
     258             :  * If rri is a non-partitioned table then a CopyMultiInsertBuffer is set up
     259             :  * for that table.
     260             :  */
     261             : static void
     262        1806 : CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
     263             :                         CopyFromState cstate, EState *estate, CommandId mycid,
     264             :                         int ti_options)
     265             : {
     266        1806 :     miinfo->multiInsertBuffers = NIL;
     267        1806 :     miinfo->bufferedTuples = 0;
     268        1806 :     miinfo->bufferedBytes = 0;
     269        1806 :     miinfo->cstate = cstate;
     270        1806 :     miinfo->estate = estate;
     271        1806 :     miinfo->mycid = mycid;
     272        1806 :     miinfo->ti_options = ti_options;
     273             : 
     274             :     /*
     275             :      * Only setup the buffer when not dealing with a partitioned table.
     276             :      * Buffers for partitioned tables will just be setup when we need to send
     277             :      * tuples their way for the first time.
     278             :      */
     279        1806 :     if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
     280        1734 :         CopyMultiInsertInfoSetupBuffer(miinfo, rri);
     281        1806 : }
     282             : 
     283             : /*
     284             :  * Returns true if the buffers are full
     285             :  */
     286             : static inline bool
     287     1585512 : CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
     288             : {
     289     1585512 :     if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
     290     1584518 :         miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
     291        1100 :         return true;
     292     1584412 :     return false;
     293             : }
     294             : 
     295             : /*
     296             :  * Returns true if we have no buffered tuples
     297             :  */
     298             : static inline bool
     299        1722 : CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
     300             : {
     301        1722 :     return miinfo->bufferedTuples == 0;
     302             : }
     303             : 
     304             : /*
     305             :  * Write the tuples stored in 'buffer' out to the table.
     306             :  */
     307             : static inline void
     308        2796 : CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
     309             :                            CopyMultiInsertBuffer *buffer,
     310             :                            int64 *processed)
     311             : {
     312        2796 :     CopyFromState cstate = miinfo->cstate;
     313        2796 :     EState     *estate = miinfo->estate;
     314        2796 :     int         nused = buffer->nused;
     315        2796 :     ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
     316        2796 :     TupleTableSlot **slots = buffer->slots;
     317             :     int         i;
     318             : 
     319        2796 :     if (resultRelInfo->ri_FdwRoutine)
     320             :     {
     321          14 :         int         batch_size = resultRelInfo->ri_BatchSize;
     322          14 :         int         sent = 0;
     323             : 
     324             :         Assert(buffer->bistate == NULL);
     325             : 
     326             :         /* Ensure that the FDW supports batching and it's enabled */
     327             :         Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert);
     328             :         Assert(batch_size > 1);
     329             : 
     330             :         /*
     331             :          * We suppress error context information other than the relation name,
     332             :          * if one of the operations below fails.
     333             :          */
     334             :         Assert(!cstate->relname_only);
     335          14 :         cstate->relname_only = true;
     336             : 
     337          38 :         while (sent < nused)
     338             :         {
     339          26 :             int         size = (batch_size < nused - sent) ? batch_size : (nused - sent);
     340          26 :             int         inserted = size;
     341             :             TupleTableSlot **rslots;
     342             : 
     343             :             /* insert into foreign table: let the FDW do it */
     344             :             rslots =
     345          26 :                 resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert(estate,
     346             :                                                                      resultRelInfo,
     347          26 :                                                                      &slots[sent],
     348             :                                                                      NULL,
     349             :                                                                      &inserted);
     350             : 
     351          24 :             sent += size;
     352             : 
     353             :             /* No need to do anything if there are no inserted rows */
     354          24 :             if (inserted <= 0)
     355           4 :                 continue;
     356             : 
     357             :             /* Triggers on foreign tables should not have transition tables */
     358             :             Assert(resultRelInfo->ri_TrigDesc == NULL ||
     359             :                    resultRelInfo->ri_TrigDesc->trig_insert_new_table == false);
     360             : 
     361             :             /* Run AFTER ROW INSERT triggers */
     362          20 :             if (resultRelInfo->ri_TrigDesc != NULL &&
     363           0 :                 resultRelInfo->ri_TrigDesc->trig_insert_after_row)
     364             :             {
     365           0 :                 Oid         relid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
     366             : 
     367           0 :                 for (i = 0; i < inserted; i++)
     368             :                 {
     369           0 :                     TupleTableSlot *slot = rslots[i];
     370             : 
     371             :                     /*
     372             :                      * AFTER ROW Triggers might reference the tableoid column,
     373             :                      * so (re-)initialize tts_tableOid before evaluating them.
     374             :                      */
     375           0 :                     slot->tts_tableOid = relid;
     376             : 
     377           0 :                     ExecARInsertTriggers(estate, resultRelInfo,
     378             :                                          slot, NIL,
     379             :                                          cstate->transition_capture);
     380             :                 }
     381             :             }
     382             : 
     383             :             /* Update the row counter and progress of the COPY command */
     384          20 :             *processed += inserted;
     385          20 :             pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
     386             :                                          *processed);
     387             :         }
     388             : 
     389          48 :         for (i = 0; i < nused; i++)
     390          36 :             ExecClearTuple(slots[i]);
     391             : 
     392             :         /* reset relname_only */
     393          12 :         cstate->relname_only = false;
     394             :     }
     395             :     else
     396             :     {
     397        2782 :         CommandId   mycid = miinfo->mycid;
     398        2782 :         int         ti_options = miinfo->ti_options;
     399        2782 :         bool        line_buf_valid = cstate->line_buf_valid;
     400        2782 :         uint64      save_cur_lineno = cstate->cur_lineno;
     401             :         MemoryContext oldcontext;
     402             : 
     403             :         Assert(buffer->bistate != NULL);
     404             : 
     405             :         /*
     406             :          * Print error context information correctly, if one of the operations
     407             :          * below fails.
     408             :          */
     409        2782 :         cstate->line_buf_valid = false;
     410             : 
     411             :         /*
     412             :          * table_multi_insert may leak memory, so switch to short-lived memory
     413             :          * context before calling it.
     414             :          */
     415        2782 :         oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
     416        2782 :         table_multi_insert(resultRelInfo->ri_RelationDesc,
     417             :                            slots,
     418             :                            nused,
     419             :                            mycid,
     420             :                            ti_options,
     421             :                            buffer->bistate);
     422        2782 :         MemoryContextSwitchTo(oldcontext);
     423             : 
     424     1588110 :         for (i = 0; i < nused; i++)
     425             :         {
     426             :             /*
     427             :              * If there are any indexes, update them for all the inserted
     428             :              * tuples, and run AFTER ROW INSERT triggers.
     429             :              */
     430     1585340 :             if (resultRelInfo->ri_NumIndices > 0)
     431             :             {
     432             :                 List       *recheckIndexes;
     433             : 
     434      202350 :                 cstate->cur_lineno = buffer->linenos[i];
     435             :                 recheckIndexes =
     436      202350 :                     ExecInsertIndexTuples(resultRelInfo,
     437             :                                           buffer->slots[i], estate, false,
     438             :                                           false, NULL, NIL, false);
     439      202338 :                 ExecARInsertTriggers(estate, resultRelInfo,
     440      202338 :                                      slots[i], recheckIndexes,
     441             :                                      cstate->transition_capture);
     442      202338 :                 list_free(recheckIndexes);
     443             :             }
     444             : 
     445             :             /*
     446             :              * There's no indexes, but see if we need to run AFTER ROW INSERT
     447             :              * triggers anyway.
     448             :              */
     449     1382990 :             else if (resultRelInfo->ri_TrigDesc != NULL &&
     450          78 :                      (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
     451          60 :                       resultRelInfo->ri_TrigDesc->trig_insert_new_table))
     452             :             {
     453          36 :                 cstate->cur_lineno = buffer->linenos[i];
     454          36 :                 ExecARInsertTriggers(estate, resultRelInfo,
     455          36 :                                      slots[i], NIL,
     456             :                                      cstate->transition_capture);
     457             :             }
     458             : 
     459     1585328 :             ExecClearTuple(slots[i]);
     460             :         }
     461             : 
     462             :         /* Update the row counter and progress of the COPY command */
     463        2770 :         *processed += nused;
     464        2770 :         pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
     465             :                                      *processed);
     466             : 
     467             :         /* reset cur_lineno and line_buf_valid to what they were */
     468        2770 :         cstate->line_buf_valid = line_buf_valid;
     469        2770 :         cstate->cur_lineno = save_cur_lineno;
     470             :     }
     471             : 
     472             :     /* Mark that all slots are free */
     473        2782 :     buffer->nused = 0;
     474        2782 : }
     475             : 
     476             : /*
     477             :  * Drop used slots and free member for this buffer.
     478             :  *
     479             :  * The buffer must be flushed before cleanup.
     480             :  */
     481             : static inline void
     482        1692 : CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
     483             :                              CopyMultiInsertBuffer *buffer)
     484             : {
     485        1692 :     ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
     486             :     int         i;
     487             : 
     488             :     /* Ensure buffer was flushed */
     489             :     Assert(buffer->nused == 0);
     490             : 
     491             :     /* Remove back-link to ourself */
     492        1692 :     resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
     493             : 
     494        1692 :     if (resultRelInfo->ri_FdwRoutine == NULL)
     495             :     {
     496             :         Assert(buffer->bistate != NULL);
     497        1680 :         FreeBulkInsertState(buffer->bistate);
     498             :     }
     499             :     else
     500             :         Assert(buffer->bistate == NULL);
     501             : 
     502             :     /* Since we only create slots on demand, just drop the non-null ones. */
     503      626384 :     for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
     504      624692 :         ExecDropSingleTupleTableSlot(buffer->slots[i]);
     505             : 
     506        1692 :     if (resultRelInfo->ri_FdwRoutine == NULL)
     507        1680 :         table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
     508             :                                  miinfo->ti_options);
     509             : 
     510        1692 :     pfree(buffer);
     511        1692 : }
     512             : 
     513             : /*
     514             :  * Write out all stored tuples in all buffers out to the tables.
     515             :  *
     516             :  * Once flushed we also trim the tracked buffers list down to size by removing
     517             :  * the buffers created earliest first.
     518             :  *
     519             :  * Callers should pass 'curr_rri' as the ResultRelInfo that's currently being
     520             :  * used.  When cleaning up old buffers we'll never remove the one for
     521             :  * 'curr_rri'.
     522             :  */
     523             : static inline void
     524        2570 : CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri,
     525             :                          int64 *processed)
     526             : {
     527             :     ListCell   *lc;
     528             : 
     529        5352 :     foreach(lc, miinfo->multiInsertBuffers)
     530             :     {
     531        2796 :         CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc);
     532             : 
     533        2796 :         CopyMultiInsertBufferFlush(miinfo, buffer, processed);
     534             :     }
     535             : 
     536        2556 :     miinfo->bufferedTuples = 0;
     537        2556 :     miinfo->bufferedBytes = 0;
     538             : 
     539             :     /*
     540             :      * Trim the list of tracked buffers down if it exceeds the limit.  Here we
     541             :      * remove buffers starting with the ones we created first.  It seems less
     542             :      * likely that these older ones will be needed than the ones that were
     543             :      * just created.
     544             :      */
     545        2556 :     while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS)
     546             :     {
     547             :         CopyMultiInsertBuffer *buffer;
     548             : 
     549           0 :         buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
     550             : 
     551             :         /*
     552             :          * We never want to remove the buffer that's currently being used, so
     553             :          * if we happen to find that then move it to the end of the list.
     554             :          */
     555           0 :         if (buffer->resultRelInfo == curr_rri)
     556             :         {
     557           0 :             miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
     558           0 :             miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
     559           0 :             buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
     560             :         }
     561             : 
     562           0 :         CopyMultiInsertBufferCleanup(miinfo, buffer);
     563           0 :         miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
     564             :     }
     565        2556 : }
     566             : 
     567             : /*
     568             :  * Cleanup allocated buffers and free memory
     569             :  */
     570             : static inline void
     571        1680 : CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo)
     572             : {
     573             :     ListCell   *lc;
     574             : 
     575        3372 :     foreach(lc, miinfo->multiInsertBuffers)
     576        1692 :         CopyMultiInsertBufferCleanup(miinfo, lfirst(lc));
     577             : 
     578        1680 :     list_free(miinfo->multiInsertBuffers);
     579        1680 : }
     580             : 
     581             : /*
     582             :  * Get the next TupleTableSlot that the next tuple should be stored in.
     583             :  *
     584             :  * Callers must ensure that the buffer is not full.
     585             :  *
     586             :  * Note: 'miinfo' is unused but has been included for consistency with the
     587             :  * other functions in this area.
     588             :  */
     589             : static inline TupleTableSlot *
     590     1587252 : CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
     591             :                                 ResultRelInfo *rri)
     592             : {
     593     1587252 :     CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
     594     1587252 :     int         nused = buffer->nused;
     595             : 
     596             :     Assert(buffer != NULL);
     597             :     Assert(nused < MAX_BUFFERED_TUPLES);
     598             : 
     599     1587252 :     if (buffer->slots[nused] == NULL)
     600      624964 :         buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
     601     1587252 :     return buffer->slots[nused];
     602             : }
     603             : 
     604             : /*
     605             :  * Record the previously reserved TupleTableSlot that was reserved by
     606             :  * CopyMultiInsertInfoNextFreeSlot as being consumed.
     607             :  */
     608             : static inline void
     609     1585512 : CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
     610             :                          TupleTableSlot *slot, int tuplen, uint64 lineno)
     611             : {
     612     1585512 :     CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
     613             : 
     614             :     Assert(buffer != NULL);
     615             :     Assert(slot == buffer->slots[buffer->nused]);
     616             : 
     617             :     /* Store the line number so we can properly report any errors later */
     618     1585512 :     buffer->linenos[buffer->nused] = lineno;
     619             : 
     620             :     /* Record this slot as being used */
     621     1585512 :     buffer->nused++;
     622             : 
     623             :     /* Update how many tuples are stored and their size */
     624     1585512 :     miinfo->bufferedTuples++;
     625     1585512 :     miinfo->bufferedBytes += tuplen;
     626     1585512 : }
     627             : 
     628             : /*
     629             :  * Copy FROM file to relation.
     630             :  */
     631             : uint64
     632        1978 : CopyFrom(CopyFromState cstate)
     633             : {
     634             :     ResultRelInfo *resultRelInfo;
     635             :     ResultRelInfo *target_resultRelInfo;
     636        1978 :     ResultRelInfo *prevResultRelInfo = NULL;
     637        1978 :     EState     *estate = CreateExecutorState(); /* for ExecConstraints() */
     638             :     ModifyTableState *mtstate;
     639             :     ExprContext *econtext;
     640        1978 :     TupleTableSlot *singleslot = NULL;
     641        1978 :     MemoryContext oldcontext = CurrentMemoryContext;
     642             : 
     643        1978 :     PartitionTupleRouting *proute = NULL;
     644             :     ErrorContextCallback errcallback;
     645        1978 :     CommandId   mycid = GetCurrentCommandId(true);
     646        1978 :     int         ti_options = 0; /* start with default options for insert */
     647        1978 :     BulkInsertState bistate = NULL;
     648             :     CopyInsertMethod insertMethod;
     649        1978 :     CopyMultiInsertInfo multiInsertInfo = {0};  /* pacify compiler */
     650        1978 :     int64       processed = 0;
     651        1978 :     int64       excluded = 0;
     652             :     bool        has_before_insert_row_trig;
     653             :     bool        has_instead_insert_row_trig;
     654        1978 :     bool        leafpart_use_multi_insert = false;
     655             : 
     656             :     Assert(cstate->rel);
     657             :     Assert(list_length(cstate->range_table) == 1);
     658             : 
     659             :     /*
     660             :      * The target must be a plain, foreign, or partitioned relation, or have
     661             :      * an INSTEAD OF INSERT row trigger.  (Currently, such triggers are only
     662             :      * allowed on views, so we only hint about them in the view case.)
     663             :      */
     664        1978 :     if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
     665         152 :         cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
     666         114 :         cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
     667          18 :         !(cstate->rel->trigdesc &&
     668          12 :           cstate->rel->trigdesc->trig_insert_instead_row))
     669             :     {
     670           6 :         if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
     671           6 :             ereport(ERROR,
     672             :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     673             :                      errmsg("cannot copy to view \"%s\"",
     674             :                             RelationGetRelationName(cstate->rel)),
     675             :                      errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
     676           0 :         else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
     677           0 :             ereport(ERROR,
     678             :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     679             :                      errmsg("cannot copy to materialized view \"%s\"",
     680             :                             RelationGetRelationName(cstate->rel))));
     681           0 :         else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
     682           0 :             ereport(ERROR,
     683             :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     684             :                      errmsg("cannot copy to sequence \"%s\"",
     685             :                             RelationGetRelationName(cstate->rel))));
     686             :         else
     687           0 :             ereport(ERROR,
     688             :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     689             :                      errmsg("cannot copy to non-table relation \"%s\"",
     690             :                             RelationGetRelationName(cstate->rel))));
     691             :     }
     692             : 
     693             :     /*
     694             :      * If the target file is new-in-transaction, we assume that checking FSM
     695             :      * for free space is a waste of time.  This could possibly be wrong, but
     696             :      * it's unlikely.
     697             :      */
     698        1972 :     if (RELKIND_HAS_STORAGE(cstate->rel->rd_rel->relkind) &&
     699        1826 :         (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
     700        1814 :          cstate->rel->rd_firstRelfilelocatorSubid != InvalidSubTransactionId))
     701          80 :         ti_options |= TABLE_INSERT_SKIP_FSM;
     702             : 
     703             :     /*
     704             :      * Optimize if new relation storage was created in this subxact or one of
     705             :      * its committed children and we won't see those rows later as part of an
     706             :      * earlier scan or command. The subxact test ensures that if this subxact
     707             :      * aborts then the frozen rows won't be visible after xact cleanup.  Note
     708             :      * that the stronger test of exactly which subtransaction created it is
     709             :      * crucial for correctness of this optimization. The test for an earlier
     710             :      * scan or command tolerates false negatives. FREEZE causes other sessions
     711             :      * to see rows they would not see under MVCC, and a false negative merely
     712             :      * spreads that anomaly to the current session.
     713             :      */
     714        1972 :     if (cstate->opts.freeze)
     715             :     {
     716             :         /*
     717             :          * We currently disallow COPY FREEZE on partitioned tables.  The
     718             :          * reason for this is that we've simply not yet opened the partitions
     719             :          * to determine if the optimization can be applied to them.  We could
     720             :          * go and open them all here, but doing so may be quite a costly
     721             :          * overhead for small copies.  In any case, we may just end up routing
     722             :          * tuples to a small number of partitions.  It seems better just to
     723             :          * raise an ERROR for partitioned tables.
     724             :          */
     725          58 :         if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     726             :         {
     727           6 :             ereport(ERROR,
     728             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     729             :                      errmsg("cannot perform COPY FREEZE on a partitioned table")));
     730             :         }
     731             : 
     732             :         /*
     733             :          * Tolerate one registration for the benefit of FirstXactSnapshot.
     734             :          * Scan-bearing queries generally create at least two registrations,
     735             :          * though relying on that is fragile, as is ignoring ActiveSnapshot.
     736             :          * Clear CatalogSnapshot to avoid counting its registration.  We'll
     737             :          * still detect ongoing catalog scans, each of which separately
     738             :          * registers the snapshot it uses.
     739             :          */
     740          52 :         InvalidateCatalogSnapshot();
     741          52 :         if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
     742           0 :             ereport(ERROR,
     743             :                     (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
     744             :                      errmsg("cannot perform COPY FREEZE because of prior transaction activity")));
     745             : 
     746         104 :         if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
     747          52 :             cstate->rel->rd_newRelfilelocatorSubid != GetCurrentSubTransactionId())
     748          18 :             ereport(ERROR,
     749             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     750             :                      errmsg("cannot perform COPY FREEZE because the table was not created or truncated in the current subtransaction")));
     751             : 
     752          34 :         ti_options |= TABLE_INSERT_FROZEN;
     753             :     }
     754             : 
     755             :     /*
     756             :      * We need a ResultRelInfo so we can use the regular executor's
     757             :      * index-entry-making machinery.  (There used to be a huge amount of code
     758             :      * here that basically duplicated execUtils.c ...)
     759             :      */
     760        1948 :     ExecInitRangeTable(estate, cstate->range_table, cstate->rteperminfos);
     761        1948 :     resultRelInfo = target_resultRelInfo = makeNode(ResultRelInfo);
     762        1948 :     ExecInitResultRelation(estate, resultRelInfo, 1);
     763             : 
     764             :     /* Verify the named relation is a valid target for INSERT */
     765        1948 :     CheckValidResultRel(resultRelInfo, CMD_INSERT);
     766             : 
     767        1946 :     ExecOpenIndices(resultRelInfo, false);
     768             : 
     769             :     /*
     770             :      * Set up a ModifyTableState so we can let FDW(s) init themselves for
     771             :      * foreign-table result relation(s).
     772             :      */
     773        1946 :     mtstate = makeNode(ModifyTableState);
     774        1946 :     mtstate->ps.plan = NULL;
     775        1946 :     mtstate->ps.state = estate;
     776        1946 :     mtstate->operation = CMD_INSERT;
     777        1946 :     mtstate->mt_nrels = 1;
     778        1946 :     mtstate->resultRelInfo = resultRelInfo;
     779        1946 :     mtstate->rootResultRelInfo = resultRelInfo;
     780             : 
     781        1946 :     if (resultRelInfo->ri_FdwRoutine != NULL &&
     782          36 :         resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
     783          36 :         resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
     784             :                                                          resultRelInfo);
     785             : 
     786             :     /*
     787             :      * Also, if the named relation is a foreign table, determine if the FDW
     788             :      * supports batch insert and determine the batch size (a FDW may support
     789             :      * batching, but it may be disabled for the server/table).
     790             :      *
     791             :      * If the FDW does not support batching, we set the batch size to 1.
     792             :      */
     793        1946 :     if (resultRelInfo->ri_FdwRoutine != NULL &&
     794          36 :         resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize &&
     795          36 :         resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert)
     796          36 :         resultRelInfo->ri_BatchSize =
     797          36 :             resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize(resultRelInfo);
     798             :     else
     799        1910 :         resultRelInfo->ri_BatchSize = 1;
     800             : 
     801             :     Assert(resultRelInfo->ri_BatchSize >= 1);
     802             : 
     803             :     /* Prepare to catch AFTER triggers. */
     804        1946 :     AfterTriggerBeginQuery();
     805             : 
     806             :     /*
     807             :      * If there are any triggers with transition tables on the named relation,
     808             :      * we need to be prepared to capture transition tuples.
     809             :      *
     810             :      * Because partition tuple routing would like to know about whether
     811             :      * transition capture is active, we also set it in mtstate, which is
     812             :      * passed to ExecFindPartition() below.
     813             :      */
     814        1946 :     cstate->transition_capture = mtstate->mt_transition_capture =
     815        1946 :         MakeTransitionCaptureState(cstate->rel->trigdesc,
     816        1946 :                                    RelationGetRelid(cstate->rel),
     817             :                                    CMD_INSERT);
     818             : 
     819             :     /*
     820             :      * If the named relation is a partitioned table, initialize state for
     821             :      * CopyFrom tuple routing.
     822             :      */
     823        1946 :     if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     824          90 :         proute = ExecSetupPartitionTupleRouting(estate, cstate->rel);
     825             : 
     826        1946 :     if (cstate->whereClause)
     827          18 :         cstate->qualexpr = ExecInitQual(castNode(List, cstate->whereClause),
     828             :                                         &mtstate->ps);
     829             : 
     830             :     /*
     831             :      * It's generally more efficient to prepare a bunch of tuples for
     832             :      * insertion, and insert them in one
     833             :      * table_multi_insert()/ExecForeignBatchInsert() call, than call
     834             :      * table_tuple_insert()/ExecForeignInsert() separately for every tuple.
     835             :      * However, there are a number of reasons why we might not be able to do
     836             :      * this.  These are explained below.
     837             :      */
     838        1946 :     if (resultRelInfo->ri_TrigDesc != NULL &&
     839         176 :         (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
     840          84 :          resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
     841             :     {
     842             :         /*
     843             :          * Can't support multi-inserts when there are any BEFORE/INSTEAD OF
     844             :          * triggers on the table. Such triggers might query the table we're
     845             :          * inserting into and act differently if the tuples that have already
     846             :          * been processed and prepared for insertion are not there.
     847             :          */
     848         104 :         insertMethod = CIM_SINGLE;
     849             :     }
     850        1842 :     else if (resultRelInfo->ri_FdwRoutine != NULL &&
     851          28 :              resultRelInfo->ri_BatchSize == 1)
     852             :     {
     853             :         /*
     854             :          * Can't support multi-inserts to a foreign table if the FDW does not
     855             :          * support batching, or it's disabled for the server or foreign table.
     856             :          */
     857          18 :         insertMethod = CIM_SINGLE;
     858             :     }
     859        1824 :     else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL &&
     860          26 :              resultRelInfo->ri_TrigDesc->trig_insert_new_table)
     861             :     {
     862             :         /*
     863             :          * For partitioned tables we can't support multi-inserts when there
     864             :          * are any statement level insert triggers. It might be possible to
     865             :          * allow partitioned tables with such triggers in the future, but for
     866             :          * now, CopyMultiInsertInfoFlush expects that any after row insert and
     867             :          * statement level insert triggers are on the same relation.
     868             :          */
     869          18 :         insertMethod = CIM_SINGLE;
     870             :     }
     871        1806 :     else if (cstate->volatile_defexprs)
     872             :     {
     873             :         /*
     874             :          * Can't support multi-inserts if there are any volatile default
     875             :          * expressions in the table.  Similarly to the trigger case above,
     876             :          * such expressions may query the table we're inserting into.
     877             :          *
     878             :          * Note: It does not matter if any partitions have any volatile
     879             :          * default expressions as we use the defaults from the target of the
     880             :          * COPY command.
     881             :          */
     882           0 :         insertMethod = CIM_SINGLE;
     883             :     }
     884        1806 :     else if (contain_volatile_functions(cstate->whereClause))
     885             :     {
     886             :         /*
     887             :          * Can't support multi-inserts if there are any volatile function
     888             :          * expressions in WHERE clause.  Similarly to the trigger case above,
     889             :          * such expressions may query the table we're inserting into.
     890             :          */
     891           0 :         insertMethod = CIM_SINGLE;
     892             :     }
     893             :     else
     894             :     {
     895             :         /*
     896             :          * For partitioned tables, we may still be able to perform bulk
     897             :          * inserts.  However, the possibility of this depends on which types
     898             :          * of triggers exist on the partition.  We must disable bulk inserts
     899             :          * if the partition is a foreign table that can't use batching or it
     900             :          * has any before row insert or insert instead triggers (same as we
     901             :          * checked above for the parent table).  Since the partition's
     902             :          * resultRelInfos are initialized only when we actually need to insert
     903             :          * the first tuple into them, we must have the intermediate insert
     904             :          * method of CIM_MULTI_CONDITIONAL to flag that we must later
     905             :          * determine if we can use bulk-inserts for the partition being
     906             :          * inserted into.
     907             :          */
     908        1806 :         if (proute)
     909          72 :             insertMethod = CIM_MULTI_CONDITIONAL;
     910             :         else
     911        1734 :             insertMethod = CIM_MULTI;
     912             : 
     913        1806 :         CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
     914             :                                 estate, mycid, ti_options);
     915             :     }
     916             : 
     917             :     /*
     918             :      * If not using batch mode (which allocates slots as needed) set up a
     919             :      * tuple slot too. When inserting into a partitioned table, we also need
     920             :      * one, even if we might batch insert, to read the tuple in the root
     921             :      * partition's form.
     922             :      */
     923        1946 :     if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
     924             :     {
     925         212 :         singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
     926             :                                        &estate->es_tupleTable);
     927         212 :         bistate = GetBulkInsertState();
     928             :     }
     929             : 
     930        2122 :     has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
     931         176 :                                   resultRelInfo->ri_TrigDesc->trig_insert_before_row);
     932             : 
     933        2122 :     has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
     934         176 :                                    resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
     935             : 
     936             :     /*
     937             :      * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
     938             :      * should do this for COPY, since it's not really an "INSERT" statement as
     939             :      * such. However, executing these triggers maintains consistency with the
     940             :      * EACH ROW triggers that we already fire on COPY.
     941             :      */
     942        1946 :     ExecBSInsertTriggers(estate, resultRelInfo);
     943             : 
     944        1946 :     econtext = GetPerTupleExprContext(estate);
     945             : 
     946             :     /* Set up callback to identify error line number */
     947        1946 :     errcallback.callback = CopyFromErrorCallback;
     948        1946 :     errcallback.arg = (void *) cstate;
     949        1946 :     errcallback.previous = error_context_stack;
     950        1946 :     error_context_stack = &errcallback;
     951             : 
     952             :     for (;;)
     953     1585918 :     {
     954             :         TupleTableSlot *myslot;
     955             :         bool        skip_tuple;
     956             : 
     957     1587864 :         CHECK_FOR_INTERRUPTS();
     958             : 
     959             :         /*
     960             :          * Reset the per-tuple exprcontext. We do this after every tuple, to
     961             :          * clean-up after expression evaluations etc.
     962             :          */
     963     1587864 :         ResetPerTupleExprContext(estate);
     964             : 
     965             :         /* select slot to (initially) load row into */
     966     1587864 :         if (insertMethod == CIM_SINGLE || proute)
     967             :         {
     968      214860 :             myslot = singleslot;
     969      214860 :             Assert(myslot != NULL);
     970             :         }
     971             :         else
     972             :         {
     973             :             Assert(resultRelInfo == target_resultRelInfo);
     974             :             Assert(insertMethod == CIM_MULTI);
     975             : 
     976     1373004 :             myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
     977             :                                                      resultRelInfo);
     978             :         }
     979             : 
     980             :         /*
     981             :          * Switch to per-tuple context before calling NextCopyFrom, which does
     982             :          * evaluate default expressions etc. and requires per-tuple context.
     983             :          */
     984     1587864 :         MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
     985             : 
     986     1587864 :         ExecClearTuple(myslot);
     987             : 
     988             :         /* Directly store the values/nulls array in the slot */
     989     1587864 :         if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
     990        1808 :             break;
     991             : 
     992     1585952 :         ExecStoreVirtualTuple(myslot);
     993             : 
     994             :         /*
     995             :          * Constraints and where clause might reference the tableoid column,
     996             :          * so (re-)initialize tts_tableOid before evaluating them.
     997             :          */
     998     1585952 :         myslot->tts_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc);
     999             : 
    1000             :         /* Triggers and stuff need to be invoked in query context. */
    1001     1585952 :         MemoryContextSwitchTo(oldcontext);
    1002             : 
    1003     1585952 :         if (cstate->whereClause)
    1004             :         {
    1005          66 :             econtext->ecxt_scantuple = myslot;
    1006             :             /* Skip items that don't match COPY's WHERE clause */
    1007          66 :             if (!ExecQual(cstate->qualexpr, econtext))
    1008             :             {
    1009             :                 /*
    1010             :                  * Report that this tuple was filtered out by the WHERE
    1011             :                  * clause.
    1012             :                  */
    1013          36 :                 pgstat_progress_update_param(PROGRESS_COPY_TUPLES_EXCLUDED,
    1014             :                                              ++excluded);
    1015          36 :                 continue;
    1016             :             }
    1017             :         }
    1018             : 
    1019             :         /* Determine the partition to insert the tuple into */
    1020     1585916 :         if (proute)
    1021             :         {
    1022             :             TupleConversionMap *map;
    1023             : 
    1024             :             /*
    1025             :              * Attempt to find a partition suitable for this tuple.
    1026             :              * ExecFindPartition() will raise an error if none can be found or
    1027             :              * if the found partition is not suitable for INSERTs.
    1028             :              */
    1029      214386 :             resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo,
    1030             :                                               proute, myslot, estate);
    1031             : 
    1032      214384 :             if (prevResultRelInfo != resultRelInfo)
    1033             :             {
    1034             :                 /* Determine which triggers exist on this partition */
    1035      101564 :                 has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
    1036          54 :                                               resultRelInfo->ri_TrigDesc->trig_insert_before_row);
    1037             : 
    1038      101564 :                 has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
    1039          54 :                                                resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
    1040             : 
    1041             :                 /*
    1042             :                  * Disable multi-inserts when the partition has BEFORE/INSTEAD
    1043             :                  * OF triggers, or if the partition is a foreign table that
    1044             :                  * can't use batching.
    1045             :                  */
    1046      202966 :                 leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
    1047      101456 :                     !has_before_insert_row_trig &&
    1048      304398 :                     !has_instead_insert_row_trig &&
    1049      101432 :                     (resultRelInfo->ri_FdwRoutine == NULL ||
    1050          12 :                      resultRelInfo->ri_BatchSize > 1);
    1051             : 
    1052             :                 /* Set the multi-insert buffer to use for this partition. */
    1053      101510 :                 if (leafpart_use_multi_insert)
    1054             :                 {
    1055      101428 :                     if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
    1056          84 :                         CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
    1057             :                                                        resultRelInfo);
    1058             :                 }
    1059          82 :                 else if (insertMethod == CIM_MULTI_CONDITIONAL &&
    1060          28 :                          !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
    1061             :                 {
    1062             :                     /*
    1063             :                      * Flush pending inserts if this partition can't use
    1064             :                      * batching, so rows are visible to triggers etc.
    1065             :                      */
    1066           0 :                     CopyMultiInsertInfoFlush(&multiInsertInfo,
    1067             :                                              resultRelInfo,
    1068             :                                              &processed);
    1069             :                 }
    1070             : 
    1071      101510 :                 if (bistate != NULL)
    1072      101510 :                     ReleaseBulkInsertStatePin(bistate);
    1073      101510 :                 prevResultRelInfo = resultRelInfo;
    1074             :             }
    1075             : 
    1076             :             /*
    1077             :              * If we're capturing transition tuples, we might need to convert
    1078             :              * from the partition rowtype to root rowtype. But if there are no
    1079             :              * BEFORE triggers on the partition that could change the tuple,
    1080             :              * we can just remember the original unconverted tuple to avoid a
    1081             :              * needless round trip conversion.
    1082             :              */
    1083      214384 :             if (cstate->transition_capture != NULL)
    1084          54 :                 cstate->transition_capture->tcs_original_insert_tuple =
    1085          54 :                     !has_before_insert_row_trig ? myslot : NULL;
    1086             : 
    1087             :             /*
    1088             :              * We might need to convert from the root rowtype to the partition
    1089             :              * rowtype.
    1090             :              */
    1091      214384 :             map = ExecGetRootToChildMap(resultRelInfo, estate);
    1092      214384 :             if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert)
    1093             :             {
    1094             :                 /* non batch insert */
    1095         136 :                 if (map != NULL)
    1096             :                 {
    1097             :                     TupleTableSlot *new_slot;
    1098             : 
    1099         110 :                     new_slot = resultRelInfo->ri_PartitionTupleSlot;
    1100         110 :                     myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot);
    1101             :                 }
    1102             :             }
    1103             :             else
    1104             :             {
    1105             :                 /*
    1106             :                  * Prepare to queue up tuple for later batch insert into
    1107             :                  * current partition.
    1108             :                  */
    1109             :                 TupleTableSlot *batchslot;
    1110             : 
    1111             :                 /* no other path available for partitioned table */
    1112             :                 Assert(insertMethod == CIM_MULTI_CONDITIONAL);
    1113             : 
    1114      214248 :                 batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
    1115             :                                                             resultRelInfo);
    1116             : 
    1117      214248 :                 if (map != NULL)
    1118       12200 :                     myslot = execute_attr_map_slot(map->attrMap, myslot,
    1119             :                                                    batchslot);
    1120             :                 else
    1121             :                 {
    1122             :                     /*
    1123             :                      * This looks more expensive than it is (Believe me, I
    1124             :                      * optimized it away. Twice.). The input is in virtual
    1125             :                      * form, and we'll materialize the slot below - for most
    1126             :                      * slot types the copy performs the work materialization
    1127             :                      * would later require anyway.
    1128             :                      */
    1129      202048 :                     ExecCopySlot(batchslot, myslot);
    1130      202048 :                     myslot = batchslot;
    1131             :                 }
    1132             :             }
    1133             : 
    1134             :             /* ensure that triggers etc see the right relation  */
    1135      214384 :             myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
    1136             :         }
    1137             : 
    1138     1585914 :         skip_tuple = false;
    1139             : 
    1140             :         /* BEFORE ROW INSERT Triggers */
    1141     1585914 :         if (has_before_insert_row_trig)
    1142             :         {
    1143         274 :             if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot))
    1144          16 :                 skip_tuple = true;  /* "do nothing" */
    1145             :         }
    1146             : 
    1147     1585914 :         if (!skip_tuple)
    1148             :         {
    1149             :             /*
    1150             :              * If there is an INSTEAD OF INSERT ROW trigger, let it handle the
    1151             :              * tuple.  Otherwise, proceed with inserting the tuple into the
    1152             :              * table or foreign table.
    1153             :              */
    1154     1585898 :             if (has_instead_insert_row_trig)
    1155             :             {
    1156          12 :                 ExecIRInsertTriggers(estate, resultRelInfo, myslot);
    1157             :             }
    1158             :             else
    1159             :             {
    1160             :                 /* Compute stored generated columns */
    1161     1585886 :                 if (resultRelInfo->ri_RelationDesc->rd_att->constr &&
    1162      401796 :                     resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored)
    1163          36 :                     ExecComputeStoredGenerated(resultRelInfo, estate, myslot,
    1164             :                                                CMD_INSERT);
    1165             : 
    1166             :                 /*
    1167             :                  * If the target is a plain table, check the constraints of
    1168             :                  * the tuple.
    1169             :                  */
    1170     1585886 :                 if (resultRelInfo->ri_FdwRoutine == NULL &&
    1171     1585798 :                     resultRelInfo->ri_RelationDesc->rd_att->constr)
    1172      401760 :                     ExecConstraints(resultRelInfo, myslot, estate);
    1173             : 
    1174             :                 /*
    1175             :                  * Also check the tuple against the partition constraint, if
    1176             :                  * there is one; except that if we got here via tuple-routing,
    1177             :                  * we don't need to if there's no BR trigger defined on the
    1178             :                  * partition.
    1179             :                  */
    1180     1585856 :                 if (resultRelInfo->ri_RelationDesc->rd_rel->relispartition &&
    1181      214372 :                     (proute == NULL || has_before_insert_row_trig))
    1182        2308 :                     ExecPartitionCheck(resultRelInfo, myslot, estate, true);
    1183             : 
    1184             :                 /* Store the slot in the multi-insert buffer, when enabled. */
    1185     1585856 :                 if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
    1186             :                 {
    1187             :                     /*
    1188             :                      * The slot previously might point into the per-tuple
    1189             :                      * context. For batching it needs to be longer lived.
    1190             :                      */
    1191     1585512 :                     ExecMaterializeSlot(myslot);
    1192             : 
    1193             :                     /* Add this tuple to the tuple buffer */
    1194     1585512 :                     CopyMultiInsertInfoStore(&multiInsertInfo,
    1195             :                                              resultRelInfo, myslot,
    1196             :                                              cstate->line_buf.len,
    1197             :                                              cstate->cur_lineno);
    1198             : 
    1199             :                     /*
    1200             :                      * If enough inserts have queued up, then flush all
    1201             :                      * buffers out to their tables.
    1202             :                      */
    1203     1585512 :                     if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
    1204        1100 :                         CopyMultiInsertInfoFlush(&multiInsertInfo,
    1205             :                                                  resultRelInfo,
    1206             :                                                  &processed);
    1207             : 
    1208             :                     /*
    1209             :                      * We delay updating the row counter and progress of the
    1210             :                      * COPY command until after writing the tuples stored in
    1211             :                      * the buffer out to the table, as in single insert mode.
    1212             :                      * See CopyMultiInsertBufferFlush().
    1213             :                      */
    1214     1585512 :                     continue;   /* next tuple please */
    1215             :                 }
    1216             :                 else
    1217             :                 {
    1218         344 :                     List       *recheckIndexes = NIL;
    1219             : 
    1220             :                     /* OK, store the tuple */
    1221         344 :                     if (resultRelInfo->ri_FdwRoutine != NULL)
    1222             :                     {
    1223          50 :                         myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
    1224             :                                                                                  resultRelInfo,
    1225             :                                                                                  myslot,
    1226             :                                                                                  NULL);
    1227             : 
    1228          48 :                         if (myslot == NULL) /* "do nothing" */
    1229           4 :                             continue;   /* next tuple please */
    1230             : 
    1231             :                         /*
    1232             :                          * AFTER ROW Triggers might reference the tableoid
    1233             :                          * column, so (re-)initialize tts_tableOid before
    1234             :                          * evaluating them.
    1235             :                          */
    1236          44 :                         myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
    1237             :                     }
    1238             :                     else
    1239             :                     {
    1240             :                         /* OK, store the tuple and create index entries for it */
    1241         294 :                         table_tuple_insert(resultRelInfo->ri_RelationDesc,
    1242             :                                            myslot, mycid, ti_options, bistate);
    1243             : 
    1244         294 :                         if (resultRelInfo->ri_NumIndices > 0)
    1245           0 :                             recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
    1246             :                                                                    myslot,
    1247             :                                                                    estate,
    1248             :                                                                    false,
    1249             :                                                                    false,
    1250             :                                                                    NULL,
    1251             :                                                                    NIL,
    1252             :                                                                    false);
    1253             :                     }
    1254             : 
    1255             :                     /* AFTER ROW INSERT Triggers */
    1256         338 :                     ExecARInsertTriggers(estate, resultRelInfo, myslot,
    1257             :                                          recheckIndexes, cstate->transition_capture);
    1258             : 
    1259         338 :                     list_free(recheckIndexes);
    1260             :                 }
    1261             :             }
    1262             : 
    1263             :             /*
    1264             :              * We count only tuples not suppressed by a BEFORE INSERT trigger
    1265             :              * or FDW; this is the same definition used by nodeModifyTable.c
    1266             :              * for counting tuples inserted by an INSERT command.  Update
    1267             :              * progress of the COPY command as well.
    1268             :              */
    1269         350 :             pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
    1270             :                                          ++processed);
    1271             :         }
    1272             :     }
    1273             : 
    1274             :     /* Flush any remaining buffered tuples */
    1275        1808 :     if (insertMethod != CIM_SINGLE)
    1276             :     {
    1277        1694 :         if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
    1278        1470 :             CopyMultiInsertInfoFlush(&multiInsertInfo, NULL, &processed);
    1279             :     }
    1280             : 
    1281             :     /* Done, clean up */
    1282        1794 :     error_context_stack = errcallback.previous;
    1283             : 
    1284        1794 :     if (bistate != NULL)
    1285         184 :         FreeBulkInsertState(bistate);
    1286             : 
    1287        1794 :     MemoryContextSwitchTo(oldcontext);
    1288             : 
    1289             :     /* Execute AFTER STATEMENT insertion triggers */
    1290        1794 :     ExecASInsertTriggers(estate, target_resultRelInfo, cstate->transition_capture);
    1291             : 
    1292             :     /* Handle queued AFTER triggers */
    1293        1794 :     AfterTriggerEndQuery(estate);
    1294             : 
    1295        1794 :     ExecResetTupleTable(estate->es_tupleTable, false);
    1296             : 
    1297             :     /* Allow the FDW to shut down */
    1298        1794 :     if (target_resultRelInfo->ri_FdwRoutine != NULL &&
    1299          32 :         target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
    1300          32 :         target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
    1301             :                                                               target_resultRelInfo);
    1302             : 
    1303             :     /* Tear down the multi-insert buffer data */
    1304        1794 :     if (insertMethod != CIM_SINGLE)
    1305        1680 :         CopyMultiInsertInfoCleanup(&multiInsertInfo);
    1306             : 
    1307             :     /* Close all the partitioned tables, leaf partitions, and their indices */
    1308        1794 :     if (proute)
    1309          88 :         ExecCleanupTupleRouting(mtstate, proute);
    1310             : 
    1311             :     /* Close the result relations, including any trigger target relations */
    1312        1794 :     ExecCloseResultRelations(estate);
    1313        1794 :     ExecCloseRangeTableRelations(estate);
    1314             : 
    1315        1794 :     FreeExecutorState(estate);
    1316             : 
    1317        1794 :     return processed;
    1318             : }
    1319             : 
    1320             : /*
    1321             :  * Setup to read tuples from a file for COPY FROM.
    1322             :  *
    1323             :  * 'rel': Used as a template for the tuples
    1324             :  * 'whereClause': WHERE clause from the COPY FROM command
    1325             :  * 'filename': Name of server-local file to read, NULL for STDIN
    1326             :  * 'is_program': true if 'filename' is program to execute
    1327             :  * 'data_source_cb': callback that provides the input data
    1328             :  * 'attnamelist': List of char *, columns to include. NIL selects all cols.
    1329             :  * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
    1330             :  *
    1331             :  * Returns a CopyFromState, to be passed to NextCopyFrom and related functions.
    1332             :  */
    1333             : CopyFromState
    1334        2174 : BeginCopyFrom(ParseState *pstate,
    1335             :               Relation rel,
    1336             :               Node *whereClause,
    1337             :               const char *filename,
    1338             :               bool is_program,
    1339             :               copy_data_source_cb data_source_cb,
    1340             :               List *attnamelist,
    1341             :               List *options)
    1342             : {
    1343             :     CopyFromState cstate;
    1344        2174 :     bool        pipe = (filename == NULL);
    1345             :     TupleDesc   tupDesc;
    1346             :     AttrNumber  num_phys_attrs,
    1347             :                 num_defaults;
    1348             :     FmgrInfo   *in_functions;
    1349             :     Oid        *typioparams;
    1350             :     Oid         in_func_oid;
    1351             :     int        *defmap;
    1352             :     ExprState **defexprs;
    1353             :     MemoryContext oldcontext;
    1354             :     bool        volatile_defexprs;
    1355        2174 :     const int   progress_cols[] = {
    1356             :         PROGRESS_COPY_COMMAND,
    1357             :         PROGRESS_COPY_TYPE,
    1358             :         PROGRESS_COPY_BYTES_TOTAL
    1359             :     };
    1360        2174 :     int64       progress_vals[] = {
    1361             :         PROGRESS_COPY_COMMAND_FROM,
    1362             :         0,
    1363             :         0
    1364             :     };
    1365             : 
    1366             :     /* Allocate workspace and zero all fields */
    1367        2174 :     cstate = (CopyFromStateData *) palloc0(sizeof(CopyFromStateData));
    1368             : 
    1369             :     /*
    1370             :      * We allocate everything used by a cstate in a new memory context. This
    1371             :      * avoids memory leaks during repeated use of COPY in a query.
    1372             :      */
    1373        2174 :     cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
    1374             :                                                 "COPY",
    1375             :                                                 ALLOCSET_DEFAULT_SIZES);
    1376             : 
    1377        2174 :     oldcontext = MemoryContextSwitchTo(cstate->copycontext);
    1378             : 
    1379             :     /* Extract options from the statement node tree */
    1380        2174 :     ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options);
    1381             : 
    1382             :     /* Process the target relation */
    1383        2052 :     cstate->rel = rel;
    1384             : 
    1385        2052 :     tupDesc = RelationGetDescr(cstate->rel);
    1386             : 
    1387             :     /* process common options or initialization */
    1388             : 
    1389             :     /* Generate or convert list of attributes to process */
    1390        2052 :     cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
    1391             : 
    1392        2052 :     num_phys_attrs = tupDesc->natts;
    1393             : 
    1394             :     /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
    1395        2052 :     cstate->opts.force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
    1396        2052 :     if (cstate->opts.force_notnull)
    1397             :     {
    1398             :         List       *attnums;
    1399             :         ListCell   *cur;
    1400             : 
    1401          26 :         attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_notnull);
    1402             : 
    1403          52 :         foreach(cur, attnums)
    1404             :         {
    1405          32 :             int         attnum = lfirst_int(cur);
    1406          32 :             Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
    1407             : 
    1408          32 :             if (!list_member_int(cstate->attnumlist, attnum))
    1409           6 :                 ereport(ERROR,
    1410             :                         (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1411             :                          errmsg("FORCE_NOT_NULL column \"%s\" not referenced by COPY",
    1412             :                                 NameStr(attr->attname))));
    1413          26 :             cstate->opts.force_notnull_flags[attnum - 1] = true;
    1414             :         }
    1415             :     }
    1416             : 
    1417             :     /* Convert FORCE_NULL name list to per-column flags, check validity */
    1418        2046 :     cstate->opts.force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
    1419        2046 :     if (cstate->opts.force_null)
    1420             :     {
    1421             :         List       *attnums;
    1422             :         ListCell   *cur;
    1423             : 
    1424          26 :         attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_null);
    1425             : 
    1426          52 :         foreach(cur, attnums)
    1427             :         {
    1428          32 :             int         attnum = lfirst_int(cur);
    1429          32 :             Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
    1430             : 
    1431          32 :             if (!list_member_int(cstate->attnumlist, attnum))
    1432           6 :                 ereport(ERROR,
    1433             :                         (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1434             :                          errmsg("FORCE_NULL column \"%s\" not referenced by COPY",
    1435             :                                 NameStr(attr->attname))));
    1436          26 :             cstate->opts.force_null_flags[attnum - 1] = true;
    1437             :         }
    1438             :     }
    1439             : 
    1440             :     /* Convert convert_selectively name list to per-column flags */
    1441        2040 :     if (cstate->opts.convert_selectively)
    1442             :     {
    1443             :         List       *attnums;
    1444             :         ListCell   *cur;
    1445             : 
    1446           4 :         cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
    1447             : 
    1448           4 :         attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.convert_select);
    1449             : 
    1450           8 :         foreach(cur, attnums)
    1451             :         {
    1452           4 :             int         attnum = lfirst_int(cur);
    1453           4 :             Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
    1454             : 
    1455           4 :             if (!list_member_int(cstate->attnumlist, attnum))
    1456           0 :                 ereport(ERROR,
    1457             :                         (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1458             :                          errmsg_internal("selected column \"%s\" not referenced by COPY",
    1459             :                                          NameStr(attr->attname))));
    1460           4 :             cstate->convert_select_flags[attnum - 1] = true;
    1461             :         }
    1462             :     }
    1463             : 
    1464             :     /* Use client encoding when ENCODING option is not specified. */
    1465        2040 :     if (cstate->opts.file_encoding < 0)
    1466        2034 :         cstate->file_encoding = pg_get_client_encoding();
    1467             :     else
    1468           6 :         cstate->file_encoding = cstate->opts.file_encoding;
    1469             : 
    1470             :     /*
    1471             :      * Look up encoding conversion function.
    1472             :      */
    1473        2040 :     if (cstate->file_encoding == GetDatabaseEncoding() ||
    1474           6 :         cstate->file_encoding == PG_SQL_ASCII ||
    1475           0 :         GetDatabaseEncoding() == PG_SQL_ASCII)
    1476             :     {
    1477        2040 :         cstate->need_transcoding = false;
    1478             :     }
    1479             :     else
    1480             :     {
    1481           0 :         cstate->need_transcoding = true;
    1482           0 :         cstate->conversion_proc = FindDefaultConversionProc(cstate->file_encoding,
    1483             :                                                             GetDatabaseEncoding());
    1484             :     }
    1485             : 
    1486        2040 :     cstate->copy_src = COPY_FILE;    /* default */
    1487             : 
    1488        2040 :     cstate->whereClause = whereClause;
    1489             : 
    1490             :     /* Initialize state variables */
    1491        2040 :     cstate->eol_type = EOL_UNKNOWN;
    1492        2040 :     cstate->cur_relname = RelationGetRelationName(cstate->rel);
    1493        2040 :     cstate->cur_lineno = 0;
    1494        2040 :     cstate->cur_attname = NULL;
    1495        2040 :     cstate->cur_attval = NULL;
    1496        2040 :     cstate->relname_only = false;
    1497             : 
    1498             :     /*
    1499             :      * Allocate buffers for the input pipeline.
    1500             :      *
    1501             :      * attribute_buf and raw_buf are used in both text and binary modes, but
    1502             :      * input_buf and line_buf only in text mode.
    1503             :      */
    1504        2040 :     cstate->raw_buf = palloc(RAW_BUF_SIZE + 1);
    1505        2040 :     cstate->raw_buf_index = cstate->raw_buf_len = 0;
    1506        2040 :     cstate->raw_reached_eof = false;
    1507             : 
    1508        2040 :     if (!cstate->opts.binary)
    1509             :     {
    1510             :         /*
    1511             :          * If encoding conversion is needed, we need another buffer to hold
    1512             :          * the converted input data.  Otherwise, we can just point input_buf
    1513             :          * to the same buffer as raw_buf.
    1514             :          */
    1515        2024 :         if (cstate->need_transcoding)
    1516             :         {
    1517           0 :             cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
    1518           0 :             cstate->input_buf_index = cstate->input_buf_len = 0;
    1519             :         }
    1520             :         else
    1521        2024 :             cstate->input_buf = cstate->raw_buf;
    1522        2024 :         cstate->input_reached_eof = false;
    1523             : 
    1524        2024 :         initStringInfo(&cstate->line_buf);
    1525             :     }
    1526             : 
    1527        2040 :     initStringInfo(&cstate->attribute_buf);
    1528             : 
    1529             :     /* Assign range table and rteperminfos, we'll need them in CopyFrom. */
    1530        2040 :     if (pstate)
    1531             :     {
    1532        1980 :         cstate->range_table = pstate->p_rtable;
    1533        1980 :         cstate->rteperminfos = pstate->p_rteperminfos;
    1534             :     }
    1535             : 
    1536        2040 :     tupDesc = RelationGetDescr(cstate->rel);
    1537        2040 :     num_phys_attrs = tupDesc->natts;
    1538        2040 :     num_defaults = 0;
    1539        2040 :     volatile_defexprs = false;
    1540             : 
    1541             :     /*
    1542             :      * Pick up the required catalog information for each attribute in the
    1543             :      * relation, including the input function, the element type (to pass to
    1544             :      * the input function), and info about defaults and constraints. (Which
    1545             :      * input function we use depends on text/binary format choice.)
    1546             :      */
    1547        2040 :     in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
    1548        2040 :     typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
    1549        2040 :     defmap = (int *) palloc(num_phys_attrs * sizeof(int));
    1550        2040 :     defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
    1551             : 
    1552       10140 :     for (int attnum = 1; attnum <= num_phys_attrs; attnum++)
    1553             :     {
    1554        8102 :         Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
    1555             : 
    1556             :         /* We don't need info for dropped attributes */
    1557        8102 :         if (att->attisdropped)
    1558         124 :             continue;
    1559             : 
    1560             :         /* Fetch the input function and typioparam info */
    1561        7978 :         if (cstate->opts.binary)
    1562          62 :             getTypeBinaryInputInfo(att->atttypid,
    1563          62 :                                    &in_func_oid, &typioparams[attnum - 1]);
    1564             :         else
    1565        7916 :             getTypeInputInfo(att->atttypid,
    1566        7916 :                              &in_func_oid, &typioparams[attnum - 1]);
    1567        7976 :         fmgr_info(in_func_oid, &in_functions[attnum - 1]);
    1568             : 
    1569             :         /* Get default info if available */
    1570        7976 :         defexprs[attnum - 1] = NULL;
    1571             : 
    1572        7976 :         if (!att->attgenerated)
    1573             :         {
    1574        7956 :             Expr       *defexpr = (Expr *) build_column_default(cstate->rel,
    1575             :                                                                 attnum);
    1576             : 
    1577        7956 :             if (defexpr != NULL)
    1578             :             {
    1579             :                 /* Run the expression through planner */
    1580         412 :                 defexpr = expression_planner(defexpr);
    1581             : 
    1582             :                 /* Initialize executable expression in copycontext */
    1583         412 :                 defexprs[attnum - 1] = ExecInitExpr(defexpr, NULL);
    1584             : 
    1585             :                 /* if NOT copied from input */
    1586             :                 /* use default value if one exists */
    1587         412 :                 if (!list_member_int(cstate->attnumlist, attnum))
    1588             :                 {
    1589         166 :                     defmap[num_defaults] = attnum - 1;
    1590         166 :                     num_defaults++;
    1591             :                 }
    1592             : 
    1593             :                 /*
    1594             :                  * If a default expression looks at the table being loaded,
    1595             :                  * then it could give the wrong answer when using
    1596             :                  * multi-insert. Since database access can be dynamic this is
    1597             :                  * hard to test for exactly, so we use the much wider test of
    1598             :                  * whether the default expression is volatile. We allow for
    1599             :                  * the special case of when the default expression is the
    1600             :                  * nextval() of a sequence which in this specific case is
    1601             :                  * known to be safe for use with the multi-insert
    1602             :                  * optimization. Hence we use this special case function
    1603             :                  * checker rather than the standard check for
    1604             :                  * contain_volatile_functions().
    1605             :                  */
    1606         412 :                 if (!volatile_defexprs)
    1607         412 :                     volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
    1608             :             }
    1609             :         }
    1610             :     }
    1611             : 
    1612             : 
    1613             :     /* initialize progress */
    1614        2038 :     pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
    1615        2038 :                                   cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
    1616        2038 :     cstate->bytes_processed = 0;
    1617             : 
    1618             :     /* We keep those variables in cstate. */
    1619        2038 :     cstate->in_functions = in_functions;
    1620        2038 :     cstate->typioparams = typioparams;
    1621        2038 :     cstate->defmap = defmap;
    1622        2038 :     cstate->defexprs = defexprs;
    1623        2038 :     cstate->volatile_defexprs = volatile_defexprs;
    1624        2038 :     cstate->num_defaults = num_defaults;
    1625        2038 :     cstate->is_program = is_program;
    1626             : 
    1627        2038 :     if (data_source_cb)
    1628             :     {
    1629         310 :         progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
    1630         310 :         cstate->copy_src = COPY_CALLBACK;
    1631         310 :         cstate->data_source_cb = data_source_cb;
    1632             :     }
    1633        1728 :     else if (pipe)
    1634             :     {
    1635         820 :         progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
    1636             :         Assert(!is_program);    /* the grammar does not allow this */
    1637         820 :         if (whereToSendOutput == DestRemote)
    1638         820 :             ReceiveCopyBegin(cstate);
    1639             :         else
    1640           0 :             cstate->copy_file = stdin;
    1641             :     }
    1642             :     else
    1643             :     {
    1644         908 :         cstate->filename = pstrdup(filename);
    1645             : 
    1646         908 :         if (cstate->is_program)
    1647             :         {
    1648           0 :             progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
    1649           0 :             cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
    1650           0 :             if (cstate->copy_file == NULL)
    1651           0 :                 ereport(ERROR,
    1652             :                         (errcode_for_file_access(),
    1653             :                          errmsg("could not execute command \"%s\": %m",
    1654             :                                 cstate->filename)));
    1655             :         }
    1656             :         else
    1657             :         {
    1658             :             struct stat st;
    1659             : 
    1660         908 :             progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
    1661         908 :             cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
    1662         908 :             if (cstate->copy_file == NULL)
    1663             :             {
    1664             :                 /* copy errno because ereport subfunctions might change it */
    1665           0 :                 int         save_errno = errno;
    1666             : 
    1667           0 :                 ereport(ERROR,
    1668             :                         (errcode_for_file_access(),
    1669             :                          errmsg("could not open file \"%s\" for reading: %m",
    1670             :                                 cstate->filename),
    1671             :                          (save_errno == ENOENT || save_errno == EACCES) ?
    1672             :                          errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
    1673             :                                  "You may want a client-side facility such as psql's \\copy.") : 0));
    1674             :             }
    1675             : 
    1676         908 :             if (fstat(fileno(cstate->copy_file), &st))
    1677           0 :                 ereport(ERROR,
    1678             :                         (errcode_for_file_access(),
    1679             :                          errmsg("could not stat file \"%s\": %m",
    1680             :                                 cstate->filename)));
    1681             : 
    1682         908 :             if (S_ISDIR(st.st_mode))
    1683           0 :                 ereport(ERROR,
    1684             :                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1685             :                          errmsg("\"%s\" is a directory", cstate->filename)));
    1686             : 
    1687         908 :             progress_vals[2] = st.st_size;
    1688             :         }
    1689             :     }
    1690             : 
    1691        2038 :     pgstat_progress_update_multi_param(3, progress_cols, progress_vals);
    1692             : 
    1693        2038 :     if (cstate->opts.binary)
    1694             :     {
    1695             :         /* Read and verify binary header */
    1696          14 :         ReceiveCopyBinaryHeader(cstate);
    1697             :     }
    1698             : 
    1699             :     /* create workspace for CopyReadAttributes results */
    1700        2038 :     if (!cstate->opts.binary)
    1701             :     {
    1702        2024 :         AttrNumber  attr_count = list_length(cstate->attnumlist);
    1703             : 
    1704        2024 :         cstate->max_fields = attr_count;
    1705        2024 :         cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
    1706             :     }
    1707             : 
    1708        2038 :     MemoryContextSwitchTo(oldcontext);
    1709             : 
    1710        2038 :     return cstate;
    1711             : }
    1712             : 
    1713             : /*
    1714             :  * Clean up storage and release resources for COPY FROM.
    1715             :  */
    1716             : void
    1717        1554 : EndCopyFrom(CopyFromState cstate)
    1718             : {
    1719             :     /* No COPY FROM related resources except memory. */
    1720        1554 :     if (cstate->is_program)
    1721             :     {
    1722           0 :         ClosePipeFromProgram(cstate);
    1723             :     }
    1724             :     else
    1725             :     {
    1726        1554 :         if (cstate->filename != NULL && FreeFile(cstate->copy_file))
    1727           0 :             ereport(ERROR,
    1728             :                     (errcode_for_file_access(),
    1729             :                      errmsg("could not close file \"%s\": %m",
    1730             :                             cstate->filename)));
    1731             :     }
    1732             : 
    1733        1554 :     pgstat_progress_end_command();
    1734             : 
    1735        1554 :     MemoryContextDelete(cstate->copycontext);
    1736        1554 :     pfree(cstate);
    1737        1554 : }
    1738             : 
    1739             : /*
    1740             :  * Closes the pipe from an external program, checking the pclose() return code.
    1741             :  */
    1742             : static void
    1743           0 : ClosePipeFromProgram(CopyFromState cstate)
    1744             : {
    1745             :     int         pclose_rc;
    1746             : 
    1747             :     Assert(cstate->is_program);
    1748             : 
    1749           0 :     pclose_rc = ClosePipeStream(cstate->copy_file);
    1750           0 :     if (pclose_rc == -1)
    1751           0 :         ereport(ERROR,
    1752             :                 (errcode_for_file_access(),
    1753             :                  errmsg("could not close pipe to external command: %m")));
    1754           0 :     else if (pclose_rc != 0)
    1755             :     {
    1756             :         /*
    1757             :          * If we ended a COPY FROM PROGRAM before reaching EOF, then it's
    1758             :          * expectable for the called program to fail with SIGPIPE, and we
    1759             :          * should not report that as an error.  Otherwise, SIGPIPE indicates a
    1760             :          * problem.
    1761             :          */
    1762           0 :         if (!cstate->raw_reached_eof &&
    1763           0 :             wait_result_is_signal(pclose_rc, SIGPIPE))
    1764           0 :             return;
    1765             : 
    1766           0 :         ereport(ERROR,
    1767             :                 (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
    1768             :                  errmsg("program \"%s\" failed",
    1769             :                         cstate->filename),
    1770             :                  errdetail_internal("%s", wait_result_to_str(pclose_rc))));
    1771             :     }
    1772             : }

Generated by: LCOV version 1.14