LCOV - code coverage report
Current view: top level - src/backend/commands - copyfrom.c (source / functions) Hit Total Coverage
Test: PostgreSQL 17devel Lines: 505 562 89.9 %
Date: 2024-04-18 22:13:33 Functions: 16 17 94.1 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * copyfrom.c
       4             :  *      COPY <table> FROM file/program/client
       5             :  *
       6             :  * This file contains routines needed to efficiently load tuples into a
       7             :  * table.  That includes looking up the correct partition, firing triggers,
       8             :  * calling the table AM function to insert the data, and updating indexes.
       9             :  * Reading data from the input file or client and parsing it into Datums
      10             :  * is handled in copyfromparse.c.
      11             :  *
      12             :  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
      13             :  * Portions Copyright (c) 1994, Regents of the University of California
      14             :  *
      15             :  *
      16             :  * IDENTIFICATION
      17             :  *    src/backend/commands/copyfrom.c
      18             :  *
      19             :  *-------------------------------------------------------------------------
      20             :  */
      21             : #include "postgres.h"
      22             : 
      23             : #include <ctype.h>
      24             : #include <unistd.h>
      25             : #include <sys/stat.h>
      26             : 
      27             : #include "access/heapam.h"
      28             : #include "access/tableam.h"
      29             : #include "access/xact.h"
      30             : #include "catalog/namespace.h"
      31             : #include "commands/copy.h"
      32             : #include "commands/copyfrom_internal.h"
      33             : #include "commands/progress.h"
      34             : #include "commands/trigger.h"
      35             : #include "executor/execPartition.h"
      36             : #include "executor/executor.h"
      37             : #include "executor/nodeModifyTable.h"
      38             : #include "executor/tuptable.h"
      39             : #include "foreign/fdwapi.h"
      40             : #include "mb/pg_wchar.h"
      41             : #include "miscadmin.h"
      42             : #include "nodes/miscnodes.h"
      43             : #include "optimizer/optimizer.h"
      44             : #include "pgstat.h"
      45             : #include "rewrite/rewriteHandler.h"
      46             : #include "storage/fd.h"
      47             : #include "tcop/tcopprot.h"
      48             : #include "utils/lsyscache.h"
      49             : #include "utils/memutils.h"
      50             : #include "utils/portal.h"
      51             : #include "utils/rel.h"
      52             : #include "utils/snapmgr.h"
      53             : 
      54             : /*
      55             :  * No more than this many tuples per CopyMultiInsertBuffer
      56             :  *
      57             :  * Caution: Don't make this too big, as we could end up with this many
      58             :  * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's
      59             :  * multiInsertBuffers list.  Increasing this can cause quadratic growth in
      60             :  * memory requirements during copies into partitioned tables with a large
      61             :  * number of partitions.
      62             :  */
      63             : #define MAX_BUFFERED_TUPLES     1000
      64             : 
      65             : /*
      66             :  * Flush buffers if there are >= this many bytes, as counted by the input
      67             :  * size, of tuples stored.
      68             :  */
      69             : #define MAX_BUFFERED_BYTES      65535
      70             : 
      71             : /* Trim the list of buffers back down to this number after flushing */
      72             : #define MAX_PARTITION_BUFFERS   32
      73             : 
      74             : /* Buffers tuples bound for a single relation so CopyFrom can multi-insert them. */
      75             : typedef struct CopyMultiInsertBuffer
      76             : {
      77             :     TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Tuple slots; created on demand, NULL until first used */
      78             :     ResultRelInfo *resultRelInfo;   /* ResultRelInfo of the relation this buffer feeds */
      79             :     BulkInsertState bistate;    /* BulkInsertState for this rel if plain
      80             :                                  * table; NULL if foreign table */
      81             :     int         nused;          /* number of 'slots' containing tuples */
      82             :     uint64      linenos[MAX_BUFFERED_TUPLES];   /* Line # of each buffered tuple in
      83             :                                                  * the copy stream */
      84             : } CopyMultiInsertBuffer;
      85             : 
      86             : /*
      87             :  * Tracks one or more CopyMultiInsertBuffers together with the total number
      88             :  * and size of the tuples stored in them.  This allows multiple buffers to
      89             :  * exist at once when COPYing into a partitioned table.
      90             :  */
      91             : typedef struct CopyMultiInsertInfo
      92             : {
      93             :     List       *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */
      94             :     int         bufferedTuples; /* number of tuples buffered over all buffers */
      95             :     int         bufferedBytes;  /* input bytes of all buffered tuples; see MAX_BUFFERED_BYTES */
      96             :     CopyFromState cstate;       /* Copy state for this CopyMultiInsertInfo */
      97             :     EState     *estate;         /* Executor state used for COPY */
      98             :     CommandId   mycid;          /* Command Id passed to table_multi_insert */
      99             :     int         ti_options;     /* options for table_multi_insert and table_finish_bulk_insert */
     100             : } CopyMultiInsertInfo;
     101             : 
     102             : 
     103             : /* non-export function prototypes */
     104             : static void ClosePipeFromProgram(CopyFromState cstate);
     105             : 
     106             : /*
     107             :  * error context callback for COPY FROM
     108             :  *
     109             :  * The argument for the error context must be CopyFromState.
     110             :  */
     111             : void
     112         280 : CopyFromErrorCallback(void *arg)
     113             : {
     114         280 :     CopyFromState cstate = (CopyFromState) arg;
     115             : 
     116         280 :     if (cstate->relname_only)
     117             :     {
     118          44 :         errcontext("COPY %s",
     119             :                    cstate->cur_relname);
     120          44 :         return;
     121             :     }
     122         236 :     if (cstate->opts.binary)
     123             :     {
     124             :         /* can't usefully display the data */
     125           2 :         if (cstate->cur_attname)
     126           2 :             errcontext("COPY %s, line %llu, column %s",
     127             :                        cstate->cur_relname,
     128           2 :                        (unsigned long long) cstate->cur_lineno,
     129             :                        cstate->cur_attname);
     130             :         else
     131           0 :             errcontext("COPY %s, line %llu",
     132             :                        cstate->cur_relname,
     133           0 :                        (unsigned long long) cstate->cur_lineno);
     134             :     }
     135             :     else
     136             :     {
     137         234 :         if (cstate->cur_attname && cstate->cur_attval)
     138          32 :         {
     139             :             /* error is relevant to a particular column */
     140             :             char       *attval;
     141             : 
     142          32 :             attval = CopyLimitPrintoutLength(cstate->cur_attval);
     143          32 :             errcontext("COPY %s, line %llu, column %s: \"%s\"",
     144             :                        cstate->cur_relname,
     145          32 :                        (unsigned long long) cstate->cur_lineno,
     146             :                        cstate->cur_attname,
     147             :                        attval);
     148          32 :             pfree(attval);
     149             :         }
     150         202 :         else if (cstate->cur_attname)
     151             :         {
     152             :             /* error is relevant to a particular column, value is NULL */
     153           6 :             errcontext("COPY %s, line %llu, column %s: null input",
     154             :                        cstate->cur_relname,
     155           6 :                        (unsigned long long) cstate->cur_lineno,
     156             :                        cstate->cur_attname);
     157             :         }
     158             :         else
     159             :         {
     160             :             /*
     161             :              * Error is relevant to a particular line.
     162             :              *
     163             :              * If line_buf still contains the correct line, print it.
     164             :              */
     165         196 :             if (cstate->line_buf_valid)
     166             :             {
     167             :                 char       *lineval;
     168             : 
     169         184 :                 lineval = CopyLimitPrintoutLength(cstate->line_buf.data);
     170         184 :                 errcontext("COPY %s, line %llu: \"%s\"",
     171             :                            cstate->cur_relname,
     172         184 :                            (unsigned long long) cstate->cur_lineno, lineval);
     173         184 :                 pfree(lineval);
     174             :             }
     175             :             else
     176             :             {
     177          12 :                 errcontext("COPY %s, line %llu",
     178             :                            cstate->cur_relname,
     179          12 :                            (unsigned long long) cstate->cur_lineno);
     180             :             }
     181             :         }
     182             :     }
     183             : }
     184             : 
     185             : /*
     186             :  * Make sure we don't print an unreasonable amount of COPY data in a message.
     187             :  *
     188             :  * Returns a pstrdup'd copy of the input.
     189             :  */
     190             : char *
     191         252 : CopyLimitPrintoutLength(const char *str)
     192             : {
     193             : #define MAX_COPY_DATA_DISPLAY 100
     194             : 
     195         252 :     int         slen = strlen(str);
     196             :     int         len;
     197             :     char       *res;
     198             : 
     199             :     /* Fast path if definitely okay */
     200         252 :     if (slen <= MAX_COPY_DATA_DISPLAY)
     201         252 :         return pstrdup(str);
     202             : 
     203             :     /* Apply encoding-dependent truncation */
     204           0 :     len = pg_mbcliplen(str, slen, MAX_COPY_DATA_DISPLAY);
     205             : 
     206             :     /*
     207             :      * Truncate, and add "..." to show we truncated the input.
     208             :      */
     209           0 :     res = (char *) palloc(len + 4);
     210           0 :     memcpy(res, str, len);
     211           0 :     strcpy(res + len, "...");
     212             : 
     213           0 :     return res;
     214             : }
     215             : 
     216             : /*
     217             :  * Allocate memory and initialize a new CopyMultiInsertBuffer for this
     218             :  * ResultRelInfo.
     219             :  */
     220             : static CopyMultiInsertBuffer *
     221        1386 : CopyMultiInsertBufferInit(ResultRelInfo *rri)
     222             : {
     223             :     CopyMultiInsertBuffer *buffer;
     224             : 
     225        1386 :     buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
     226        1386 :     memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
     227        1386 :     buffer->resultRelInfo = rri;
     228        1386 :     buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL;
     229        1386 :     buffer->nused = 0;
     230             : 
     231        1386 :     return buffer;
     232             : }
     233             : 
     234             : /*
     235             :  * Make a new buffer for this ResultRelInfo.
     236             :  */
     237             : static inline void
     238        1386 : CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
     239             :                                ResultRelInfo *rri)
     240             : {
     241             :     CopyMultiInsertBuffer *buffer;
     242             : 
     243        1386 :     buffer = CopyMultiInsertBufferInit(rri);
     244             : 
     245             :     /* Setup back-link so we can easily find this buffer again */
     246        1386 :     rri->ri_CopyMultiInsertBuffer = buffer;
     247             :     /* Record that we're tracking this buffer */
     248        1386 :     miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
     249        1386 : }
     250             : 
     251             : /*
     252             :  * Initialize an already allocated CopyMultiInsertInfo.
     253             :  *
     254             :  * If rri is a non-partitioned table then a CopyMultiInsertBuffer is set up
     255             :  * for that table.
     256             :  */
     257             : static void
     258        1374 : CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
     259             :                         CopyFromState cstate, EState *estate, CommandId mycid,
     260             :                         int ti_options)
     261             : {
     262        1374 :     miinfo->multiInsertBuffers = NIL;
     263        1374 :     miinfo->bufferedTuples = 0;
     264        1374 :     miinfo->bufferedBytes = 0;
     265        1374 :     miinfo->cstate = cstate;
     266        1374 :     miinfo->estate = estate;
     267        1374 :     miinfo->mycid = mycid;
     268        1374 :     miinfo->ti_options = ti_options;
     269             : 
     270             :     /*
     271             :      * Only setup the buffer when not dealing with a partitioned table.
     272             :      * Buffers for partitioned tables will just be setup when we need to send
     273             :      * tuples their way for the first time.
     274             :      */
     275        1374 :     if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
     276        1300 :         CopyMultiInsertInfoSetupBuffer(miinfo, rri);
     277        1374 : }
     278             : 
     279             : /*
     280             :  * Returns true if the buffers are full
     281             :  */
     282             : static inline bool
     283     1187944 : CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
     284             : {
     285     1187944 :     if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
     286     1186950 :         miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
     287        1100 :         return true;
     288     1186844 :     return false;
     289             : }
     290             : 
     291             : /*
     292             :  * Returns true if we have no buffered tuples
     293             :  */
     294             : static inline bool
     295        1266 : CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
     296             : {
     297        1266 :     return miinfo->bufferedTuples == 0;
     298             : }
     299             : 
     300             : /*
     301             :  * Write the tuples stored in 'buffer' out to the table.
     302             :  */
     303             : static inline void
     304        2334 : CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
     305             :                            CopyMultiInsertBuffer *buffer,
     306             :                            int64 *processed)
     307             : {
     308        2334 :     CopyFromState cstate = miinfo->cstate;
     309        2334 :     EState     *estate = miinfo->estate;
     310        2334 :     int         nused = buffer->nused;
     311        2334 :     ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
     312        2334 :     TupleTableSlot **slots = buffer->slots;
     313             :     int         i;
     314             : 
     315        2334 :     if (resultRelInfo->ri_FdwRoutine)
     316             :     {
     317          14 :         int         batch_size = resultRelInfo->ri_BatchSize;
     318          14 :         int         sent = 0;
     319             : 
     320             :         Assert(buffer->bistate == NULL);
     321             : 
     322             :         /* Ensure that the FDW supports batching and it's enabled */
     323             :         Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert);
     324             :         Assert(batch_size > 1);
     325             : 
     326             :         /*
     327             :          * We suppress error context information other than the relation name,
     328             :          * if one of the operations below fails.
     329             :          */
     330             :         Assert(!cstate->relname_only);
     331          14 :         cstate->relname_only = true;
     332             : 
     333          38 :         while (sent < nused)
     334             :         {
     335          26 :             int         size = (batch_size < nused - sent) ? batch_size : (nused - sent);
     336          26 :             int         inserted = size;
     337             :             TupleTableSlot **rslots;
     338             : 
     339             :             /* insert into foreign table: let the FDW do it */
     340             :             rslots =
     341          26 :                 resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert(estate,
     342             :                                                                      resultRelInfo,
     343          26 :                                                                      &slots[sent],
     344             :                                                                      NULL,
     345             :                                                                      &inserted);
     346             : 
     347          24 :             sent += size;
     348             : 
     349             :             /* No need to do anything if there are no inserted rows */
     350          24 :             if (inserted <= 0)
     351           4 :                 continue;
     352             : 
     353             :             /* Triggers on foreign tables should not have transition tables */
     354             :             Assert(resultRelInfo->ri_TrigDesc == NULL ||
     355             :                    resultRelInfo->ri_TrigDesc->trig_insert_new_table == false);
     356             : 
     357             :             /* Run AFTER ROW INSERT triggers */
     358          20 :             if (resultRelInfo->ri_TrigDesc != NULL &&
     359           0 :                 resultRelInfo->ri_TrigDesc->trig_insert_after_row)
     360             :             {
     361           0 :                 Oid         relid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
     362             : 
     363           0 :                 for (i = 0; i < inserted; i++)
     364             :                 {
     365           0 :                     TupleTableSlot *slot = rslots[i];
     366             : 
     367             :                     /*
     368             :                      * AFTER ROW Triggers might reference the tableoid column,
     369             :                      * so (re-)initialize tts_tableOid before evaluating them.
     370             :                      */
     371           0 :                     slot->tts_tableOid = relid;
     372             : 
     373           0 :                     ExecARInsertTriggers(estate, resultRelInfo,
     374             :                                          slot, NIL,
     375             :                                          cstate->transition_capture);
     376             :                 }
     377             :             }
     378             : 
     379             :             /* Update the row counter and progress of the COPY command */
     380          20 :             *processed += inserted;
     381          20 :             pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
     382             :                                          *processed);
     383             :         }
     384             : 
     385          48 :         for (i = 0; i < nused; i++)
     386          36 :             ExecClearTuple(slots[i]);
     387             : 
     388             :         /* reset relname_only */
     389          12 :         cstate->relname_only = false;
     390             :     }
     391             :     else
     392             :     {
     393        2320 :         CommandId   mycid = miinfo->mycid;
     394        2320 :         int         ti_options = miinfo->ti_options;
     395        2320 :         bool        line_buf_valid = cstate->line_buf_valid;
     396        2320 :         uint64      save_cur_lineno = cstate->cur_lineno;
     397             :         MemoryContext oldcontext;
     398             : 
     399             :         Assert(buffer->bistate != NULL);
     400             : 
     401             :         /*
     402             :          * Print error context information correctly, if one of the operations
     403             :          * below fails.
     404             :          */
     405        2320 :         cstate->line_buf_valid = false;
     406             : 
     407             :         /*
     408             :          * table_multi_insert may leak memory, so switch to short-lived memory
     409             :          * context before calling it.
     410             :          */
     411        2320 :         oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
     412        2320 :         table_multi_insert(resultRelInfo->ri_RelationDesc,
     413             :                            slots,
     414             :                            nused,
     415             :                            mycid,
     416             :                            ti_options,
     417             :                            buffer->bistate);
     418        2320 :         MemoryContextSwitchTo(oldcontext);
     419             : 
     420     1190112 :         for (i = 0; i < nused; i++)
     421             :         {
     422             :             /*
     423             :              * If there are any indexes, update them for all the inserted
     424             :              * tuples, and run AFTER ROW INSERT triggers.
     425             :              */
     426     1187804 :             if (resultRelInfo->ri_NumIndices > 0)
     427             :             {
     428             :                 List       *recheckIndexes;
     429             : 
     430      201834 :                 cstate->cur_lineno = buffer->linenos[i];
     431             :                 recheckIndexes =
     432      201834 :                     ExecInsertIndexTuples(resultRelInfo,
     433             :                                           buffer->slots[i], estate, false,
     434             :                                           false, NULL, NIL, false);
     435      201822 :                 ExecARInsertTriggers(estate, resultRelInfo,
     436      201822 :                                      slots[i], recheckIndexes,
     437             :                                      cstate->transition_capture);
     438      201822 :                 list_free(recheckIndexes);
     439             :             }
     440             : 
     441             :             /*
     442             :              * There's no indexes, but see if we need to run AFTER ROW INSERT
     443             :              * triggers anyway.
     444             :              */
     445      985970 :             else if (resultRelInfo->ri_TrigDesc != NULL &&
     446          78 :                      (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
     447          60 :                       resultRelInfo->ri_TrigDesc->trig_insert_new_table))
     448             :             {
     449          36 :                 cstate->cur_lineno = buffer->linenos[i];
     450          36 :                 ExecARInsertTriggers(estate, resultRelInfo,
     451          36 :                                      slots[i], NIL,
     452             :                                      cstate->transition_capture);
     453             :             }
     454             : 
     455     1187792 :             ExecClearTuple(slots[i]);
     456             :         }
     457             : 
     458             :         /* Update the row counter and progress of the COPY command */
     459        2308 :         *processed += nused;
     460        2308 :         pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
     461             :                                      *processed);
     462             : 
     463             :         /* reset cur_lineno and line_buf_valid to what they were */
     464        2308 :         cstate->line_buf_valid = line_buf_valid;
     465        2308 :         cstate->cur_lineno = save_cur_lineno;
     466             :     }
     467             : 
     468             :     /* Mark that all slots are free */
     469        2320 :     buffer->nused = 0;
     470        2320 : }
     471             : 
     472             : /*
     473             :  * Drop used slots and free member for this buffer.
     474             :  *
     475             :  * The buffer must be flushed before cleanup.
     476             :  */
     477             : static inline void
     478        1236 : CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
     479             :                              CopyMultiInsertBuffer *buffer)
     480             : {
     481        1236 :     ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
     482             :     int         i;
     483             : 
     484             :     /* Ensure buffer was flushed */
     485             :     Assert(buffer->nused == 0);
     486             : 
     487             :     /* Remove back-link to ourself */
     488        1236 :     resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
     489             : 
     490        1236 :     if (resultRelInfo->ri_FdwRoutine == NULL)
     491             :     {
     492             :         Assert(buffer->bistate != NULL);
     493        1224 :         FreeBulkInsertState(buffer->bistate);
     494             :     }
     495             :     else
     496             :         Assert(buffer->bistate == NULL);
     497             : 
     498             :     /* Since we only create slots on demand, just drop the non-null ones. */
     499      227932 :     for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
     500      226696 :         ExecDropSingleTupleTableSlot(buffer->slots[i]);
     501             : 
     502        1236 :     if (resultRelInfo->ri_FdwRoutine == NULL)
     503        1224 :         table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
     504             :                                  miinfo->ti_options);
     505             : 
     506        1236 :     pfree(buffer);
     507        1236 : }
     508             : 
     509             : /*
     510             :  * Write out all stored tuples in all buffers out to the tables.
     511             :  *
     512             :  * Once flushed we also trim the tracked buffers list down to size by removing
     513             :  * the buffers created earliest first.
     514             :  *
     515             :  * Callers should pass 'curr_rri' as the ResultRelInfo that's currently being
     516             :  * used.  When cleaning up old buffers we'll never remove the one for
     517             :  * 'curr_rri'.
     518             :  */
     519             : static inline void
     520        2108 : CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri,
     521             :                          int64 *processed)
     522             : {
     523             :     ListCell   *lc;
     524             : 
     525        4428 :     foreach(lc, miinfo->multiInsertBuffers)
     526             :     {
     527        2334 :         CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc);
     528             : 
     529        2334 :         CopyMultiInsertBufferFlush(miinfo, buffer, processed);
     530             :     }
     531             : 
     532        2094 :     miinfo->bufferedTuples = 0;
     533        2094 :     miinfo->bufferedBytes = 0;
     534             : 
     535             :     /*
     536             :      * Trim the list of tracked buffers down if it exceeds the limit.  Here we
     537             :      * remove buffers starting with the ones we created first.  It seems less
     538             :      * likely that these older ones will be needed than the ones that were
     539             :      * just created.
     540             :      */
     541        2094 :     while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS)
     542             :     {
     543             :         CopyMultiInsertBuffer *buffer;
     544             : 
     545           0 :         buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
     546             : 
     547             :         /*
     548             :          * We never want to remove the buffer that's currently being used, so
     549             :          * if we happen to find that then move it to the end of the list.
     550             :          */
     551           0 :         if (buffer->resultRelInfo == curr_rri)
     552             :         {
     553           0 :             miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
     554           0 :             miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
     555           0 :             buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
     556             :         }
     557             : 
     558           0 :         CopyMultiInsertBufferCleanup(miinfo, buffer);
     559           0 :         miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
     560             :     }
     561        2094 : }
     562             : 
     563             : /*
     564             :  * Clean up every tracked multi-insert buffer, then free the buffer list
     565             :  */
     566             : static inline void
     567        1224 : CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo)
     568             : {
     569             :     ListCell   *lc;
     570             : 
     571        2460 :     foreach(lc, miinfo->multiInsertBuffers)
     572        1236 :         CopyMultiInsertBufferCleanup(miinfo, lfirst(lc));
     573             :     /* All buffers are cleaned up; now free the list that tracked them */
     574        1224 :     list_free(miinfo->multiInsertBuffers);
     575        1224 : }
     576             : 
     577             : /*
     578             :  * Return the TupleTableSlot that the next tuple for 'rri' should be stored
     579             :  * in, creating it with table_slot_create() if that array entry is still NULL.
     580             :  *
     581             :  * Callers must ensure that rri has a buffer and that it is not full.
     582             :  * Note: 'miinfo' is unused but has been included for consistency with the
     583             :  * other functions in this area.
     584             :  */
     585             : static inline TupleTableSlot *
     586     1189292 : CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
     587             :                                 ResultRelInfo *rri)
     588             : {
     589     1189292 :     CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
     590             :     int         nused;
     591             : 
     592             :     Assert(buffer != NULL);
     593             :     Assert(buffer->nused < MAX_BUFFERED_TUPLES);
     594     1189292 :     nused = buffer->nused;  /* read only after buffer has been checked */
     595     1189292 :     if (buffer->slots[nused] == NULL)
     596      226960 :         buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
     597     1189292 :     return buffer->slots[nused];
     598             : }
     599             : 
     600             : /*
     601             :  * Mark the slot handed out by CopyMultiInsertInfoNextFreeSlot as consumed,
     602             :  * remembering 'lineno' and adding one tuple and 'tuplen' bytes to miinfo.
     603             :  */
     604             : static inline void
     605     1187944 : CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
     606             :                          TupleTableSlot *slot, int tuplen, uint64 lineno)
     607             : {
     608     1187944 :     CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
     609             : 
     610             :     Assert(buffer != NULL);
     611             :     Assert(slot == buffer->slots[buffer->nused]);
     612             : 
     613             :     /* Store the line number so we can properly report any errors later */
     614     1187944 :     buffer->linenos[buffer->nused] = lineno;
     615             : 
     616             :     /* Advance nused so this slot now counts as being in use */
     617     1187944 :     buffer->nused++;
     618             : 
     619             :     /* Update miinfo's running totals of buffered tuples and bytes */
     620     1187944 :     miinfo->bufferedTuples++;
     621     1187944 :     miinfo->bufferedBytes += tuplen;
     622     1187944 : }
     623             : 
     624             : /*
     625             :  * Copy FROM file to relation.
     626             :  */
     627             : uint64
     628        1552 : CopyFrom(CopyFromState cstate)
     629             : {
     630             :     ResultRelInfo *resultRelInfo;
     631             :     ResultRelInfo *target_resultRelInfo;
     632        1552 :     ResultRelInfo *prevResultRelInfo = NULL;
     633        1552 :     EState     *estate = CreateExecutorState(); /* for ExecConstraints() */
     634             :     ModifyTableState *mtstate;
     635             :     ExprContext *econtext;
     636        1552 :     TupleTableSlot *singleslot = NULL;
     637        1552 :     MemoryContext oldcontext = CurrentMemoryContext;
     638             : 
     639        1552 :     PartitionTupleRouting *proute = NULL;
     640             :     ErrorContextCallback errcallback;
     641        1552 :     CommandId   mycid = GetCurrentCommandId(true);
     642        1552 :     int         ti_options = 0; /* start with default options for insert */
     643        1552 :     BulkInsertState bistate = NULL;
     644             :     CopyInsertMethod insertMethod;
     645        1552 :     CopyMultiInsertInfo multiInsertInfo = {0};  /* pacify compiler */
     646        1552 :     int64       processed = 0;
     647        1552 :     int64       excluded = 0;
     648        1552 :     int64       skipped = 0;
     649             :     bool        has_before_insert_row_trig;
     650             :     bool        has_instead_insert_row_trig;
     651        1552 :     bool        leafpart_use_multi_insert = false;
     652             : 
     653             :     Assert(cstate->rel);
     654             :     Assert(list_length(cstate->range_table) == 1);
     655             : 
     656        1552 :     if (cstate->opts.on_error != COPY_ON_ERROR_STOP)
     657             :         Assert(cstate->escontext);
     658             : 
     659             :     /*
     660             :      * The target must be a plain, foreign, or partitioned relation, or have
     661             :      * an INSTEAD OF INSERT row trigger.  (Currently, such triggers are only
     662             :      * allowed on views, so we only hint about them in the view case.)
     663             :      */
     664        1552 :     if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
     665         160 :         cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
     666         122 :         cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
     667          18 :         !(cstate->rel->trigdesc &&
     668          12 :           cstate->rel->trigdesc->trig_insert_instead_row))
     669             :     {
     670           6 :         if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
     671           6 :             ereport(ERROR,
     672             :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     673             :                      errmsg("cannot copy to view \"%s\"",
     674             :                             RelationGetRelationName(cstate->rel)),
     675             :                      errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
     676           0 :         else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
     677           0 :             ereport(ERROR,
     678             :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     679             :                      errmsg("cannot copy to materialized view \"%s\"",
     680             :                             RelationGetRelationName(cstate->rel))));
     681           0 :         else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
     682           0 :             ereport(ERROR,
     683             :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     684             :                      errmsg("cannot copy to sequence \"%s\"",
     685             :                             RelationGetRelationName(cstate->rel))));
     686             :         else
     687           0 :             ereport(ERROR,
     688             :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     689             :                      errmsg("cannot copy to non-table relation \"%s\"",
     690             :                             RelationGetRelationName(cstate->rel))));
     691             :     }
     692             : 
     693             :     /*
     694             :      * If the target file is new-in-transaction, we assume that checking FSM
     695             :      * for free space is a waste of time.  This could possibly be wrong, but
     696             :      * it's unlikely.
     697             :      */
     698        1546 :     if (RELKIND_HAS_STORAGE(cstate->rel->rd_rel->relkind) &&
     699        1392 :         (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
     700        1380 :          cstate->rel->rd_firstRelfilelocatorSubid != InvalidSubTransactionId))
     701          88 :         ti_options |= TABLE_INSERT_SKIP_FSM;
     702             : 
     703             :     /*
     704             :      * Optimize if new relation storage was created in this subxact or one of
     705             :      * its committed children and we won't see those rows later as part of an
     706             :      * earlier scan or command. The subxact test ensures that if this subxact
     707             :      * aborts then the frozen rows won't be visible after xact cleanup.  Note
     708             :      * that the stronger test of exactly which subtransaction created it is
     709             :      * crucial for correctness of this optimization. The test for an earlier
     710             :      * scan or command tolerates false negatives. FREEZE causes other sessions
     711             :      * to see rows they would not see under MVCC, and a false negative merely
     712             :      * spreads that anomaly to the current session.
     713             :      */
     714        1546 :     if (cstate->opts.freeze)
     715             :     {
     716             :         /*
     717             :          * We currently disallow COPY FREEZE on partitioned tables.  The
     718             :          * reason for this is that we've simply not yet opened the partitions
     719             :          * to determine if the optimization can be applied to them.  We could
     720             :          * go and open them all here, but doing so may be quite a costly
     721             :          * overhead for small copies.  In any case, we may just end up routing
     722             :          * tuples to a small number of partitions.  It seems better just to
     723             :          * raise an ERROR for partitioned tables.
     724             :          */
     725          66 :         if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     726             :         {
     727           6 :             ereport(ERROR,
     728             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     729             :                      errmsg("cannot perform COPY FREEZE on a partitioned table")));
     730             :         }
     731             : 
     732             :         /*
     733             :          * Tolerate one registration for the benefit of FirstXactSnapshot.
     734             :          * Scan-bearing queries generally create at least two registrations,
     735             :          * though relying on that is fragile, as is ignoring ActiveSnapshot.
     736             :          * Clear CatalogSnapshot to avoid counting its registration.  We'll
     737             :          * still detect ongoing catalog scans, each of which separately
     738             :          * registers the snapshot it uses.
     739             :          */
     740          60 :         InvalidateCatalogSnapshot();
     741          60 :         if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
     742           0 :             ereport(ERROR,
     743             :                     (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
     744             :                      errmsg("cannot perform COPY FREEZE because of prior transaction activity")));
     745             : 
     746         120 :         if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
     747          60 :             cstate->rel->rd_newRelfilelocatorSubid != GetCurrentSubTransactionId())
     748          18 :             ereport(ERROR,
     749             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     750             :                      errmsg("cannot perform COPY FREEZE because the table was not created or truncated in the current subtransaction")));
     751             : 
     752          42 :         ti_options |= TABLE_INSERT_FROZEN;
     753             :     }
     754             : 
     755             :     /*
     756             :      * We need a ResultRelInfo so we can use the regular executor's
     757             :      * index-entry-making machinery.  (There used to be a huge amount of code
     758             :      * here that basically duplicated execUtils.c ...)
     759             :      */
     760        1522 :     ExecInitRangeTable(estate, cstate->range_table, cstate->rteperminfos);
     761        1522 :     resultRelInfo = target_resultRelInfo = makeNode(ResultRelInfo);
     762        1522 :     ExecInitResultRelation(estate, resultRelInfo, 1);
     763             : 
     764             :     /* Verify the named relation is a valid target for INSERT */
     765        1522 :     CheckValidResultRel(resultRelInfo, CMD_INSERT, NIL);
     766             : 
     767        1520 :     ExecOpenIndices(resultRelInfo, false);
     768             : 
     769             :     /*
     770             :      * Set up a ModifyTableState so we can let FDW(s) init themselves for
     771             :      * foreign-table result relation(s).
     772             :      */
     773        1520 :     mtstate = makeNode(ModifyTableState);
     774        1520 :     mtstate->ps.plan = NULL;
     775        1520 :     mtstate->ps.state = estate;
     776        1520 :     mtstate->operation = CMD_INSERT;
     777        1520 :     mtstate->mt_nrels = 1;
     778        1520 :     mtstate->resultRelInfo = resultRelInfo;
     779        1520 :     mtstate->rootResultRelInfo = resultRelInfo;
     780             : 
     781        1520 :     if (resultRelInfo->ri_FdwRoutine != NULL &&
     782          36 :         resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
     783          36 :         resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
     784             :                                                          resultRelInfo);
     785             : 
     786             :     /*
     787             :      * Also, if the named relation is a foreign table, determine if the FDW
     788             :      * supports batch insert and determine the batch size (a FDW may support
     789             :      * batching, but it may be disabled for the server/table).
     790             :      *
     791             :      * If the FDW does not support batching, we set the batch size to 1.
     792             :      */
     793        1520 :     if (resultRelInfo->ri_FdwRoutine != NULL &&
     794          36 :         resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize &&
     795          36 :         resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert)
     796          36 :         resultRelInfo->ri_BatchSize =
     797          36 :             resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize(resultRelInfo);
     798             :     else
     799        1484 :         resultRelInfo->ri_BatchSize = 1;
     800             : 
     801             :     Assert(resultRelInfo->ri_BatchSize >= 1);
     802             : 
     803             :     /* Prepare to catch AFTER triggers. */
     804        1520 :     AfterTriggerBeginQuery();
     805             : 
     806             :     /*
     807             :      * If there are any triggers with transition tables on the named relation,
     808             :      * we need to be prepared to capture transition tuples.
     809             :      *
     810             :      * Because partition tuple routing would like to know about whether
     811             :      * transition capture is active, we also set it in mtstate, which is
     812             :      * passed to ExecFindPartition() below.
     813             :      */
     814        1520 :     cstate->transition_capture = mtstate->mt_transition_capture =
     815        1520 :         MakeTransitionCaptureState(cstate->rel->trigdesc,
     816        1520 :                                    RelationGetRelid(cstate->rel),
     817             :                                    CMD_INSERT);
     818             : 
     819             :     /*
     820             :      * If the named relation is a partitioned table, initialize state for
     821             :      * CopyFrom tuple routing.
     822             :      */
     823        1520 :     if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     824          98 :         proute = ExecSetupPartitionTupleRouting(estate, cstate->rel);
     825             : 
     826        1520 :     if (cstate->whereClause)
     827          18 :         cstate->qualexpr = ExecInitQual(castNode(List, cstate->whereClause),
     828             :                                         &mtstate->ps);
     829             : 
     830             :     /*
     831             :      * It's generally more efficient to prepare a bunch of tuples for
     832             :      * insertion, and insert them in one
     833             :      * table_multi_insert()/ExecForeignBatchInsert() call, than call
     834             :      * table_tuple_insert()/ExecForeignInsert() separately for every tuple.
     835             :      * However, there are a number of reasons why we might not be able to do
     836             :      * this.  These are explained below.
     837             :      */
     838        1520 :     if (resultRelInfo->ri_TrigDesc != NULL &&
     839         176 :         (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
     840          84 :          resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
     841             :     {
     842             :         /*
     843             :          * Can't support multi-inserts when there are any BEFORE/INSTEAD OF
     844             :          * triggers on the table. Such triggers might query the table we're
     845             :          * inserting into and act differently if the tuples that have already
     846             :          * been processed and prepared for insertion are not there.
     847             :          */
     848         104 :         insertMethod = CIM_SINGLE;
     849             :     }
     850        1416 :     else if (resultRelInfo->ri_FdwRoutine != NULL &&
     851          28 :              resultRelInfo->ri_BatchSize == 1)
     852             :     {
     853             :         /*
     854             :          * Can't support multi-inserts to a foreign table if the FDW does not
     855             :          * support batching, or it's disabled for the server or foreign table.
     856             :          */
     857          18 :         insertMethod = CIM_SINGLE;
     858             :     }
     859        1398 :     else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL &&
     860          26 :              resultRelInfo->ri_TrigDesc->trig_insert_new_table)
     861             :     {
     862             :         /*
     863             :          * For partitioned tables we can't support multi-inserts when there
     864             :          * are any statement level insert triggers. It might be possible to
     865             :          * allow partitioned tables with such triggers in the future, but for
     866             :          * now, CopyMultiInsertInfoFlush expects that any after row insert and
     867             :          * statement level insert triggers are on the same relation.
     868             :          */
     869          18 :         insertMethod = CIM_SINGLE;
     870             :     }
     871        1380 :     else if (cstate->volatile_defexprs)
     872             :     {
     873             :         /*
     874             :          * Can't support multi-inserts if there are any volatile default
     875             :          * expressions in the table.  Similarly to the trigger case above,
     876             :          * such expressions may query the table we're inserting into.
     877             :          *
     878             :          * Note: It does not matter if any partitions have any volatile
     879             :          * default expressions as we use the defaults from the target of the
     880             :          * COPY command.
     881             :          */
     882           6 :         insertMethod = CIM_SINGLE;
     883             :     }
     884        1374 :     else if (contain_volatile_functions(cstate->whereClause))
     885             :     {
     886             :         /*
     887             :          * Can't support multi-inserts if there are any volatile function
     888             :          * expressions in WHERE clause.  Similarly to the trigger case above,
     889             :          * such expressions may query the table we're inserting into.
     890             :          *
     891             :          * Note: the whereClause was already preprocessed in DoCopy(), so it's
     892             :          * okay to use contain_volatile_functions() directly.
     893             :          */
     894           0 :         insertMethod = CIM_SINGLE;
     895             :     }
     896             :     else
     897             :     {
     898             :         /*
     899             :          * For partitioned tables, we may still be able to perform bulk
     900             :          * inserts.  However, the possibility of this depends on which types
     901             :          * of triggers exist on the partition.  We must disable bulk inserts
     902             :          * if the partition is a foreign table that can't use batching or it
     903             :          * has any before row insert or insert instead triggers (same as we
     904             :          * checked above for the parent table).  Since the partition's
     905             :          * resultRelInfos are initialized only when we actually need to insert
     906             :          * the first tuple into them, we must have the intermediate insert
     907             :          * method of CIM_MULTI_CONDITIONAL to flag that we must later
     908             :          * determine if we can use bulk-inserts for the partition being
     909             :          * inserted into.
     910             :          */
     911        1374 :         if (proute)
     912          74 :             insertMethod = CIM_MULTI_CONDITIONAL;
     913             :         else
     914        1300 :             insertMethod = CIM_MULTI;
     915             : 
     916        1374 :         CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
     917             :                                 estate, mycid, ti_options);
     918             :     }
     919             : 
     920             :     /*
     921             :      * If not using batch mode (which allocates slots as needed) set up a
     922             :      * tuple slot too. When inserting into a partitioned table, we also need
     923             :      * one, even if we might batch insert, to read the tuple in the root
     924             :      * partition's form.
     925             :      */
     926        1520 :     if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
     927             :     {
     928         220 :         singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
     929             :                                        &estate->es_tupleTable);
     930         220 :         bistate = GetBulkInsertState();
     931             :     }
     932             : 
     933        1696 :     has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
     934         176 :                                   resultRelInfo->ri_TrigDesc->trig_insert_before_row);
     935             : 
     936        1696 :     has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
     937         176 :                                    resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
     938             : 
     939             :     /*
     940             :      * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
     941             :      * should do this for COPY, since it's not really an "INSERT" statement as
     942             :      * such. However, executing these triggers maintains consistency with the
     943             :      * EACH ROW triggers that we already fire on COPY.
     944             :      */
     945        1520 :     ExecBSInsertTriggers(estate, resultRelInfo);
     946             : 
     947        1520 :     econtext = GetPerTupleExprContext(estate);
     948             : 
     949             :     /* Set up callback to identify error line number */
     950        1520 :     errcallback.callback = CopyFromErrorCallback;
     951        1520 :     errcallback.arg = (void *) cstate;
     952        1520 :     errcallback.previous = error_context_stack;
     953        1520 :     error_context_stack = &errcallback;
     954             : 
     955             :     for (;;)
     956     1248392 :     {
     957             :         TupleTableSlot *myslot;
     958             :         bool        skip_tuple;
     959             : 
     960     1249912 :         CHECK_FOR_INTERRUPTS();
     961             : 
     962             :         /*
     963             :          * Reset the per-tuple exprcontext. We do this after every tuple, to
     964             :          * clean-up after expression evaluations etc.
     965             :          */
     966     1249912 :         ResetPerTupleExprContext(estate);
     967             : 
     968             :         /* select slot to (initially) load row into */
     969     1249912 :         if (insertMethod == CIM_SINGLE || proute)
     970             :         {
     971      274872 :             myslot = singleslot;
     972      274872 :             Assert(myslot != NULL);
     973             :         }
     974             :         else
     975             :         {
     976             :             Assert(resultRelInfo == target_resultRelInfo);
     977             :             Assert(insertMethod == CIM_MULTI);
     978             : 
     979      975040 :             myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
     980             :                                                      resultRelInfo);
     981             :         }
     982             : 
     983             :         /*
     984             :          * Switch to per-tuple context before calling NextCopyFrom, which does
     985             :          * evaluate default expressions etc. and requires per-tuple context.
     986             :          */
     987     1249912 :         MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
     988             : 
     989     1249912 :         ExecClearTuple(myslot);
     990             : 
     991             :         /* Directly store the values/nulls array in the slot */
     992     1249912 :         if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
     993        1358 :             break;
     994             : 
     995     1248426 :         if (cstate->opts.on_error != COPY_ON_ERROR_STOP &&
     996          66 :             cstate->escontext->error_occurred)
     997             :         {
     998             :             /*
     999             :              * Soft error occurred, skip this tuple and deal with error
    1000             :              * information according to ON_ERROR.
    1001             :              */
    1002          42 :             if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE)
    1003             : 
    1004             :                 /*
    1005             :                  * Just make ErrorSaveContext ready for the next NextCopyFrom.
    1006             :                  * Since we don't set details_wanted and error_data is not to
    1007             :                  * be filled, just resetting error_occurred is enough.
    1008             :                  */
    1009          42 :                 cstate->escontext->error_occurred = false;
    1010             : 
    1011             :             /* Report that this tuple was skipped by the ON_ERROR clause */
    1012          42 :             pgstat_progress_update_param(PROGRESS_COPY_TUPLES_SKIPPED,
    1013             :                                          ++skipped);
    1014             : 
    1015          42 :             continue;
    1016             :         }
    1017             : 
    1018     1248384 :         ExecStoreVirtualTuple(myslot);
    1019             : 
    1020             :         /*
    1021             :          * Constraints and where clause might reference the tableoid column,
    1022             :          * so (re-)initialize tts_tableOid before evaluating them.
    1023             :          */
    1024     1248384 :         myslot->tts_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc);
    1025             : 
    1026             :         /* Triggers and stuff need to be invoked in query context. */
    1027     1248384 :         MemoryContextSwitchTo(oldcontext);
    1028             : 
    1029     1248384 :         if (cstate->whereClause)
    1030             :         {
    1031          66 :             econtext->ecxt_scantuple = myslot;
    1032             :             /* Skip items that don't match COPY's WHERE clause */
    1033          66 :             if (!ExecQual(cstate->qualexpr, econtext))
    1034             :             {
    1035             :                 /*
    1036             :                  * Report that this tuple was filtered out by the WHERE
    1037             :                  * clause.
    1038             :                  */
    1039          36 :                 pgstat_progress_update_param(PROGRESS_COPY_TUPLES_EXCLUDED,
    1040             :                                              ++excluded);
    1041          36 :                 continue;
    1042             :             }
    1043             :         }
    1044             : 
    1045             :         /* Determine the partition to insert the tuple into */
    1046     1248348 :         if (proute)
    1047             :         {
    1048             :             TupleConversionMap *map;
    1049             : 
    1050             :             /*
    1051             :              * Attempt to find a partition suitable for this tuple.
    1052             :              * ExecFindPartition() will raise an error if none can be found or
    1053             :              * if the found partition is not suitable for INSERTs.
    1054             :              */
    1055      274390 :             resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo,
    1056             :                                               proute, myslot, estate);
    1057             : 
    1058      274388 :             if (prevResultRelInfo != resultRelInfo)
    1059             :             {
    1060             :                 /* Determine which triggers exist on this partition */
    1061      161566 :                 has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
    1062          54 :                                               resultRelInfo->ri_TrigDesc->trig_insert_before_row);
    1063             : 
    1064      161566 :                 has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
    1065          54 :                                                resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
    1066             : 
    1067             :                 /*
    1068             :                  * Disable multi-inserts when the partition has BEFORE/INSTEAD
    1069             :                  * OF triggers, or if the partition is a foreign table that
    1070             :                  * can't use batching.
    1071             :                  */
    1072      262970 :                 leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
    1073      101458 :                     !has_before_insert_row_trig &&
    1074      364404 :                     !has_instead_insert_row_trig &&
    1075      101434 :                     (resultRelInfo->ri_FdwRoutine == NULL ||
    1076          12 :                      resultRelInfo->ri_BatchSize > 1);
    1077             : 
    1078             :                 /* Set the multi-insert buffer to use for this partition. */
    1079      161512 :                 if (leafpart_use_multi_insert)
    1080             :                 {
    1081      101430 :                     if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
    1082          86 :                         CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
    1083             :                                                        resultRelInfo);
    1084             :                 }
    1085       60082 :                 else if (insertMethod == CIM_MULTI_CONDITIONAL &&
    1086          28 :                          !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
    1087             :                 {
    1088             :                     /*
    1089             :                      * Flush pending inserts if this partition can't use
    1090             :                      * batching, so rows are visible to triggers etc.
    1091             :                      */
    1092           0 :                     CopyMultiInsertInfoFlush(&multiInsertInfo,
    1093             :                                              resultRelInfo,
    1094             :                                              &processed);
    1095             :                 }
    1096             : 
    1097      161512 :                 if (bistate != NULL)
    1098      161512 :                     ReleaseBulkInsertStatePin(bistate);
    1099      161512 :                 prevResultRelInfo = resultRelInfo;
    1100             :             }
    1101             : 
    1102             :             /*
    1103             :              * If we're capturing transition tuples, we might need to convert
    1104             :              * from the partition rowtype to root rowtype. But if there are no
    1105             :              * BEFORE triggers on the partition that could change the tuple,
    1106             :              * we can just remember the original unconverted tuple to avoid a
    1107             :              * needless round trip conversion.
    1108             :              */
    1109      274388 :             if (cstate->transition_capture != NULL)
    1110          54 :                 cstate->transition_capture->tcs_original_insert_tuple =
    1111          54 :                     !has_before_insert_row_trig ? myslot : NULL;
    1112             : 
    1113             :             /*
    1114             :              * We might need to convert from the root rowtype to the partition
    1115             :              * rowtype.
    1116             :              */
    1117      274388 :             map = ExecGetRootToChildMap(resultRelInfo, estate);
    1118      274388 :             if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert)
    1119             :             {
    1120             :                 /* non batch insert */
    1121       60136 :                 if (map != NULL)
    1122             :                 {
    1123             :                     TupleTableSlot *new_slot;
    1124             : 
    1125         110 :                     new_slot = resultRelInfo->ri_PartitionTupleSlot;
    1126         110 :                     myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot);
    1127             :                 }
    1128             :             }
    1129             :             else
    1130             :             {
    1131             :                 /*
    1132             :                  * Prepare to queue up tuple for later batch insert into
    1133             :                  * current partition.
    1134             :                  */
    1135             :                 TupleTableSlot *batchslot;
    1136             : 
    1137             :                 /* no other path available for partitioned table */
    1138             :                 Assert(insertMethod == CIM_MULTI_CONDITIONAL);
    1139             : 
    1140      214252 :                 batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
    1141             :                                                             resultRelInfo);
    1142             : 
    1143      214252 :                 if (map != NULL)
    1144       12200 :                     myslot = execute_attr_map_slot(map->attrMap, myslot,
    1145             :                                                    batchslot);
    1146             :                 else
    1147             :                 {
    1148             :                     /*
    1149             :                      * This looks more expensive than it is (Believe me, I
    1150             :                      * optimized it away. Twice.). The input is in virtual
    1151             :                      * form, and we'll materialize the slot below - for most
    1152             :                      * slot types the copy performs the work materialization
    1153             :                      * would later require anyway.
    1154             :                      */
    1155      202052 :                     ExecCopySlot(batchslot, myslot);
    1156      202052 :                     myslot = batchslot;
    1157             :                 }
    1158             :             }
    1159             : 
    1160             :             /* ensure that triggers etc see the right relation  */
    1161      274388 :             myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
    1162             :         }
    1163             : 
    1164     1248346 :         skip_tuple = false;
    1165             : 
    1166             :         /* BEFORE ROW INSERT Triggers */
    1167     1248346 :         if (has_before_insert_row_trig)
    1168             :         {
    1169         274 :             if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot))
    1170          16 :                 skip_tuple = true;  /* "do nothing" */
    1171             :         }
    1172             : 
    1173     1248346 :         if (!skip_tuple)
    1174             :         {
    1175             :             /*
    1176             :              * If there is an INSTEAD OF INSERT ROW trigger, let it handle the
    1177             :              * tuple.  Otherwise, proceed with inserting the tuple into the
    1178             :              * table or foreign table.
    1179             :              */
    1180     1248330 :             if (has_instead_insert_row_trig)
    1181             :             {
    1182          12 :                 ExecIRInsertTriggers(estate, resultRelInfo, myslot);
    1183             :             }
    1184             :             else
    1185             :             {
    1186             :                 /* Compute stored generated columns */
    1187     1248318 :                 if (resultRelInfo->ri_RelationDesc->rd_att->constr &&
    1188      461860 :                     resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored)
    1189          36 :                     ExecComputeStoredGenerated(resultRelInfo, estate, myslot,
    1190             :                                                CMD_INSERT);
    1191             : 
    1192             :                 /*
    1193             :                  * If the target is a plain table, check the constraints of
    1194             :                  * the tuple.
    1195             :                  */
    1196     1248318 :                 if (resultRelInfo->ri_FdwRoutine == NULL &&
    1197     1248230 :                     resultRelInfo->ri_RelationDesc->rd_att->constr)
    1198      461824 :                     ExecConstraints(resultRelInfo, myslot, estate);
    1199             : 
    1200             :                 /*
    1201             :                  * Also check the tuple against the partition constraint, if
    1202             :                  * there is one; except that if we got here via tuple-routing,
    1203             :                  * we don't need to if there's no BR trigger defined on the
    1204             :                  * partition.
    1205             :                  */
    1206     1248288 :                 if (resultRelInfo->ri_RelationDesc->rd_rel->relispartition &&
    1207      274376 :                     (proute == NULL || has_before_insert_row_trig))
    1208        2308 :                     ExecPartitionCheck(resultRelInfo, myslot, estate, true);
    1209             : 
    1210             :                 /* Store the slot in the multi-insert buffer, when enabled. */
    1211     1248288 :                 if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
    1212             :                 {
    1213             :                     /*
    1214             :                      * The slot previously might point into the per-tuple
    1215             :                      * context. For batching it needs to be longer lived.
    1216             :                      */
    1217     1187944 :                     ExecMaterializeSlot(myslot);
    1218             : 
    1219             :                     /* Add this tuple to the tuple buffer */
    1220     1187944 :                     CopyMultiInsertInfoStore(&multiInsertInfo,
    1221             :                                              resultRelInfo, myslot,
    1222             :                                              cstate->line_buf.len,
    1223             :                                              cstate->cur_lineno);
    1224             : 
    1225             :                     /*
    1226             :                      * If enough inserts have queued up, then flush all
    1227             :                      * buffers out to their tables.
    1228             :                      */
    1229     1187944 :                     if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
    1230        1100 :                         CopyMultiInsertInfoFlush(&multiInsertInfo,
    1231             :                                                  resultRelInfo,
    1232             :                                                  &processed);
    1233             : 
    1234             :                     /*
    1235             :                      * We delay updating the row counter and progress of the
    1236             :                      * COPY command until after writing the tuples stored in
    1237             :                      * the buffer out to the table, as in single insert mode.
    1238             :                      * See CopyMultiInsertBufferFlush().
    1239             :                      */
    1240     1187944 :                     continue;   /* next tuple please */
    1241             :                 }
    1242             :                 else
    1243             :                 {
    1244       60344 :                     List       *recheckIndexes = NIL;
    1245             : 
    1246             :                     /* OK, store the tuple */
    1247       60344 :                     if (resultRelInfo->ri_FdwRoutine != NULL)
    1248             :                     {
    1249          50 :                         myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
    1250             :                                                                                  resultRelInfo,
    1251             :                                                                                  myslot,
    1252             :                                                                                  NULL);
    1253             : 
    1254          48 :                         if (myslot == NULL) /* "do nothing" */
    1255           4 :                             continue;   /* next tuple please */
    1256             : 
    1257             :                         /*
    1258             :                          * AFTER ROW Triggers might reference the tableoid
    1259             :                          * column, so (re-)initialize tts_tableOid before
    1260             :                          * evaluating them.
    1261             :                          */
    1262          44 :                         myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
    1263             :                     }
    1264             :                     else
    1265             :                     {
    1266             :                         /* OK, store the tuple and create index entries for it */
    1267       60294 :                         table_tuple_insert(resultRelInfo->ri_RelationDesc,
    1268             :                                            myslot, mycid, ti_options, bistate);
    1269             : 
    1270       60294 :                         if (resultRelInfo->ri_NumIndices > 0)
    1271           0 :                             recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
    1272             :                                                                    myslot,
    1273             :                                                                    estate,
    1274             :                                                                    false,
    1275             :                                                                    false,
    1276             :                                                                    NULL,
    1277             :                                                                    NIL,
    1278             :                                                                    false);
    1279             :                     }
    1280             : 
    1281             :                     /* AFTER ROW INSERT Triggers */
    1282       60338 :                     ExecARInsertTriggers(estate, resultRelInfo, myslot,
    1283             :                                          recheckIndexes, cstate->transition_capture);
    1284             : 
    1285       60338 :                     list_free(recheckIndexes);
    1286             :                 }
    1287             :             }
    1288             : 
    1289             :             /*
    1290             :              * We count only tuples not suppressed by a BEFORE INSERT trigger
    1291             :              * or FDW; this is the same definition used by nodeModifyTable.c
    1292             :              * for counting tuples inserted by an INSERT command.  Update
    1293             :              * progress of the COPY command as well.
    1294             :              */
    1295       60350 :             pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
    1296             :                                          ++processed);
    1297             :         }
    1298             :     }
    1299             : 
    1300             :     /* Flush any remaining buffered tuples */
    1301        1358 :     if (insertMethod != CIM_SINGLE)
    1302             :     {
    1303        1238 :         if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
    1304        1008 :             CopyMultiInsertInfoFlush(&multiInsertInfo, NULL, &processed);
    1305             :     }
    1306             : 
    1307             :     /* Done, clean up */
    1308        1344 :     error_context_stack = errcallback.previous;
    1309             : 
    1310        1344 :     if (cstate->opts.on_error != COPY_ON_ERROR_STOP &&
    1311          12 :         cstate->num_errors > 0)
    1312          12 :         ereport(NOTICE,
    1313             :                 errmsg_plural("%llu row was skipped due to data type incompatibility",
    1314             :                               "%llu rows were skipped due to data type incompatibility",
    1315             :                               (unsigned long long) cstate->num_errors,
    1316             :                               (unsigned long long) cstate->num_errors));
    1317             : 
    1318        1344 :     if (bistate != NULL)
    1319         192 :         FreeBulkInsertState(bistate);
    1320             : 
    1321        1344 :     MemoryContextSwitchTo(oldcontext);
    1322             : 
    1323             :     /* Execute AFTER STATEMENT insertion triggers */
    1324        1344 :     ExecASInsertTriggers(estate, target_resultRelInfo, cstate->transition_capture);
    1325             : 
    1326             :     /* Handle queued AFTER triggers */
    1327        1344 :     AfterTriggerEndQuery(estate);
    1328             : 
    1329        1344 :     ExecResetTupleTable(estate->es_tupleTable, false);
    1330             : 
    1331             :     /* Allow the FDW to shut down */
    1332        1344 :     if (target_resultRelInfo->ri_FdwRoutine != NULL &&
    1333          32 :         target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
    1334          32 :         target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
    1335             :                                                               target_resultRelInfo);
    1336             : 
    1337             :     /* Tear down the multi-insert buffer data */
    1338        1344 :     if (insertMethod != CIM_SINGLE)
    1339        1224 :         CopyMultiInsertInfoCleanup(&multiInsertInfo);
    1340             : 
    1341             :     /* Close all the partitioned tables, leaf partitions, and their indices */
    1342        1344 :     if (proute)
    1343          96 :         ExecCleanupTupleRouting(mtstate, proute);
    1344             : 
    1345             :     /* Close the result relations, including any trigger target relations */
    1346        1344 :     ExecCloseResultRelations(estate);
    1347        1344 :     ExecCloseRangeTableRelations(estate);
    1348             : 
    1349        1344 :     FreeExecutorState(estate);
    1350             : 
    1351        1344 :     return processed;
    1352             : }
    1353             : 
    1354             : /*
    1355             :  * Set up to read tuples from a file for COPY FROM.
    1356             :  *
    1357             :  * 'rel': Used as a template for the tuples
    1358             :  * 'whereClause': WHERE clause from the COPY FROM command
    1359             :  * 'filename': Name of server-local file to read, NULL for STDIN
    1360             :  * 'is_program': true if 'filename' is a program to execute
    1361             :  * 'data_source_cb': callback that provides the input data
    1362             :  * 'attnamelist': List of char *, columns to include. NIL selects all cols.
    1363             :  * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
    1364             :  *
    1365             :  * Returns a CopyFromState, to be passed to NextCopyFrom and related functions.
    1366             :  */
    1367             : CopyFromState
    1368        1796 : BeginCopyFrom(ParseState *pstate,
    1369             :               Relation rel,
    1370             :               Node *whereClause,
    1371             :               const char *filename,
    1372             :               bool is_program,
    1373             :               copy_data_source_cb data_source_cb,
    1374             :               List *attnamelist,
    1375             :               List *options)
    1376             : {
    1377             :     CopyFromState cstate;
    1378        1796 :     bool        pipe = (filename == NULL);
    1379             :     TupleDesc   tupDesc;
    1380             :     AttrNumber  num_phys_attrs,
    1381             :                 num_defaults;
    1382             :     FmgrInfo   *in_functions;
    1383             :     Oid        *typioparams;
    1384             :     Oid         in_func_oid;
    1385             :     int        *defmap;
    1386             :     ExprState **defexprs;
    1387             :     MemoryContext oldcontext;
    1388             :     bool        volatile_defexprs;
    1389        1796 :     const int   progress_cols[] = {
    1390             :         PROGRESS_COPY_COMMAND,
    1391             :         PROGRESS_COPY_TYPE,
    1392             :         PROGRESS_COPY_BYTES_TOTAL
    1393             :     };
    1394        1796 :     int64       progress_vals[] = {
    1395             :         PROGRESS_COPY_COMMAND_FROM,
    1396             :         0,
    1397             :         0
    1398             :     };
    1399             : 
    1400             :     /* Allocate workspace and zero all fields */
    1401        1796 :     cstate = (CopyFromStateData *) palloc0(sizeof(CopyFromStateData));
    1402             : 
    1403             :     /*
    1404             :      * We allocate everything used by a cstate in a new memory context. This
    1405             :      * avoids memory leaks during repeated use of COPY in a query.
    1406             :      */
    1407        1796 :     cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
    1408             :                                                 "COPY",
    1409             :                                                 ALLOCSET_DEFAULT_SIZES);
    1410             : 
    1411        1796 :     oldcontext = MemoryContextSwitchTo(cstate->copycontext);
    1412             : 
    1413             :     /* Extract options from the statement node tree */
    1414        1796 :     ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options);
    1415             : 
    1416             :     /* Process the target relation */
    1417        1638 :     cstate->rel = rel;
    1418             : 
    1419        1638 :     tupDesc = RelationGetDescr(cstate->rel);
    1420             : 
    1421             :     /* process common options or initialization */
    1422             : 
    1423             :     /* Generate or convert list of attributes to process */
    1424        1638 :     cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
    1425             : 
    1426        1638 :     num_phys_attrs = tupDesc->natts;
    1427             : 
    1428             :     /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
    1429        1638 :     cstate->opts.force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
    1430        1638 :     if (cstate->opts.force_notnull_all)
    1431          12 :         MemSet(cstate->opts.force_notnull_flags, true, num_phys_attrs * sizeof(bool));
    1432        1626 :     else if (cstate->opts.force_notnull)
    1433             :     {
    1434             :         List       *attnums;
    1435             :         ListCell   *cur;
    1436             : 
    1437          26 :         attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_notnull);
    1438             : 
    1439          52 :         foreach(cur, attnums)
    1440             :         {
    1441          32 :             int         attnum = lfirst_int(cur);
    1442          32 :             Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
    1443             : 
    1444          32 :             if (!list_member_int(cstate->attnumlist, attnum))
    1445           6 :                 ereport(ERROR,
    1446             :                         (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1447             :                          errmsg("FORCE_NOT_NULL column \"%s\" not referenced by COPY",
    1448             :                                 NameStr(attr->attname))));
    1449          26 :             cstate->opts.force_notnull_flags[attnum - 1] = true;
    1450             :         }
    1451             :     }
    1452             : 
    1453             :     /* Set up soft error handler for ON_ERROR */
    1454        1632 :     if (cstate->opts.on_error != COPY_ON_ERROR_STOP)
    1455             :     {
    1456          30 :         cstate->escontext = makeNode(ErrorSaveContext);
    1457          30 :         cstate->escontext->type = T_ErrorSaveContext;
    1458          30 :         cstate->escontext->error_occurred = false;
    1459             : 
    1460             :         /*
    1461             :          * Currently we only support COPY_ON_ERROR_IGNORE. We'll add other
    1462             :          * options later
    1463             :          */
    1464          30 :         if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE)
    1465          30 :             cstate->escontext->details_wanted = false;
    1466             :     }
    1467             :     else
    1468        1602 :         cstate->escontext = NULL;
    1469             : 
    1470             :     /* Convert FORCE_NULL name list to per-column flags, check validity */
    1471        1632 :     cstate->opts.force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
    1472        1632 :     if (cstate->opts.force_null_all)
    1473          12 :         MemSet(cstate->opts.force_null_flags, true, num_phys_attrs * sizeof(bool));
    1474        1620 :     else if (cstate->opts.force_null)
    1475             :     {
    1476             :         List       *attnums;
    1477             :         ListCell   *cur;
    1478             : 
    1479          26 :         attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_null);
    1480             : 
    1481          52 :         foreach(cur, attnums)
    1482             :         {
    1483          32 :             int         attnum = lfirst_int(cur);
    1484          32 :             Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
    1485             : 
    1486          32 :             if (!list_member_int(cstate->attnumlist, attnum))
    1487           6 :                 ereport(ERROR,
    1488             :                         (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1489             :                          errmsg("FORCE_NULL column \"%s\" not referenced by COPY",
    1490             :                                 NameStr(attr->attname))));
    1491          26 :             cstate->opts.force_null_flags[attnum - 1] = true;
    1492             :         }
    1493             :     }
    1494             : 
    1495             :     /* Convert convert_selectively name list to per-column flags */
    1496        1626 :     if (cstate->opts.convert_selectively)
    1497             :     {
    1498             :         List       *attnums;
    1499             :         ListCell   *cur;
    1500             : 
    1501           4 :         cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
    1502             : 
    1503           4 :         attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.convert_select);
    1504             : 
    1505           8 :         foreach(cur, attnums)
    1506             :         {
    1507           4 :             int         attnum = lfirst_int(cur);
    1508           4 :             Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
    1509             : 
    1510           4 :             if (!list_member_int(cstate->attnumlist, attnum))
    1511           0 :                 ereport(ERROR,
    1512             :                         (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1513             :                          errmsg_internal("selected column \"%s\" not referenced by COPY",
    1514             :                                          NameStr(attr->attname))));
    1515           4 :             cstate->convert_select_flags[attnum - 1] = true;
    1516             :         }
    1517             :     }
    1518             : 
    1519             :     /* Use client encoding when ENCODING option is not specified. */
    1520        1626 :     if (cstate->opts.file_encoding < 0)
    1521        1620 :         cstate->file_encoding = pg_get_client_encoding();
    1522             :     else
    1523           6 :         cstate->file_encoding = cstate->opts.file_encoding;
    1524             : 
    1525             :     /*
    1526             :      * Look up encoding conversion function.
    1527             :      */
    1528        1626 :     if (cstate->file_encoding == GetDatabaseEncoding() ||
    1529           6 :         cstate->file_encoding == PG_SQL_ASCII ||
    1530           0 :         GetDatabaseEncoding() == PG_SQL_ASCII)
    1531             :     {
    1532        1626 :         cstate->need_transcoding = false;
    1533             :     }
    1534             :     else
    1535             :     {
    1536           0 :         cstate->need_transcoding = true;
    1537           0 :         cstate->conversion_proc = FindDefaultConversionProc(cstate->file_encoding,
    1538             :                                                             GetDatabaseEncoding());
    1539           0 :         if (!OidIsValid(cstate->conversion_proc))
    1540           0 :             ereport(ERROR,
    1541             :                     (errcode(ERRCODE_UNDEFINED_FUNCTION),
    1542             :                      errmsg("default conversion function for encoding \"%s\" to \"%s\" does not exist",
    1543             :                             pg_encoding_to_char(cstate->file_encoding),
    1544             :                             pg_encoding_to_char(GetDatabaseEncoding()))));
    1545             :     }
    1546             : 
    1547        1626 :     cstate->copy_src = COPY_FILE;    /* default */
    1548             : 
    1549        1626 :     cstate->whereClause = whereClause;
    1550             : 
    1551             :     /* Initialize state variables */
    1552        1626 :     cstate->eol_type = EOL_UNKNOWN;
    1553        1626 :     cstate->cur_relname = RelationGetRelationName(cstate->rel);
    1554        1626 :     cstate->cur_lineno = 0;
    1555        1626 :     cstate->cur_attname = NULL;
    1556        1626 :     cstate->cur_attval = NULL;
    1557        1626 :     cstate->relname_only = false;
    1558             : 
    1559             :     /*
    1560             :      * Allocate buffers for the input pipeline.
    1561             :      *
    1562             :      * attribute_buf and raw_buf are used in both text and binary modes, but
    1563             :      * input_buf and line_buf only in text mode.
    1564             :      */
    1565        1626 :     cstate->raw_buf = palloc(RAW_BUF_SIZE + 1);
    1566        1626 :     cstate->raw_buf_index = cstate->raw_buf_len = 0;
    1567        1626 :     cstate->raw_reached_eof = false;
    1568             : 
    1569        1626 :     if (!cstate->opts.binary)
    1570             :     {
    1571             :         /*
    1572             :          * If encoding conversion is needed, we need another buffer to hold
    1573             :          * the converted input data.  Otherwise, we can just point input_buf
    1574             :          * to the same buffer as raw_buf.
    1575             :          */
    1576        1610 :         if (cstate->need_transcoding)
    1577             :         {
    1578           0 :             cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
    1579           0 :             cstate->input_buf_index = cstate->input_buf_len = 0;
    1580             :         }
    1581             :         else
    1582        1610 :             cstate->input_buf = cstate->raw_buf;
    1583        1610 :         cstate->input_reached_eof = false;
    1584             : 
    1585        1610 :         initStringInfo(&cstate->line_buf);
    1586             :     }
    1587             : 
    1588        1626 :     initStringInfo(&cstate->attribute_buf);
    1589             : 
    1590             :     /* Assign range table and rteperminfos, we'll need them in CopyFrom. */
    1591        1626 :     if (pstate)
    1592             :     {
    1593        1566 :         cstate->range_table = pstate->p_rtable;
    1594        1566 :         cstate->rteperminfos = pstate->p_rteperminfos;
    1595             :     }
    1596             : 
    1597        1626 :     num_defaults = 0;
    1598        1626 :     volatile_defexprs = false;
    1599             : 
    1600             :     /*
    1601             :      * Pick up the required catalog information for each attribute in the
    1602             :      * relation, including the input function, the element type (to pass to
    1603             :      * the input function), and info about defaults and constraints. (Which
    1604             :      * input function we use depends on text/binary format choice.)
    1605             :      */
    1606        1626 :     in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
    1607        1626 :     typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
    1608        1626 :     defmap = (int *) palloc(num_phys_attrs * sizeof(int));
    1609        1626 :     defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
    1610             : 
    1611        6322 :     for (int attnum = 1; attnum <= num_phys_attrs; attnum++)
    1612             :     {
    1613        4710 :         Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
    1614             : 
    1615             :         /* We don't need info for dropped attributes */
    1616        4710 :         if (att->attisdropped)
    1617         124 :             continue;
    1618             : 
    1619             :         /* Fetch the input function and typioparam info */
    1620        4586 :         if (cstate->opts.binary)
    1621          62 :             getTypeBinaryInputInfo(att->atttypid,
    1622          62 :                                    &in_func_oid, &typioparams[attnum - 1]);
    1623             :         else
    1624        4524 :             getTypeInputInfo(att->atttypid,
    1625        4524 :                              &in_func_oid, &typioparams[attnum - 1]);
    1626        4584 :         fmgr_info(in_func_oid, &in_functions[attnum - 1]);
    1627             : 
    1628             :         /* Get default info if available */
    1629        4584 :         defexprs[attnum - 1] = NULL;
    1630             : 
    1631             :         /*
    1632             :          * We only need the default values for columns that do not appear in
    1633             :          * the column list, unless the DEFAULT option was given. We never need
    1634             :          * default values for generated columns.
    1635             :          */
    1636        4584 :         if ((cstate->opts.default_print != NULL ||
    1637        4458 :              !list_member_int(cstate->attnumlist, attnum)) &&
    1638         600 :             !att->attgenerated)
    1639             :         {
    1640         580 :             Expr       *defexpr = (Expr *) build_column_default(cstate->rel,
    1641             :                                                                 attnum);
    1642             : 
    1643         580 :             if (defexpr != NULL)
    1644             :             {
    1645             :                 /* Run the expression through planner */
    1646         264 :                 defexpr = expression_planner(defexpr);
    1647             : 
    1648             :                 /* Initialize executable expression in copycontext */
    1649         252 :                 defexprs[attnum - 1] = ExecInitExpr(defexpr, NULL);
    1650             : 
    1651             :                 /* if NOT copied from input */
    1652             :                 /* use default value if one exists */
    1653         252 :                 if (!list_member_int(cstate->attnumlist, attnum))
    1654             :                 {
    1655         172 :                     defmap[num_defaults] = attnum - 1;
    1656         172 :                     num_defaults++;
    1657             :                 }
    1658             : 
    1659             :                 /*
    1660             :                  * If a default expression looks at the table being loaded,
    1661             :                  * then it could give the wrong answer when using
    1662             :                  * multi-insert. Since database access can be dynamic this is
    1663             :                  * hard to test for exactly, so we use the much wider test of
    1664             :                  * whether the default expression is volatile. We allow for
    1665             :                  * the special case of when the default expression is the
    1666             :                  * nextval() of a sequence which in this specific case is
    1667             :                  * known to be safe for use with the multi-insert
    1668             :                  * optimization. Hence we use this special case function
    1669             :                  * checker rather than the standard check for
    1670             :                  * contain_volatile_functions().  Note also that we already
    1671             :                  * ran the expression through expression_planner().
    1672             :                  */
    1673         252 :                 if (!volatile_defexprs)
    1674         252 :                     volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
    1675             :             }
    1676             :         }
    1677             :     }
    1678             : 
    1679        1612 :     cstate->defaults = (bool *) palloc0(tupDesc->natts * sizeof(bool));
    1680             : 
    1681             :     /* initialize progress */
    1682        1612 :     pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
    1683        1612 :                                   cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
    1684        1612 :     cstate->bytes_processed = 0;
    1685             : 
    1686             :     /* We keep those variables in cstate. */
    1687        1612 :     cstate->in_functions = in_functions;
    1688        1612 :     cstate->typioparams = typioparams;
    1689        1612 :     cstate->defmap = defmap;
    1690        1612 :     cstate->defexprs = defexprs;
    1691        1612 :     cstate->volatile_defexprs = volatile_defexprs;
    1692        1612 :     cstate->num_defaults = num_defaults;
    1693        1612 :     cstate->is_program = is_program;
    1694             : 
    1695        1612 :     if (data_source_cb)
    1696             :     {
    1697         334 :         progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
    1698         334 :         cstate->copy_src = COPY_CALLBACK;
    1699         334 :         cstate->data_source_cb = data_source_cb;
    1700             :     }
    1701        1278 :     else if (pipe)
    1702             :     {
    1703         890 :         progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
    1704             :         Assert(!is_program);    /* the grammar does not allow this */
    1705         890 :         if (whereToSendOutput == DestRemote)
    1706         890 :             ReceiveCopyBegin(cstate);
    1707             :         else
    1708           0 :             cstate->copy_file = stdin;
    1709             :     }
    1710             :     else
    1711             :     {
    1712         388 :         cstate->filename = pstrdup(filename);
    1713             : 
    1714         388 :         if (cstate->is_program)
    1715             :         {
    1716           0 :             progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
    1717           0 :             cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
    1718           0 :             if (cstate->copy_file == NULL)
    1719           0 :                 ereport(ERROR,
    1720             :                         (errcode_for_file_access(),
    1721             :                          errmsg("could not execute command \"%s\": %m",
    1722             :                                 cstate->filename)));
    1723             :         }
    1724             :         else
    1725             :         {
    1726             :             struct stat st;
    1727             : 
    1728         388 :             progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
    1729         388 :             cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
    1730         388 :             if (cstate->copy_file == NULL)
    1731             :             {
    1732             :                 /* copy errno because ereport subfunctions might change it */
    1733           0 :                 int         save_errno = errno;
    1734             : 
    1735           0 :                 ereport(ERROR,
    1736             :                         (errcode_for_file_access(),
    1737             :                          errmsg("could not open file \"%s\" for reading: %m",
    1738             :                                 cstate->filename),
    1739             :                          (save_errno == ENOENT || save_errno == EACCES) ?
    1740             :                          errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
    1741             :                                  "You may want a client-side facility such as psql's \\copy.") : 0));
    1742             :             }
    1743             : 
    1744         388 :             if (fstat(fileno(cstate->copy_file), &st))
    1745           0 :                 ereport(ERROR,
    1746             :                         (errcode_for_file_access(),
    1747             :                          errmsg("could not stat file \"%s\": %m",
    1748             :                                 cstate->filename)));
    1749             : 
    1750         388 :             if (S_ISDIR(st.st_mode))
    1751           0 :                 ereport(ERROR,
    1752             :                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1753             :                          errmsg("\"%s\" is a directory", cstate->filename)));
    1754             : 
    1755         388 :             progress_vals[2] = st.st_size;
    1756             :         }
    1757             :     }
    1758             : 
    1759        1612 :     pgstat_progress_update_multi_param(3, progress_cols, progress_vals);
    1760             : 
    1761        1612 :     if (cstate->opts.binary)
    1762             :     {
    1763             :         /* Read and verify binary header */
    1764          14 :         ReceiveCopyBinaryHeader(cstate);
    1765             :     }
    1766             : 
    1767             :     /* create workspace for CopyReadAttributes results */
    1768        1612 :     if (!cstate->opts.binary)
    1769             :     {
    1770        1598 :         AttrNumber  attr_count = list_length(cstate->attnumlist);
    1771             : 
    1772        1598 :         cstate->max_fields = attr_count;
    1773        1598 :         cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
    1774             :     }
    1775             : 
    1776        1612 :     MemoryContextSwitchTo(oldcontext);
    1777             : 
    1778        1612 :     return cstate;
    1779             : }
    1780             : 
    1781             : /*
    1782             :  * Finish a COPY FROM: close the input (program pipe, or file when filename is set), end progress reporting, then delete copycontext and free cstate.
    1783             :  */
    1784             : void
    1785        1080 : EndCopyFrom(CopyFromState cstate)
    1786             : {
    1787             :     /* Close the input first: a program pipe, or a file when filename is set; otherwise nothing is closed here. */
    1788        1080 :     if (cstate->is_program)
    1789             :     {
    1790           0 :         ClosePipeFromProgram(cstate);
    1791             :     }
    1792             :     else
    1793             :     {
    1794        1080 :         if (cstate->filename != NULL && FreeFile(cstate->copy_file))
    1795           0 :             ereport(ERROR,
    1796             :                     (errcode_for_file_access(),
    1797             :                      errmsg("could not close file \"%s\": %m",
    1798             :                             cstate->filename)));
    1799             :     }
    1800             : 
    1801        1080 :     pgstat_progress_end_command();
    1802             : 
    1803        1080 :     MemoryContextDelete(cstate->copycontext);
    1804        1080 :     pfree(cstate);
    1805        1080 : }
    1806             : 
    1807             : /*
    1808             :  * Closes the pipe from an external program and checks the pclose() return code; raises ERROR if the close fails or the program failed, except for SIGPIPE after stopping before EOF.
    1809             :  */
    1810             : static void
    1811           0 : ClosePipeFromProgram(CopyFromState cstate)
    1812             : {
    1813             :     int         pclose_rc;
    1814             : 
    1815             :     Assert(cstate->is_program);
    1816             : 
    1817           0 :     pclose_rc = ClosePipeStream(cstate->copy_file);
    1818           0 :     if (pclose_rc == -1)
    1819           0 :         ereport(ERROR,
    1820             :                 (errcode_for_file_access(),
    1821             :                  errmsg("could not close pipe to external command: %m")));
    1822           0 :     else if (pclose_rc != 0)
    1823             :     {
    1824             :         /*
    1825             :          * If we ended a COPY FROM PROGRAM before reaching EOF, then the
    1826             :          * called program can be expected to die from SIGPIPE, and we should
    1827             :          * not report that as an error.  Otherwise (EOF was reached), any
    1828             :          * nonzero status, SIGPIPE included, is reported as a failure.
    1829             :          */
    1830           0 :         if (!cstate->raw_reached_eof &&
    1831           0 :             wait_result_is_signal(pclose_rc, SIGPIPE))
    1832           0 :             return;
    1833             : 
    1834           0 :         ereport(ERROR,
    1835             :                 (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
    1836             :                  errmsg("program \"%s\" failed",
    1837             :                         cstate->filename),
    1838             :                  errdetail_internal("%s", wait_result_to_str(pclose_rc))));
    1839             :     }
    1840             : }

Generated by: LCOV version 1.14