LCOV - code coverage report
Current view: top level - src/backend/commands - copyfrom.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 91.6 % 592 542
Test Date: 2026-04-06 03:16:28 Functions: 95.8 % 24 23
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * copyfrom.c
       4              :  *      COPY <table> FROM file/program/client
       5              :  *
       6              :  * This file contains routines needed to efficiently load tuples into a
       7              :  * table.  That includes looking up the correct partition, firing triggers,
       8              :  * calling the table AM function to insert the data, and updating indexes.
       9              :  * Reading data from the input file or client and parsing it into Datums
      10              :  * is handled in copyfromparse.c.
      11              :  *
      12              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
      13              :  * Portions Copyright (c) 1994, Regents of the University of California
      14              :  *
      15              :  *
      16              :  * IDENTIFICATION
      17              :  *    src/backend/commands/copyfrom.c
      18              :  *
      19              :  *-------------------------------------------------------------------------
      20              :  */
      21              : #include "postgres.h"
      22              : 
      23              : #include <ctype.h>
      24              : #include <unistd.h>
      25              : #include <sys/stat.h>
      26              : 
      27              : #include "access/heapam.h"
      28              : #include "access/tableam.h"
      29              : #include "access/tupconvert.h"
      30              : #include "access/xact.h"
      31              : #include "catalog/namespace.h"
      32              : #include "commands/copyapi.h"
      33              : #include "commands/copyfrom_internal.h"
      34              : #include "commands/progress.h"
      35              : #include "commands/trigger.h"
      36              : #include "executor/execPartition.h"
      37              : #include "executor/executor.h"
      38              : #include "executor/nodeModifyTable.h"
      39              : #include "executor/tuptable.h"
      40              : #include "foreign/fdwapi.h"
      41              : #include "mb/pg_wchar.h"
      42              : #include "miscadmin.h"
      43              : #include "nodes/miscnodes.h"
      44              : #include "optimizer/optimizer.h"
      45              : #include "pgstat.h"
      46              : #include "rewrite/rewriteHandler.h"
      47              : #include "storage/fd.h"
      48              : #include "tcop/tcopprot.h"
      49              : #include "utils/lsyscache.h"
      50              : #include "utils/memutils.h"
      51              : #include "utils/portal.h"
      52              : #include "utils/rel.h"
      53              : #include "utils/snapmgr.h"
      54              : #include "utils/typcache.h"
      55              : 
      56              : /*
      57              :  * No more than this many tuples per CopyMultiInsertBuffer
      58              :  *
      59              :  * Caution: Don't make this too big, as we could end up with this many
      60              :  * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's
      61              :  * multiInsertBuffers list.  Increasing this can cause quadratic growth in
      62              :  * memory requirements during copies into partitioned tables with a large
      63              :  * number of partitions.
      64              :  */
      65              : #define MAX_BUFFERED_TUPLES     1000
      66              : 
      67              : /*
      68              :  * Flush buffers if there are >= this many bytes, as counted by the input
      69              :  * size, of tuples stored.
      70              :  */
      71              : #define MAX_BUFFERED_BYTES      65535
      72              : 
      73              : /*
      74              :  * Trim the list of buffers back down to this number after flushing.  This
      75              :  * must be >= 2.
      76              :  */
      77              : #define MAX_PARTITION_BUFFERS   32
      78              : 
      79              : /* Stores multi-insert data related to a single relation in CopyFrom. */
      80              : typedef struct CopyMultiInsertBuffer
      81              : {
      82              :     TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
      83              :     ResultRelInfo *resultRelInfo;   /* ResultRelInfo for 'relid' */
      84              :     BulkInsertState bistate;    /* BulkInsertState for this rel if plain
      85              :                                  * table; NULL if foreign table */
      86              :     int         nused;          /* number of 'slots' containing tuples */
      87              :     uint64      linenos[MAX_BUFFERED_TUPLES];   /* Line # of tuple in copy
      88              :                                                  * stream */
      89              : } CopyMultiInsertBuffer;
      90              : 
      91              : /*
      92              :  * Stores one or many CopyMultiInsertBuffers and details about the size and
      93              :  * number of tuples which are stored in them.  This allows multiple buffers to
      94              :  * exist at once when COPYing into a partitioned table.
      95              :  */
      96              : typedef struct CopyMultiInsertInfo
      97              : {
      98              :     List       *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */
      99              :     int         bufferedTuples; /* number of tuples buffered over all buffers */
     100              :     int         bufferedBytes;  /* number of bytes from all buffered tuples */
     101              :     CopyFromState cstate;       /* Copy state for this CopyMultiInsertInfo */
     102              :     EState     *estate;         /* Executor state used for COPY */
     103              :     CommandId   mycid;          /* Command Id used for COPY */
     104              :     uint32      ti_options;     /* table insert options */
     105              : } CopyMultiInsertInfo;
     106              : 
     107              : 
     108              : /* non-export function prototypes */
     109              : static void ClosePipeFromProgram(CopyFromState cstate);
     110              : 
     111              : /*
     112              :  * Built-in format-specific routines. One-row callbacks are defined in
     113              :  * copyfromparse.c.
     114              :  */
     115              : static void CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo,
     116              :                                    Oid *typioparam);
     117              : static void CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc);
     118              : static void CopyFromTextLikeEnd(CopyFromState cstate);
     119              : static void CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid,
     120              :                                  FmgrInfo *finfo, Oid *typioparam);
     121              : static void CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc);
     122              : static void CopyFromBinaryEnd(CopyFromState cstate);
     123              : 
     124              : 
     125              : /*
     126              :  * COPY FROM routines for built-in formats.
     127              :  *
     128              :  * CSV and text formats share the same TextLike routines except for the
     129              :  * one-row callback.
     130              :  */
     131              : 
     132              : /* text format */
     133              : static const CopyFromRoutine CopyFromRoutineText = {
     134              :     .CopyFromInFunc = CopyFromTextLikeInFunc,
     135              :     .CopyFromStart = CopyFromTextLikeStart,
     136              :     .CopyFromOneRow = CopyFromTextOneRow,
     137              :     .CopyFromEnd = CopyFromTextLikeEnd,
     138              : };
     139              : 
     140              : /* CSV format */
     141              : static const CopyFromRoutine CopyFromRoutineCSV = {
     142              :     .CopyFromInFunc = CopyFromTextLikeInFunc,
     143              :     .CopyFromStart = CopyFromTextLikeStart,
     144              :     .CopyFromOneRow = CopyFromCSVOneRow,
     145              :     .CopyFromEnd = CopyFromTextLikeEnd,
     146              : };
     147              : 
     148              : /* binary format */
     149              : static const CopyFromRoutine CopyFromRoutineBinary = {
     150              :     .CopyFromInFunc = CopyFromBinaryInFunc,
     151              :     .CopyFromStart = CopyFromBinaryStart,
     152              :     .CopyFromOneRow = CopyFromBinaryOneRow,
     153              :     .CopyFromEnd = CopyFromBinaryEnd,
     154              : };
     155              : 
     156              : /* Return a COPY FROM routine for the given options */
     157              : static const CopyFromRoutine *
     158         1195 : CopyFromGetRoutine(const CopyFormatOptions *opts)
     159              : {
     160         1195 :     if (opts->format == COPY_FORMAT_CSV)
     161          189 :         return &CopyFromRoutineCSV;
     162         1006 :     else if (opts->format == COPY_FORMAT_BINARY)
     163            9 :         return &CopyFromRoutineBinary;
     164              : 
     165              :     /* default is text */
     166          997 :     return &CopyFromRoutineText;
     167              : }
     168              : 
     169              : /* Implementation of the start callback for text and CSV formats */
     170              : static void
     171         1170 : CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc)
     172              : {
     173              :     AttrNumber  attr_count;
     174              : 
     175              :     /*
     176              :      * If encoding conversion is needed, we need another buffer to hold the
     177              :      * converted input data.  Otherwise, we can just point input_buf to the
     178              :      * same buffer as raw_buf.
     179              :      */
     180         1170 :     if (cstate->need_transcoding)
     181              :     {
     182           24 :         cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
     183           24 :         cstate->input_buf_index = cstate->input_buf_len = 0;
     184              :     }
     185              :     else
     186         1146 :         cstate->input_buf = cstate->raw_buf;
     187         1170 :     cstate->input_reached_eof = false;
     188              : 
     189         1170 :     initStringInfo(&cstate->line_buf);
     190              : 
     191              :     /*
     192              :      * Create workspace for CopyReadAttributes results; used by CSV and text
     193              :      * format.
     194              :      */
     195         1170 :     attr_count = list_length(cstate->attnumlist);
     196         1170 :     cstate->max_fields = attr_count;
     197         1170 :     cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
     198         1170 : }
     199              : 
     200              : /*
     201              :  * Implementation of the infunc callback for text and CSV formats. Assign
     202              :  * the input function data to the given *finfo.
     203              :  */
     204              : static void
     205         3433 : CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo,
     206              :                        Oid *typioparam)
     207              : {
     208              :     Oid         func_oid;
     209              : 
     210         3433 :     getTypeInputInfo(atttypid, &func_oid, typioparam);
     211         3433 :     fmgr_info(func_oid, finfo);
     212         3433 : }
     213              : 
     214              : /* Implementation of the end callback for text and CSV formats */
     215              : static void
     216          786 : CopyFromTextLikeEnd(CopyFromState cstate)
     217              : {
     218              :     /* nothing to do */
     219          786 : }
     220              : 
     221              : /* Implementation of the start callback for binary format */
     222              : static void
     223            8 : CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc)
     224              : {
     225              :     /* Read and verify binary header */
     226            8 :     ReceiveCopyBinaryHeader(cstate);
     227            8 : }
     228              : 
     229              : /*
     230              :  * Implementation of the infunc callback for binary format. Assign
     231              :  * the binary input function to the given *finfo.
     232              :  */
     233              : static void
     234           38 : CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid,
     235              :                      FmgrInfo *finfo, Oid *typioparam)
     236              : {
     237              :     Oid         func_oid;
     238              : 
     239           38 :     getTypeBinaryInputInfo(atttypid, &func_oid, typioparam);
     240           37 :     fmgr_info(func_oid, finfo);
     241           37 : }
     242              : 
     243              : /* Implementation of the end callback for binary format */
     244              : static void
     245            4 : CopyFromBinaryEnd(CopyFromState cstate)
     246              : {
     247              :     /* nothing to do */
     248            4 : }
     249              : 
     250              : /*
     251              :  * error context callback for COPY FROM
     252              :  *
     253              :  * The argument for the error context must be CopyFromState.
     254              :  */
     255              : void
     256          244 : CopyFromErrorCallback(void *arg)
     257              : {
     258          244 :     CopyFromState cstate = (CopyFromState) arg;
     259              : 
     260          244 :     if (cstate->relname_only)
     261              :     {
     262           45 :         errcontext("COPY %s",
     263              :                    cstate->cur_relname);
     264           45 :         return;
     265              :     }
     266          199 :     if (cstate->opts.format == COPY_FORMAT_BINARY)
     267              :     {
     268              :         /* can't usefully display the data */
     269            1 :         if (cstate->cur_attname)
     270            1 :             errcontext("COPY %s, line %" PRIu64 ", column %s",
     271              :                        cstate->cur_relname,
     272              :                        cstate->cur_lineno,
     273              :                        cstate->cur_attname);
     274              :         else
     275            0 :             errcontext("COPY %s, line %" PRIu64,
     276              :                        cstate->cur_relname,
     277              :                        cstate->cur_lineno);
     278              :     }
     279              :     else
     280              :     {
     281          198 :         if (cstate->cur_attname && cstate->cur_attval)
     282           34 :         {
     283              :             /* error is relevant to a particular column */
     284              :             char       *attval;
     285              : 
     286           34 :             attval = CopyLimitPrintoutLength(cstate->cur_attval);
     287           34 :             errcontext("COPY %s, line %" PRIu64 ", column %s: \"%s\"",
     288              :                        cstate->cur_relname,
     289              :                        cstate->cur_lineno,
     290              :                        cstate->cur_attname,
     291              :                        attval);
     292           34 :             pfree(attval);
     293              :         }
     294          164 :         else if (cstate->cur_attname)
     295              :         {
     296              :             /* error is relevant to a particular column, value is NULL */
     297            8 :             errcontext("COPY %s, line %" PRIu64 ", column %s: null input",
     298              :                        cstate->cur_relname,
     299              :                        cstate->cur_lineno,
     300              :                        cstate->cur_attname);
     301              :         }
     302              :         else
     303              :         {
     304              :             /*
     305              :              * Error is relevant to a particular line.
     306              :              *
     307              :              * If line_buf still contains the correct line, print it.
     308              :              */
     309          156 :             if (cstate->line_buf_valid)
     310              :             {
     311              :                 char       *lineval;
     312              : 
     313          129 :                 lineval = CopyLimitPrintoutLength(cstate->line_buf.data);
     314          129 :                 errcontext("COPY %s, line %" PRIu64 ": \"%s\"",
     315              :                            cstate->cur_relname,
     316              :                            cstate->cur_lineno, lineval);
     317          129 :                 pfree(lineval);
     318              :             }
     319              :             else
     320              :             {
     321           27 :                 errcontext("COPY %s, line %" PRIu64,
     322              :                            cstate->cur_relname,
     323              :                            cstate->cur_lineno);
     324              :             }
     325              :         }
     326              :     }
     327              : }
     328              : 
     329              : /*
     330              :  * Make sure we don't print an unreasonable amount of COPY data in a message.
     331              :  *
     332              :  * Returns a pstrdup'd copy of the input.
     333              :  */
     334              : char *
     335          203 : CopyLimitPrintoutLength(const char *str)
     336              : {
     337              : #define MAX_COPY_DATA_DISPLAY 100
     338              : 
     339          203 :     int         slen = strlen(str);
     340              :     int         len;
     341              :     char       *res;
     342              : 
     343              :     /* Fast path if definitely okay */
     344          203 :     if (slen <= MAX_COPY_DATA_DISPLAY)
     345          203 :         return pstrdup(str);
     346              : 
     347              :     /* Apply encoding-dependent truncation */
     348            0 :     len = pg_mbcliplen(str, slen, MAX_COPY_DATA_DISPLAY);
     349              : 
     350              :     /*
     351              :      * Truncate, and add "..." to show we truncated the input.
     352              :      */
     353            0 :     res = (char *) palloc(len + 4);
     354            0 :     memcpy(res, str, len);
     355            0 :     strcpy(res + len, "...");
     356              : 
     357            0 :     return res;
     358              : }
     359              : 
     360              : /*
     361              :  * Allocate memory and initialize a new CopyMultiInsertBuffer for this
     362              :  * ResultRelInfo.
     363              :  */
     364              : static CopyMultiInsertBuffer *
     365         1024 : CopyMultiInsertBufferInit(ResultRelInfo *rri)
     366              : {
     367              :     CopyMultiInsertBuffer *buffer;
     368              : 
     369         1024 :     buffer = palloc_object(CopyMultiInsertBuffer);
     370         1024 :     memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
     371         1024 :     buffer->resultRelInfo = rri;
     372         1024 :     buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL;
     373         1024 :     buffer->nused = 0;
     374              : 
     375         1024 :     return buffer;
     376              : }
     377              : 
     378              : /*
     379              :  * Make a new buffer for this ResultRelInfo.
     380              :  */
     381              : static inline void
     382         1024 : CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
     383              :                                ResultRelInfo *rri)
     384              : {
     385              :     CopyMultiInsertBuffer *buffer;
     386              : 
     387         1024 :     buffer = CopyMultiInsertBufferInit(rri);
     388              : 
     389              :     /* Setup back-link so we can easily find this buffer again */
     390         1024 :     rri->ri_CopyMultiInsertBuffer = buffer;
     391              :     /* Record that we're tracking this buffer */
     392         1024 :     miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
     393         1024 : }
     394              : 
     395              : /*
     396              :  * Initialize an already allocated CopyMultiInsertInfo.
     397              :  *
     398              :  * If rri is a non-partitioned table then a CopyMultiInsertBuffer is set up
     399              :  * for that table.
     400              :  */
     401              : static void
     402         1020 : CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
     403              :                         CopyFromState cstate, EState *estate, CommandId mycid,
     404              :                         uint32 ti_options)
     405              : {
     406         1020 :     miinfo->multiInsertBuffers = NIL;
     407         1020 :     miinfo->bufferedTuples = 0;
     408         1020 :     miinfo->bufferedBytes = 0;
     409         1020 :     miinfo->cstate = cstate;
     410         1020 :     miinfo->estate = estate;
     411         1020 :     miinfo->mycid = mycid;
     412         1020 :     miinfo->ti_options = ti_options;
     413              : 
     414              :     /*
     415              :      * Only setup the buffer when not dealing with a partitioned table.
     416              :      * Buffers for partitioned tables will just be setup when we need to send
     417              :      * tuples their way for the first time.
     418              :      */
     419         1020 :     if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
     420          975 :         CopyMultiInsertInfoSetupBuffer(miinfo, rri);
     421         1020 : }
     422              : 
     423              : /*
     424              :  * Returns true if the buffers are full
     425              :  */
     426              : static inline bool
     427       715150 : CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
     428              : {
     429       715150 :     if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
     430       714567 :         miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
     431          653 :         return true;
     432       714497 :     return false;
     433              : }
     434              : 
     435              : /*
     436              :  * Returns true if we have no buffered tuples
     437              :  */
     438              : static inline bool
     439          906 : CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
     440              : {
     441          906 :     return miinfo->bufferedTuples == 0;
     442              : }
     443              : 
     444              : /*
     445              :  * Write the tuples stored in 'buffer' out to the table.
     446              :  */
     447              : static inline void
     448         1474 : CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
     449              :                            CopyMultiInsertBuffer *buffer,
     450              :                            int64 *processed)
     451              : {
     452         1474 :     CopyFromState cstate = miinfo->cstate;
     453         1474 :     EState     *estate = miinfo->estate;
     454         1474 :     int         nused = buffer->nused;
     455         1474 :     ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
     456         1474 :     TupleTableSlot **slots = buffer->slots;
     457              :     int         i;
     458              : 
     459         1474 :     if (resultRelInfo->ri_FdwRoutine)
     460              :     {
     461            7 :         int         batch_size = resultRelInfo->ri_BatchSize;
     462            7 :         int         sent = 0;
     463              : 
     464              :         Assert(buffer->bistate == NULL);
     465              : 
     466              :         /* Ensure that the FDW supports batching and it's enabled */
     467              :         Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert);
     468              :         Assert(batch_size > 1);
     469              : 
     470              :         /*
     471              :          * We suppress error context information other than the relation name,
     472              :          * if one of the operations below fails.
     473              :          */
     474              :         Assert(!cstate->relname_only);
     475            7 :         cstate->relname_only = true;
     476              : 
     477           19 :         while (sent < nused)
     478              :         {
     479           13 :             int         size = (batch_size < nused - sent) ? batch_size : (nused - sent);
     480           13 :             int         inserted = size;
     481              :             TupleTableSlot **rslots;
     482              : 
     483              :             /* insert into foreign table: let the FDW do it */
     484              :             rslots =
     485           13 :                 resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert(estate,
     486              :                                                                      resultRelInfo,
     487           13 :                                                                      &slots[sent],
     488              :                                                                      NULL,
     489              :                                                                      &inserted);
     490              : 
     491           12 :             sent += size;
     492              : 
     493              :             /* No need to do anything if there are no inserted rows */
     494           12 :             if (inserted <= 0)
     495            2 :                 continue;
     496              : 
     497              :             /* Triggers on foreign tables should not have transition tables */
     498              :             Assert(resultRelInfo->ri_TrigDesc == NULL ||
     499              :                    resultRelInfo->ri_TrigDesc->trig_insert_new_table == false);
     500              : 
     501              :             /* Run AFTER ROW INSERT triggers */
     502           10 :             if (resultRelInfo->ri_TrigDesc != NULL &&
     503            0 :                 resultRelInfo->ri_TrigDesc->trig_insert_after_row)
     504              :             {
     505            0 :                 Oid         relid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
     506              : 
     507            0 :                 for (i = 0; i < inserted; i++)
     508              :                 {
     509            0 :                     TupleTableSlot *slot = rslots[i];
     510              : 
     511              :                     /*
     512              :                      * AFTER ROW Triggers might reference the tableoid column,
     513              :                      * so (re-)initialize tts_tableOid before evaluating them.
     514              :                      */
     515            0 :                     slot->tts_tableOid = relid;
     516              : 
     517            0 :                     ExecARInsertTriggers(estate, resultRelInfo,
     518              :                                          slot, NIL,
     519              :                                          cstate->transition_capture);
     520              :                 }
     521              :             }
     522              : 
     523              :             /* Update the row counter and progress of the COPY command */
     524           10 :             *processed += inserted;
     525           10 :             pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
     526              :                                          *processed);
     527              :         }
     528              : 
     529           24 :         for (i = 0; i < nused; i++)
     530           18 :             ExecClearTuple(slots[i]);
     531              : 
     532              :         /* reset relname_only */
     533            6 :         cstate->relname_only = false;
     534              :     }
     535              :     else
     536              :     {
     537         1467 :         CommandId   mycid = miinfo->mycid;
     538         1467 :         uint32      ti_options = miinfo->ti_options;
     539         1467 :         bool        line_buf_valid = cstate->line_buf_valid;
     540         1467 :         uint64      save_cur_lineno = cstate->cur_lineno;
     541              :         MemoryContext oldcontext;
     542              : 
     543              :         Assert(buffer->bistate != NULL);
     544              : 
     545              :         /*
     546              :          * Print error context information correctly, if one of the operations
     547              :          * below fails.
     548              :          */
     549         1467 :         cstate->line_buf_valid = false;
     550              : 
     551              :         /*
     552              :          * table_multi_insert may leak memory, so switch to short-lived memory
     553              :          * context before calling it.
     554              :          */
     555         1467 :         oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
     556         1467 :         table_multi_insert(resultRelInfo->ri_RelationDesc,
     557              :                            slots,
     558              :                            nused,
     559              :                            mycid,
     560              :                            ti_options,
     561         1467 :                            buffer->bistate);
     562         1467 :         MemoryContextSwitchTo(oldcontext);
     563              : 
     564       716514 :         for (i = 0; i < nused; i++)
     565              :         {
     566              :             /*
     567              :              * If there are any indexes, update them for all the inserted
     568              :              * tuples, and run AFTER ROW INSERT triggers.
     569              :              */
     570       715056 :             if (resultRelInfo->ri_NumIndices > 0)
     571              :             {
     572              :                 List       *recheckIndexes;
     573              : 
     574       100993 :                 cstate->cur_lineno = buffer->linenos[i];
     575              :                 recheckIndexes =
     576       100993 :                     ExecInsertIndexTuples(resultRelInfo,
     577              :                                           estate, 0, buffer->slots[i],
     578              :                                           NIL, NULL);
     579       100984 :                 ExecARInsertTriggers(estate, resultRelInfo,
     580       100984 :                                      slots[i], recheckIndexes,
     581              :                                      cstate->transition_capture);
     582       100984 :                 list_free(recheckIndexes);
     583              :             }
     584              : 
     585              :             /*
     586              :              * There are no indexes, but see if we need to run AFTER ROW
     587              :              * INSERT triggers anyway.
     588              :              */
     589       614063 :             else if (resultRelInfo->ri_TrigDesc != NULL &&
     590           56 :                      (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
     591           44 :                       resultRelInfo->ri_TrigDesc->trig_insert_new_table))
     592              :             {
     593           24 :                 cstate->cur_lineno = buffer->linenos[i];
     594           24 :                 ExecARInsertTriggers(estate, resultRelInfo,
     595           24 :                                      slots[i], NIL,
     596              :                                      cstate->transition_capture);
     597              :             }
     598              : 
     599       715047 :             ExecClearTuple(slots[i]);
     600              :         }
     601              : 
     602              :         /* Update the row counter and progress of the COPY command */
     603         1458 :         *processed += nused;
     604         1458 :         pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
     605              :                                      *processed);
     606              : 
     607              :         /* reset cur_lineno and line_buf_valid to what they were */
     608         1458 :         cstate->line_buf_valid = line_buf_valid;
     609         1458 :         cstate->cur_lineno = save_cur_lineno;
     610              :     }
     611              : 
     612              :     /* Mark that all slots are free */
     613         1464 :     buffer->nused = 0;
     614         1464 : }
     615              : 
     616              : /*
     617              :  * Release 'buffer': unlink it from its ResultRelInfo, free its bulk-insert
     618              :  * state and finish the bulk insert (both non-FDW only), drop its slots, and
     619              :  * pfree it.  The buffer must have been flushed (nused == 0) beforehand.
     620              :  */
     621              : static inline void
     622          882 : CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
     623              :                              CopyMultiInsertBuffer *buffer)
     624              : {
     625          882 :     ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
     626              :     int         i;
     627              : 
     628              :     /* Ensure buffer was flushed */
     629              :     Assert(buffer->nused == 0);
     630              : 
     631              :     /* Remove back-link to ourself */
     632          882 :     resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
     633              : 
     634          882 :     if (resultRelInfo->ri_FdwRoutine == NULL)
     635              :     {
     636              :         Assert(buffer->bistate != NULL);
     637          876 :         FreeBulkInsertState(buffer->bistate);
     638              :     }
     639              :     else
     640              :         Assert(buffer->bistate == NULL);
     641              : 
     642              :     /* Slots are created on demand, so drop them up to the first NULL entry. */
     643       150807 :     for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
     644       149925 :         ExecDropSingleTupleTableSlot(buffer->slots[i]);
     645              : 
     646          882 :     if (resultRelInfo->ri_FdwRoutine == NULL)
     647          876 :         table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
     648              :                                  miinfo->ti_options);
     649              : 
     650          882 :     pfree(buffer);
     651          882 : }
     652              : 
     653              : /*
     654              :  * Flush the tuples stored in every tracked buffer out to their tables and
     655              :  * reset the buffered tuple and byte counts.  'processed' is handed on to
     656              :  * CopyMultiInsertBufferFlush for each buffer.
     657              :  *
     658              :  * Once flushed we also trim the tracked buffers list down to
     659              :  * MAX_PARTITION_BUFFERS entries, removing the earliest-created buffers first.
     660              :  * Callers should pass 'curr_rri' as the ResultRelInfo that's currently being
     661              :  * used; its buffer is never removed when cleaning up old buffers.
     662              :  */
     663              : static inline void
     664         1359 : CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri,
     665              :                          int64 *processed)
     666              : {
     667              :     ListCell   *lc;
     668              : 
     669         2823 :     foreach(lc, miinfo->multiInsertBuffers)
     670              :     {
     671         1474 :         CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc);
     672              : 
     673         1474 :         CopyMultiInsertBufferFlush(miinfo, buffer, processed);
     674              :     }
     675              : 
     676         1349 :     miinfo->bufferedTuples = 0;
     677         1349 :     miinfo->bufferedBytes = 0;
     678              : 
     679              :     /*
     680              :      * Trim the list of tracked buffers down if it exceeds the limit.  Here we
     681              :      * remove buffers starting with the ones we created first.  It seems less
     682              :      * likely that these older ones will be needed than the ones that were
     683              :      * just created.
     684              :      */
     685         1349 :     while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS)
     686              :     {
     687              :         CopyMultiInsertBuffer *buffer;
     688              : 
     689            0 :         buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
     690              : 
     691              :         /*
     692              :          * We never want to remove the buffer that's currently being used, so
     693              :          * if it is at the head, rotate it to the tail and take the new head.
     694              :          */
     695            0 :         if (buffer->resultRelInfo == curr_rri)
     696              :         {
     697              :             /*
     698              :              * The code below would misbehave if we were trying to reduce the
     699              :              * list to fewer than two items.
     700              :              */
     701              :             StaticAssertDecl(MAX_PARTITION_BUFFERS >= 2,
     702              :                              "MAX_PARTITION_BUFFERS must be >= 2");
     703              : 
     704            0 :             miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
     705            0 :             miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
     706            0 :             buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
     707              :         }
     708              : 
     709            0 :         CopyMultiInsertBufferCleanup(miinfo, buffer);
     710            0 :         miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
     711              :     }
     712         1349 : }
     713              : 
     714              : /*
     715              :  * Clean up every tracked buffer, then free the list that tracked them.
     716              :  */
     717              : static inline void
     718          878 : CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo)
     719              : {
     720              :     ListCell   *lc;
     721              : 
     722         1760 :     foreach(lc, miinfo->multiInsertBuffers)
     723          882 :         CopyMultiInsertBufferCleanup(miinfo, lfirst(lc));
     724              : 
     725          878 :     list_free(miinfo->multiInsertBuffers);
     726          878 : }
     727              : 
     728              : /*
     729              :  * Return the TupleTableSlot that the next tuple for 'rri' should be stored
     730              :  * in, creating it on first use.  This does not advance buffer->nused.
     731              :  *
     732              :  * Callers must ensure that rri has a buffer and that the buffer is not full.
     733              :  *
     734              :  * Note: 'miinfo' is unused but has been included for consistency with the
     735              :  * other functions in this area.
     736              :  */
     737              : static inline TupleTableSlot *
     738       716197 : CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
     739              :                                 ResultRelInfo *rri)
     740              : {
     741       716197 :     CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
     742              :     int         nused;
     743              : 
     744              :     Assert(buffer != NULL);
     745              :     Assert(buffer->nused < MAX_BUFFERED_TUPLES);
     746              : 
     747       716197 :     nused = buffer->nused;
     748              : 
     749       716197 :     if (buffer->slots[nused] == NULL)
     750       150151 :         buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
     751       716197 :     return buffer->slots[nused];
     752              : }
     752              : 
     753              : /*
     754              :  * Mark the slot handed out by CopyMultiInsertInfoNextFreeSlot as consumed,
     755              :  * remembering its input line number and adding 'tuplen' to the byte count.
     756              :  */
     757              : static inline void
     758       715150 : CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
     759              :                          TupleTableSlot *slot, int tuplen, uint64 lineno)
     760              : {
     761       715150 :     CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
     762              : 
     763              :     Assert(buffer != NULL);
     764              :     Assert(slot == buffer->slots[buffer->nused]);
     765              : 
     766              :     /* Store the line number so we can properly report any errors later */
     767       715150 :     buffer->linenos[buffer->nused] = lineno;
     768              : 
     769              :     /* Record this slot as being used */
     770       715150 :     buffer->nused++;
     771              : 
     772              :     /* Update how many tuples are stored and their size */
     773       715150 :     miinfo->bufferedTuples++;
     774       715150 :     miinfo->bufferedBytes += tuplen;
     775       715150 : }
     776              : 
     777              : /*
     778              :  * Copy FROM file to relation.
     779              :  */
     780              : uint64
     781         1140 : CopyFrom(CopyFromState cstate)
     782              : {
     783              :     ResultRelInfo *resultRelInfo;
     784              :     ResultRelInfo *target_resultRelInfo;
     785         1140 :     ResultRelInfo *prevResultRelInfo = NULL;
     786         1140 :     EState     *estate = CreateExecutorState(); /* for ExecConstraints() */
     787              :     ModifyTableState *mtstate;
     788              :     ExprContext *econtext;
     789         1140 :     TupleTableSlot *singleslot = NULL;
     790         1140 :     MemoryContext oldcontext = CurrentMemoryContext;
     791              : 
     792         1140 :     PartitionTupleRouting *proute = NULL;
     793              :     ErrorContextCallback errcallback;
     794         1140 :     CommandId   mycid = GetCurrentCommandId(true);
     795         1140 :     uint32      ti_options = 0; /* start with default options for insert */
     796         1140 :     BulkInsertState bistate = NULL;
     797              :     CopyInsertMethod insertMethod;
     798         1140 :     CopyMultiInsertInfo multiInsertInfo = {0};  /* pacify compiler */
     799         1140 :     int64       processed = 0;
     800         1140 :     int64       excluded = 0;
     801              :     bool        has_before_insert_row_trig;
     802              :     bool        has_instead_insert_row_trig;
     803         1140 :     bool        leafpart_use_multi_insert = false;
     804              : 
     805              :     Assert(cstate->rel);
     806              :     Assert(list_length(cstate->range_table) == 1);
     807              : 
     808         1140 :     if (cstate->opts.on_error != COPY_ON_ERROR_STOP)
     809              :         Assert(cstate->escontext);
     810              : 
     811              :     /*
     812              :      * The target must be a plain, foreign, or partitioned relation, or have
     813              :      * an INSTEAD OF INSERT row trigger.  (Currently, such triggers are only
     814              :      * allowed on views, so we only hint about them in the view case.)
     815              :      */
     816         1140 :     if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
     817          102 :         cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
     818           79 :         cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
     819           12 :         !(cstate->rel->trigdesc &&
     820            8 :           cstate->rel->trigdesc->trig_insert_instead_row))
     821              :     {
     822            4 :         if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
     823            4 :             ereport(ERROR,
     824              :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     825              :                      errmsg("cannot copy to view \"%s\"",
     826              :                             RelationGetRelationName(cstate->rel)),
     827              :                      errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
     828            0 :         else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
     829            0 :             ereport(ERROR,
     830              :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     831              :                      errmsg("cannot copy to materialized view \"%s\"",
     832              :                             RelationGetRelationName(cstate->rel))));
     833            0 :         else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
     834            0 :             ereport(ERROR,
     835              :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     836              :                      errmsg("cannot copy to sequence \"%s\"",
     837              :                             RelationGetRelationName(cstate->rel))));
     838              :         else
     839            0 :             ereport(ERROR,
     840              :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     841              :                      errmsg("cannot copy to non-table relation \"%s\"",
     842              :                             RelationGetRelationName(cstate->rel))));
     843              :     }
     844              : 
     845              :     /*
     846              :      * If the target file is new-in-transaction, we assume that checking FSM
     847              :      * for free space is a waste of time.  This could possibly be wrong, but
     848              :      * it's unlikely.
     849              :      */
     850         1136 :     if (RELKIND_HAS_STORAGE(cstate->rel->rd_rel->relkind) &&
     851         1038 :         (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
     852         1029 :          cstate->rel->rd_firstRelfilelocatorSubid != InvalidSubTransactionId))
     853           54 :         ti_options |= TABLE_INSERT_SKIP_FSM;
     854              : 
     855              :     /*
     856              :      * Optimize if new relation storage was created in this subxact or one of
     857              :      * its committed children and we won't see those rows later as part of an
     858              :      * earlier scan or command. The subxact test ensures that if this subxact
     859              :      * aborts then the frozen rows won't be visible after xact cleanup.  Note
     860              :      * that the stronger test of exactly which subtransaction created it is
     861              :      * crucial for correctness of this optimization. The test for an earlier
     862              :      * scan or command tolerates false negatives. FREEZE causes other sessions
     863              :      * to see rows they would not see under MVCC, and a false negative merely
     864              :      * spreads that anomaly to the current session.
     865              :      */
     866         1136 :     if (cstate->opts.freeze)
     867              :     {
     868              :         /*
     869              :          * We currently disallow COPY FREEZE on partitioned tables.  The
     870              :          * reason for this is that we've simply not yet opened the partitions
     871              :          * to determine if the optimization can be applied to them.  We could
     872              :          * go and open them all here, but doing so may be quite a costly
     873              :          * overhead for small copies.  In any case, we may just end up routing
     874              :          * tuples to a small number of partitions.  It seems better just to
     875              :          * raise an ERROR for partitioned tables.
     876              :          */
     877           45 :         if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     878              :         {
     879            4 :             ereport(ERROR,
     880              :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     881              :                      errmsg("cannot perform COPY FREEZE on a partitioned table")));
     882              :         }
     883              : 
     884              :         /* There's currently no support for COPY FREEZE on foreign tables. */
     885           41 :         if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
     886            4 :             ereport(ERROR,
     887              :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     888              :                      errmsg("cannot perform COPY FREEZE on a foreign table")));
     889              : 
     890              :         /*
     891              :          * Tolerate one registration for the benefit of FirstXactSnapshot.
     892              :          * Scan-bearing queries generally create at least two registrations,
     893              :          * though relying on that is fragile, as is ignoring ActiveSnapshot.
     894              :          * Clear CatalogSnapshot to avoid counting its registration.  We'll
     895              :          * still detect ongoing catalog scans, each of which separately
     896              :          * registers the snapshot it uses.
     897              :          */
     898           37 :         InvalidateCatalogSnapshot();
     899           37 :         if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
     900            0 :             ereport(ERROR,
     901              :                     (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
     902              :                      errmsg("cannot perform COPY FREEZE because of prior transaction activity")));
     903              : 
     904           74 :         if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
     905           37 :             cstate->rel->rd_newRelfilelocatorSubid != GetCurrentSubTransactionId())
     906           12 :             ereport(ERROR,
     907              :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     908              :                      errmsg("cannot perform COPY FREEZE because the table was not created or truncated in the current subtransaction")));
     909              : 
     910           25 :         ti_options |= TABLE_INSERT_FROZEN;
     911              :     }
     912              : 
     913              :     /*
     914              :      * We need a ResultRelInfo so we can use the regular executor's
     915              :      * index-entry-making machinery.  (There used to be a huge amount of code
     916              :      * here that basically duplicated execUtils.c ...)
     917              :      */
     918         1116 :     ExecInitRangeTable(estate, cstate->range_table, cstate->rteperminfos,
     919              :                        bms_make_singleton(1));
     920         1116 :     resultRelInfo = target_resultRelInfo = makeNode(ResultRelInfo);
     921         1116 :     ExecInitResultRelation(estate, resultRelInfo, 1);
     922              : 
     923              :     /* Verify the named relation is a valid target for INSERT */
     924         1116 :     CheckValidResultRel(resultRelInfo, CMD_INSERT, ONCONFLICT_NONE, NIL);
     925              : 
     926         1115 :     ExecOpenIndices(resultRelInfo, false);
     927              : 
     928              :     /*
     929              :      * Set up a ModifyTableState so we can let FDW(s) init themselves for
     930              :      * foreign-table result relation(s).
     931              :      */
     932         1115 :     mtstate = makeNode(ModifyTableState);
     933         1115 :     mtstate->ps.plan = NULL;
     934         1115 :     mtstate->ps.state = estate;
     935         1115 :     mtstate->operation = CMD_INSERT;
     936         1115 :     mtstate->mt_nrels = 1;
     937         1115 :     mtstate->resultRelInfo = resultRelInfo;
     938         1115 :     mtstate->rootResultRelInfo = resultRelInfo;
     939              : 
     940         1115 :     if (resultRelInfo->ri_FdwRoutine != NULL &&
     941           18 :         resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
     942           18 :         resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
     943              :                                                          resultRelInfo);
     944              : 
     945              :     /*
     946              :      * Also, if the named relation is a foreign table, determine if the FDW
     947              :      * supports batch insert and determine the batch size (a FDW may support
     948              :      * batching, but it may be disabled for the server/table).
     949              :      *
     950              :      * If the FDW does not support batching, we set the batch size to 1.
     951              :      */
     952         1115 :     if (resultRelInfo->ri_FdwRoutine != NULL &&
     953           18 :         resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize &&
     954           18 :         resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert)
     955           18 :         resultRelInfo->ri_BatchSize =
     956           18 :             resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize(resultRelInfo);
     957              :     else
     958         1097 :         resultRelInfo->ri_BatchSize = 1;
     959              : 
     960              :     Assert(resultRelInfo->ri_BatchSize >= 1);
     961              : 
     962              :     /* Prepare to catch AFTER triggers. */
     963         1115 :     AfterTriggerBeginQuery();
     964              : 
     965              :     /*
     966              :      * If there are any triggers with transition tables on the named relation,
     967              :      * we need to be prepared to capture transition tuples.
     968              :      *
     969              :      * Because partition tuple routing would like to know about whether
     970              :      * transition capture is active, we also set it in mtstate, which is
     971              :      * passed to ExecFindPartition() below.
     972              :      */
     973         1115 :     cstate->transition_capture = mtstate->mt_transition_capture =
     974         1115 :         MakeTransitionCaptureState(cstate->rel->trigdesc,
     975         1115 :                                    RelationGetRelid(cstate->rel),
     976              :                                    CMD_INSERT);
     977              : 
     978              :     /*
     979              :      * If the named relation is a partitioned table, initialize state for
     980              :      * CopyFrom tuple routing.
     981              :      */
     982         1115 :     if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     983           63 :         proute = ExecSetupPartitionTupleRouting(estate, cstate->rel);
     984              : 
     985         1115 :     if (cstate->whereClause)
     986           12 :         cstate->qualexpr = ExecInitQual(castNode(List, cstate->whereClause),
     987              :                                         &mtstate->ps);
     988              : 
     989              :     /*
     990              :      * It's generally more efficient to prepare a bunch of tuples for
     991              :      * insertion, and insert them in one
     992              :      * table_multi_insert()/ExecForeignBatchInsert() call, than call
     993              :      * table_tuple_insert()/ExecForeignInsert() separately for every tuple.
     994              :      * However, there are a number of reasons why we might not be able to do
     995              :      * this.  These are explained below.
     996              :      */
     997         1115 :     if (resultRelInfo->ri_TrigDesc != NULL &&
     998          118 :         (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
     999           58 :          resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
    1000              :     {
    1001              :         /*
    1002              :          * Can't support multi-inserts when there are any BEFORE/INSTEAD OF
    1003              :          * triggers on the table. Such triggers might query the table we're
    1004              :          * inserting into and act differently if the tuples that have already
    1005              :          * been processed and prepared for insertion are not there.
    1006              :          */
    1007           68 :         insertMethod = CIM_SINGLE;
    1008              :     }
    1009         1047 :     else if (resultRelInfo->ri_FdwRoutine != NULL &&
    1010           14 :              resultRelInfo->ri_BatchSize == 1)
    1011              :     {
    1012              :         /*
    1013              :          * Can't support multi-inserts to a foreign table if the FDW does not
    1014              :          * support batching, or it's disabled for the server or foreign table.
    1015              :          */
    1016            9 :         insertMethod = CIM_SINGLE;
    1017              :     }
    1018         1038 :     else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL &&
    1019           19 :              resultRelInfo->ri_TrigDesc->trig_insert_new_table)
    1020              :     {
    1021              :         /*
    1022              :          * For partitioned tables we can't support multi-inserts when there
    1023              :          * are any statement level insert triggers. It might be possible to
    1024              :          * allow partitioned tables with such triggers in the future, but for
    1025              :          * now, CopyMultiInsertInfoFlush expects that any after row insert and
    1026              :          * statement level insert triggers are on the same relation.
    1027              :          */
    1028           14 :         insertMethod = CIM_SINGLE;
    1029              :     }
    1030         1024 :     else if (cstate->volatile_defexprs)
    1031              :     {
    1032              :         /*
    1033              :          * Can't support multi-inserts if there are any volatile default
    1034              :          * expressions in the table.  Similarly to the trigger case above,
    1035              :          * such expressions may query the table we're inserting into.
    1036              :          *
    1037              :          * Note: It does not matter if any partitions have any volatile
    1038              :          * default expressions as we use the defaults from the target of the
    1039              :          * COPY command.
    1040              :          */
    1041            4 :         insertMethod = CIM_SINGLE;
    1042              :     }
    1043         1020 :     else if (contain_volatile_functions(cstate->whereClause))
    1044              :     {
    1045              :         /*
    1046              :          * Can't support multi-inserts if there are any volatile function
    1047              :          * expressions in WHERE clause.  Similarly to the trigger case above,
    1048              :          * such expressions may query the table we're inserting into.
    1049              :          *
    1050              :          * Note: the whereClause was already preprocessed in DoCopy(), so it's
    1051              :          * okay to use contain_volatile_functions() directly.
    1052              :          */
    1053            0 :         insertMethod = CIM_SINGLE;
    1054              :     }
    1055              :     else
    1056              :     {
    1057              :         /*
    1058              :          * For partitioned tables, we may still be able to perform bulk
    1059              :          * inserts.  However, the possibility of this depends on which types
    1060              :          * of triggers exist on the partition.  We must disable bulk inserts
    1061              :          * if the partition is a foreign table that can't use batching or it
    1062              :          * has any before row insert or insert instead triggers (same as we
    1063              :          * checked above for the parent table).  Since the partition's
    1064              :          * resultRelInfos are initialized only when we actually need to insert
    1065              :          * the first tuple into them, we must have the intermediate insert
    1066              :          * method of CIM_MULTI_CONDITIONAL to flag that we must later
    1067              :          * determine if we can use bulk-inserts for the partition being
    1068              :          * inserted into.
    1069              :          */
    1070         1020 :         if (proute)
    1071           45 :             insertMethod = CIM_MULTI_CONDITIONAL;
    1072              :         else
    1073          975 :             insertMethod = CIM_MULTI;
    1074              : 
    1075         1020 :         CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
    1076              :                                 estate, mycid, ti_options);
    1077              :     }
    1078              : 
    1079              :     /*
    1080              :      * If not using batch mode (which allocates slots as needed) set up a
    1081              :      * tuple slot too. When inserting into a partitioned table, we also need
    1082              :      * one, even if we might batch insert, to read the tuple in the root
    1083              :      * partition's form.
    1084              :      */
    1085         1115 :     if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
    1086              :     {
    1087          140 :         singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
    1088              :                                        &estate->es_tupleTable);
    1089          140 :         bistate = GetBulkInsertState();
    1090              :     }
    1091              : 
    1092         1233 :     has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
    1093          118 :                                   resultRelInfo->ri_TrigDesc->trig_insert_before_row);
    1094              : 
    1095         1233 :     has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
    1096          118 :                                    resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
    1097              : 
    1098              :     /*
    1099              :      * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
    1100              :      * should do this for COPY, since it's not really an "INSERT" statement as
    1101              :      * such. However, executing these triggers maintains consistency with the
    1102              :      * EACH ROW triggers that we already fire on COPY.
    1103              :      */
    1104         1115 :     ExecBSInsertTriggers(estate, resultRelInfo);
    1105              : 
    1106         1115 :     econtext = GetPerTupleExprContext(estate);
    1107              : 
    1108              :     /* Set up callback to identify error line number */
    1109         1115 :     errcallback.callback = CopyFromErrorCallback;
    1110         1115 :     errcallback.arg = cstate;
    1111         1115 :     errcallback.previous = error_context_stack;
    1112         1115 :     error_context_stack = &errcallback;
    1113              : 
    1114              :     for (;;)
    1115       755480 :     {
    1116              :         TupleTableSlot *myslot;
    1117              :         bool        skip_tuple;
    1118              : 
    1119       756595 :         CHECK_FOR_INTERRUPTS();
    1120              : 
    1121              :         /*
    1122              :          * Reset the per-tuple exprcontext. We do this after every tuple, to
    1123              :          * clean-up after expression evaluations etc.
    1124              :          */
    1125       756595 :         ResetPerTupleExprContext(estate);
    1126              : 
    1127              :         /* select slot to (initially) load row into */
    1128       756595 :         if (insertMethod == CIM_SINGLE || proute)
    1129              :         {
    1130       149557 :             myslot = singleslot;
    1131       149557 :             Assert(myslot != NULL);
    1132              :         }
    1133              :         else
    1134              :         {
    1135              :             Assert(resultRelInfo == target_resultRelInfo);
    1136              :             Assert(insertMethod == CIM_MULTI);
    1137              : 
    1138       607038 :             myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
    1139              :                                                      resultRelInfo);
    1140              :         }
    1141              : 
    1142              :         /*
    1143              :          * Switch to per-tuple context before calling NextCopyFrom, which does
    1144              :          * evaluate default expressions etc. and requires per-tuple context.
    1145              :          */
    1146       756595 :         MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    1147              : 
    1148       756595 :         ExecClearTuple(myslot);
    1149              : 
    1150              :         /* Directly store the values/nulls array in the slot */
    1151       756595 :         if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
    1152          964 :             break;
    1153              : 
    1154       755508 :         if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE &&
    1155          108 :             cstate->escontext->error_occurred)
    1156              :         {
    1157              :             /*
    1158              :              * Soft error occurred, skip this tuple and just make
    1159              :              * ErrorSaveContext ready for the next NextCopyFrom. Since we
    1160              :              * don't set details_wanted and error_data is not to be filled,
    1161              :              * just resetting error_occurred is enough.
    1162              :              */
    1163           72 :             cstate->escontext->error_occurred = false;
    1164              : 
    1165              :             /* Report that this tuple was skipped by the ON_ERROR clause */
    1166           72 :             pgstat_progress_update_param(PROGRESS_COPY_TUPLES_SKIPPED,
    1167           72 :                                          cstate->num_errors);
    1168              : 
    1169           72 :             if (cstate->opts.reject_limit > 0 &&
    1170           32 :                 cstate->num_errors > cstate->opts.reject_limit)
    1171            4 :                 ereport(ERROR,
    1172              :                         (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
    1173              :                          errmsg("skipped more than REJECT_LIMIT (%" PRId64 ") rows due to data type incompatibility",
    1174              :                                 cstate->opts.reject_limit)));
    1175              : 
    1176              :             /* Repeat NextCopyFrom() until no soft error occurs */
    1177           68 :             continue;
    1178              :         }
    1179              : 
    1180       755436 :         ExecStoreVirtualTuple(myslot);
    1181              : 
    1182              :         /*
    1183              :          * Constraints and where clause might reference the tableoid column,
    1184              :          * so (re-)initialize tts_tableOid before evaluating them.
    1185              :          */
    1186       755436 :         myslot->tts_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc);
    1187              : 
    1188              :         /* Triggers and stuff need to be invoked in query context. */
    1189       755436 :         MemoryContextSwitchTo(oldcontext);
    1190              : 
    1191       755436 :         if (cstate->whereClause)
    1192              :         {
    1193           44 :             econtext->ecxt_scantuple = myslot;
    1194              :             /* Skip items that don't match COPY's WHERE clause */
    1195           44 :             if (!ExecQual(cstate->qualexpr, econtext))
    1196              :             {
    1197              :                 /*
    1198              :                  * Report that this tuple was filtered out by the WHERE
    1199              :                  * clause.
    1200              :                  */
    1201           24 :                 pgstat_progress_update_param(PROGRESS_COPY_TUPLES_EXCLUDED,
    1202              :                                              ++excluded);
    1203           24 :                 continue;
    1204              :             }
    1205              :         }
    1206              : 
    1207              :         /* Determine the partition to insert the tuple into */
    1208       755412 :         if (proute)
    1209              :         {
    1210              :             TupleConversionMap *map;
    1211              : 
    1212              :             /*
    1213              :              * Attempt to find a partition suitable for this tuple.
    1214              :              * ExecFindPartition() will raise an error if none can be found or
    1215              :              * if the found partition is not suitable for INSERTs.
    1216              :              */
    1217       149252 :             resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo,
    1218              :                                               proute, myslot, estate);
    1219              : 
    1220       149251 :             if (prevResultRelInfo != resultRelInfo)
    1221              :             {
    1222              :                 /* Determine which triggers exist on this partition */
    1223        90815 :                 has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
    1224           36 :                                               resultRelInfo->ri_TrigDesc->trig_insert_before_row);
    1225              : 
    1226        90815 :                 has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
    1227           36 :                                                resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
    1228              : 
    1229              :                 /*
    1230              :                  * Disable multi-inserts when the partition has BEFORE/INSTEAD
    1231              :                  * OF triggers, or if the partition is a foreign table that
    1232              :                  * can't use batching.
    1233              :                  */
    1234       141520 :                 leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
    1235        50741 :                     !has_before_insert_row_trig &&
    1236       192245 :                     !has_instead_insert_row_trig &&
    1237        50725 :                     (resultRelInfo->ri_FdwRoutine == NULL ||
    1238            6 :                      resultRelInfo->ri_BatchSize > 1);
    1239              : 
    1240              :                 /* Set the multi-insert buffer to use for this partition. */
    1241        90779 :                 if (leafpart_use_multi_insert)
    1242              :                 {
    1243        50723 :                     if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
    1244           49 :                         CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
    1245              :                                                        resultRelInfo);
    1246              :                 }
    1247        40056 :                 else if (insertMethod == CIM_MULTI_CONDITIONAL &&
    1248           18 :                          !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
    1249              :                 {
    1250              :                     /*
    1251              :                      * Flush pending inserts if this partition can't use
    1252              :                      * batching, so rows are visible to triggers etc.
    1253              :                      */
    1254            0 :                     CopyMultiInsertInfoFlush(&multiInsertInfo,
    1255              :                                              resultRelInfo,
    1256              :                                              &processed);
    1257              :                 }
    1258              : 
    1259        90779 :                 if (bistate != NULL)
    1260        90779 :                     ReleaseBulkInsertStatePin(bistate);
    1261        90779 :                 prevResultRelInfo = resultRelInfo;
    1262              :             }
    1263              : 
    1264              :             /*
    1265              :              * If we're capturing transition tuples, we might need to convert
    1266              :              * from the partition rowtype to root rowtype. But if there are no
    1267              :              * BEFORE triggers on the partition that could change the tuple,
    1268              :              * we can just remember the original unconverted tuple to avoid a
    1269              :              * needless round trip conversion.
    1270              :              */
    1271       149251 :             if (cstate->transition_capture != NULL)
    1272           38 :                 cstate->transition_capture->tcs_original_insert_tuple =
    1273           38 :                     !has_before_insert_row_trig ? myslot : NULL;
    1274              : 
    1275              :             /*
    1276              :              * We might need to convert from the root rowtype to the partition
    1277              :              * rowtype.
    1278              :              */
    1279       149251 :             map = ExecGetRootToChildMap(resultRelInfo, estate);
    1280       149251 :             if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert)
    1281              :             {
    1282              :                 /* non batch insert */
    1283        40092 :                 if (map != NULL)
    1284              :                 {
    1285              :                     TupleTableSlot *new_slot;
    1286              : 
    1287           73 :                     new_slot = resultRelInfo->ri_PartitionTupleSlot;
    1288           73 :                     myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot);
    1289              :                 }
    1290              :             }
    1291              :             else
    1292              :             {
    1293              :                 /*
    1294              :                  * Prepare to queue up tuple for later batch insert into
    1295              :                  * current partition.
    1296              :                  */
    1297              :                 TupleTableSlot *batchslot;
    1298              : 
    1299              :                 /* no other path available for partitioned table */
    1300              :                 Assert(insertMethod == CIM_MULTI_CONDITIONAL);
    1301              : 
    1302       109159 :                 batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
    1303              :                                                             resultRelInfo);
    1304              : 
    1305       109159 :                 if (map != NULL)
    1306         8132 :                     myslot = execute_attr_map_slot(map->attrMap, myslot,
    1307              :                                                    batchslot);
    1308              :                 else
    1309              :                 {
    1310              :                     /*
    1311              :                      * This looks more expensive than it is (Believe me, I
    1312              :                      * optimized it away. Twice.). The input is in virtual
    1313              :                      * form, and we'll materialize the slot below - for most
    1314              :                      * slot types the copy performs the work materialization
    1315              :                      * would later require anyway.
    1316              :                      */
    1317       101027 :                     ExecCopySlot(batchslot, myslot);
    1318       101027 :                     myslot = batchslot;
    1319              :                 }
    1320              :             }
    1321              : 
    1322              :             /* ensure that triggers etc see the right relation  */
    1323       149251 :             myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
    1324              :         }
    1325              : 
    1326       755411 :         skip_tuple = false;
    1327              : 
    1328              :         /* BEFORE ROW INSERT Triggers */
    1329       755411 :         if (has_before_insert_row_trig)
    1330              :         {
    1331          180 :             if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot))
    1332           10 :                 skip_tuple = true;  /* "do nothing" */
    1333              :         }
    1334              : 
    1335       755411 :         if (!skip_tuple)
    1336              :         {
    1337              :             /*
    1338              :              * If there is an INSTEAD OF INSERT ROW trigger, let it handle the
    1339              :              * tuple.  Otherwise, proceed with inserting the tuple into the
    1340              :              * table or foreign table.
    1341              :              */
    1342       755401 :             if (has_instead_insert_row_trig)
    1343              :             {
    1344            8 :                 ExecIRInsertTriggers(estate, resultRelInfo, myslot);
    1345              :             }
    1346              :             else
    1347              :             {
    1348              :                 /* Compute stored generated columns */
    1349       755393 :                 if (resultRelInfo->ri_RelationDesc->rd_att->constr &&
    1350       241114 :                     resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored)
    1351           21 :                     ExecComputeStoredGenerated(resultRelInfo, estate, myslot,
    1352              :                                                CMD_INSERT);
    1353              : 
    1354              :                 /*
    1355              :                  * If the target is a plain table, check the constraints of
    1356              :                  * the tuple.
    1357              :                  */
    1358       755393 :                 if (resultRelInfo->ri_FdwRoutine == NULL &&
    1359       755347 :                     resultRelInfo->ri_RelationDesc->rd_att->constr)
    1360       241096 :                     ExecConstraints(resultRelInfo, myslot, estate);
    1361              : 
    1362              :                 /*
    1363              :                  * Also check the tuple against the partition constraint, if
    1364              :                  * there is one; except that if we got here via tuple-routing,
    1365              :                  * we don't need to if there's no BR trigger defined on the
    1366              :                  * partition.
    1367              :                  */
    1368       755373 :                 if (resultRelInfo->ri_RelationDesc->rd_rel->relispartition &&
    1369       149243 :                     (proute == NULL || has_before_insert_row_trig))
    1370         1167 :                     ExecPartitionCheck(resultRelInfo, myslot, estate, true);
    1371              : 
    1372              :                 /* Store the slot in the multi-insert buffer, when enabled. */
    1373       755373 :                 if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
    1374              :                 {
    1375              :                     /*
    1376              :                      * The slot previously might point into the per-tuple
    1377              :                      * context. For batching it needs to be longer lived.
    1378              :                      */
    1379       715150 :                     ExecMaterializeSlot(myslot);
    1380              : 
    1381              :                     /* Add this tuple to the tuple buffer */
    1382       715150 :                     CopyMultiInsertInfoStore(&multiInsertInfo,
    1383              :                                              resultRelInfo, myslot,
    1384              :                                              cstate->line_buf.len,
    1385              :                                              cstate->cur_lineno);
    1386              : 
    1387              :                     /*
    1388              :                      * If enough inserts have queued up, then flush all
    1389              :                      * buffers out to their tables.
    1390              :                      */
    1391       715150 :                     if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
    1392          653 :                         CopyMultiInsertInfoFlush(&multiInsertInfo,
    1393              :                                                  resultRelInfo,
    1394              :                                                  &processed);
    1395              : 
    1396              :                     /*
    1397              :                      * We delay updating the row counter and progress of the
    1398              :                      * COPY command until after writing the tuples stored in
    1399              :                      * the buffer out to the table, as in single insert mode.
    1400              :                      * See CopyMultiInsertBufferFlush().
    1401              :                      */
    1402       715150 :                     continue;   /* next tuple please */
    1403              :                 }
    1404              :                 else
    1405              :                 {
    1406        40223 :                     List       *recheckIndexes = NIL;
    1407              : 
    1408              :                     /* OK, store the tuple */
    1409        40223 :                     if (resultRelInfo->ri_FdwRoutine != NULL)
    1410              :                     {
    1411           27 :                         myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
    1412              :                                                                                  resultRelInfo,
    1413              :                                                                                  myslot,
    1414              :                                                                                  NULL);
    1415              : 
    1416           26 :                         if (myslot == NULL) /* "do nothing" */
    1417            2 :                             continue;   /* next tuple please */
    1418              : 
    1419              :                         /*
    1420              :                          * AFTER ROW Triggers might reference the tableoid
    1421              :                          * column, so (re-)initialize tts_tableOid before
    1422              :                          * evaluating them.
    1423              :                          */
    1424           24 :                         myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
    1425              :                     }
    1426              :                     else
    1427              :                     {
    1428              :                         /* OK, store the tuple and create index entries for it */
    1429        40196 :                         table_tuple_insert(resultRelInfo->ri_RelationDesc,
    1430              :                                            myslot, mycid, ti_options, bistate);
    1431              : 
    1432        40196 :                         if (resultRelInfo->ri_NumIndices > 0)
    1433            0 :                             recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
    1434              :                                                                    estate, 0,
    1435              :                                                                    myslot, NIL,
    1436              :                                                                    NULL);
    1437              :                     }
    1438              : 
    1439              :                     /* AFTER ROW INSERT Triggers */
    1440        40220 :                     ExecARInsertTriggers(estate, resultRelInfo, myslot,
    1441              :                                          recheckIndexes, cstate->transition_capture);
    1442              : 
    1443        40218 :                     list_free(recheckIndexes);
    1444              :                 }
    1445              :             }
    1446              : 
    1447              :             /*
    1448              :              * We count only tuples not suppressed by a BEFORE INSERT trigger
    1449              :              * or FDW; this is the same definition used by nodeModifyTable.c
    1450              :              * for counting tuples inserted by an INSERT command.  Update
    1451              :              * progress of the COPY command as well.
    1452              :              */
    1453        40226 :             pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
    1454              :                                          ++processed);
    1455              :         }
    1456              :     }
    1457              : 
    1458              :     /* Flush any remaining buffered tuples */
    1459          964 :     if (insertMethod != CIM_SINGLE)
    1460              :     {
    1461          888 :         if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
    1462          706 :             CopyMultiInsertInfoFlush(&multiInsertInfo, NULL, &processed);
    1463              :     }
    1464              : 
    1465              :     /* Done, clean up */
    1466          954 :     error_context_stack = errcallback.previous;
    1467              : 
    1468          954 :     if (cstate->num_errors > 0 &&
    1469           24 :         cstate->opts.log_verbosity >= COPY_LOG_VERBOSITY_DEFAULT)
    1470              :     {
    1471           20 :         if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE)
    1472           16 :             ereport(NOTICE,
    1473              :                     errmsg_plural("%" PRIu64 " row was skipped due to data type incompatibility",
    1474              :                                   "%" PRIu64 " rows were skipped due to data type incompatibility",
    1475              :                                   cstate->num_errors,
    1476              :                                   cstate->num_errors));
    1477            4 :         else if (cstate->opts.on_error == COPY_ON_ERROR_SET_NULL)
    1478            4 :             ereport(NOTICE,
    1479              :                     errmsg_plural("in %" PRIu64 " row, columns were set to null due to data type incompatibility",
    1480              :                                   "in %" PRIu64 " rows, columns were set to null due to data type incompatibility",
    1481              :                                   cstate->num_errors,
    1482              :                                   cstate->num_errors));
    1483              :     }
    1484              : 
    1485          954 :     if (bistate != NULL)
    1486          120 :         FreeBulkInsertState(bistate);
    1487              : 
    1488          954 :     MemoryContextSwitchTo(oldcontext);
    1489              : 
    1490              :     /* Execute AFTER STATEMENT insertion triggers */
    1491          954 :     ExecASInsertTriggers(estate, target_resultRelInfo, cstate->transition_capture);
    1492              : 
    1493              :     /* Handle queued AFTER triggers */
    1494          954 :     AfterTriggerEndQuery(estate);
    1495              : 
    1496          954 :     ExecResetTupleTable(estate->es_tupleTable, false);
    1497              : 
    1498              :     /* Allow the FDW to shut down */
    1499          954 :     if (target_resultRelInfo->ri_FdwRoutine != NULL &&
    1500           16 :         target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
    1501           16 :         target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
    1502              :                                                               target_resultRelInfo);
    1503              : 
    1504              :     /* Tear down the multi-insert buffer data */
    1505          954 :     if (insertMethod != CIM_SINGLE)
    1506          878 :         CopyMultiInsertInfoCleanup(&multiInsertInfo);
    1507              : 
    1508              :     /* Close all the partitioned tables, leaf partitions, and their indices */
    1509          954 :     if (proute)
    1510           60 :         ExecCleanupTupleRouting(mtstate, proute);
    1511              : 
    1512              :     /* Close the result relations, including any trigger target relations */
    1513          954 :     ExecCloseResultRelations(estate);
    1514          954 :     ExecCloseRangeTableRelations(estate);
    1515              : 
    1516          954 :     FreeExecutorState(estate);
    1517              : 
    1518          954 :     return processed;
    1519              : }
    1520              : 
    1521              : /*
    1522              :  * Set up to read tuples from a file for COPY FROM.
    1523              :  *
    1524              :  * 'rel': Used as a template for the tuples
    1525              :  * 'whereClause': WHERE clause from the COPY FROM command
    1526              :  * 'filename': Name of server-local file to read, NULL for STDIN
    1527              :  * 'is_program': true if 'filename' is program to execute
    1528              :  * 'data_source_cb': callback that provides the input data
    1529              :  * 'attnamelist': List of char *, columns to include. NIL selects all cols.
    1530              :  * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
    1531              :  *
    1532              :  * Returns a CopyFromState, to be passed to NextCopyFrom and related functions.
    1533              :  */
    1534              : CopyFromState
    1535         1376 : BeginCopyFrom(ParseState *pstate,
    1536              :               Relation rel,
    1537              :               Node *whereClause,
    1538              :               const char *filename,
    1539              :               bool is_program,
    1540              :               copy_data_source_cb data_source_cb,
    1541              :               List *attnamelist,
    1542              :               List *options)
    1543              : {
    1544              :     CopyFromState cstate;
    1545         1376 :     bool        pipe = (filename == NULL);  /* true when no file or program name was supplied */
    1546              :     TupleDesc   tupDesc;
    1547              :     AttrNumber  num_phys_attrs,
    1548              :                 num_defaults;
    1549              :     FmgrInfo   *in_functions;
    1550              :     Oid        *typioparams;
    1551              :     int        *defmap;
    1552              :     ExprState **defexprs;
    1553              :     MemoryContext oldcontext;
    1554              :     bool        volatile_defexprs;
    1555         1376 :     const int   progress_cols[] = {
    1556              :         PROGRESS_COPY_COMMAND,
    1557              :         PROGRESS_COPY_TYPE,
    1558              :         PROGRESS_COPY_BYTES_TOTAL
    1559              :     };
    1560         1376 :     int64       progress_vals[] = {
    1561              :         PROGRESS_COPY_COMMAND_FROM,
    1562              :         0,
    1563              :         0
    1564              :     };
    1565              : 
    1566              :     /* Allocate workspace and zero all fields */
    1567         1376 :     cstate = palloc0_object(CopyFromStateData);
    1568              : 
    1569              :     /*
    1570              :      * We allocate everything used by a cstate in a new memory context. This
    1571              :      * avoids memory leaks during repeated use of COPY in a query.
    1572              :      */
    1573         1376 :     cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
    1574              :                                                 "COPY",
    1575              :                                                 ALLOCSET_DEFAULT_SIZES);
    1576              : 
    1577         1376 :     oldcontext = MemoryContextSwitchTo(cstate->copycontext);
    1578              : 
    1579              :     /* Extract options from the statement node tree */
    1580         1376 :     ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options);
    1581              : 
    1582              :     /* Set the format routine */
    1583         1195 :     cstate->routine = CopyFromGetRoutine(&cstate->opts);
    1584              : 
    1585              :     /* Process the target relation */
    1586         1195 :     cstate->rel = rel;
    1587              : 
    1588         1195 :     tupDesc = RelationGetDescr(cstate->rel);
    1589              : 
    1590              :     /* process common options or initialization */
    1591              : 
    1592              :     /* Generate or convert list of attributes to process */
    1593         1195 :     cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
    1594              : 
    1595         1195 :     num_phys_attrs = tupDesc->natts;
    1596              : 
    1597              :     /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
    1598         1195 :     cstate->opts.force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
    1599         1195 :     if (cstate->opts.force_notnull_all)
    1600            8 :         MemSet(cstate->opts.force_notnull_flags, true, num_phys_attrs * sizeof(bool));
    1601         1187 :     else if (cstate->opts.force_notnull)
    1602              :     {
    1603              :         List       *attnums;
    1604              :         ListCell   *cur;
    1605              : 
    1606           17 :         attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_notnull);
    1607              : 
    1608           34 :         foreach(cur, attnums)
    1609              :         {
    1610           21 :             int         attnum = lfirst_int(cur);
    1611           21 :             Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
    1612              : 
    1613           21 :             if (!list_member_int(cstate->attnumlist, attnum))
    1614            4 :                 ereport(ERROR,
    1615              :                         (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1616              :                 /*- translator: first %s is the name of a COPY option, e.g. FORCE_NOT_NULL */
    1617              :                          errmsg("%s column \"%s\" not referenced by COPY",
    1618              :                                 "FORCE_NOT_NULL", NameStr(attr->attname))));
    1619           17 :             cstate->opts.force_notnull_flags[attnum - 1] = true;
    1620              :         }
    1621              :     }
    1622              : 
    1623              :     /* Set up soft error handler for ON_ERROR */
    1624         1191 :     if (cstate->opts.on_error != COPY_ON_ERROR_STOP)
    1625              :     {
    1626           66 :         cstate->escontext = makeNode(ErrorSaveContext);
    1627           66 :         cstate->escontext->type = T_ErrorSaveContext;
    1628           66 :         cstate->escontext->error_occurred = false;
    1629              : 
    1630           66 :         if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE ||
    1631           25 :             cstate->opts.on_error == COPY_ON_ERROR_SET_NULL)
    1632           66 :             cstate->escontext->details_wanted = false;
    1633              :     }
    1634              :     else
    1635         1125 :         cstate->escontext = NULL;
    1636              : 
    1637         1191 :     if (cstate->opts.on_error == COPY_ON_ERROR_SET_NULL)
    1638              :     {
    1639           25 :         int         attr_count = list_length(cstate->attnumlist);
    1640              : 
    1641              :         /*
    1642              :          * When data type conversion fails and ON_ERROR is SET_NULL, we need to
    1643              :          * ensure that the input columns allow null values.  ExecConstraints()
    1644              :          * will cover most of the cases, but it does not verify domain
    1645              :          * constraints.  Therefore, for constrained domains, the null value
    1646              :          * check must be performed during the initial string-to-datum
    1647              :          * conversion (see CopyFromTextLikeOneRow()).
    1648              :          */
    1649           25 :         cstate->domain_with_constraint = palloc0_array(bool, attr_count);
    1650              : 
    1651          124 :         foreach_int(attno, cstate->attnumlist)
    1652              :         {
    1653           74 :             int         i = foreach_current_index(attno);
    1654              : 
    1655           74 :             Form_pg_attribute att = TupleDescAttr(tupDesc, attno - 1);
    1656              : 
    1657           74 :             cstate->domain_with_constraint[i] = DomainHasConstraints(att->atttypid, NULL);
    1658              :         }
    1659              :     }
    1660              : 
    1661              :     /* Convert FORCE_NULL name list to per-column flags, check validity */
    1662         1191 :     cstate->opts.force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
    1663         1191 :     if (cstate->opts.force_null_all)
    1664            8 :         MemSet(cstate->opts.force_null_flags, true, num_phys_attrs * sizeof(bool));
    1665         1183 :     else if (cstate->opts.force_null)
    1666              :     {
    1667              :         List       *attnums;
    1668              :         ListCell   *cur;
    1669              : 
    1670           17 :         attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_null);
    1671              : 
    1672           34 :         foreach(cur, attnums)
    1673              :         {
    1674           21 :             int         attnum = lfirst_int(cur);
    1675           21 :             Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
    1676              : 
    1677           21 :             if (!list_member_int(cstate->attnumlist, attnum))
    1678            4 :                 ereport(ERROR,
    1679              :                         (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1680              :                 /*- translator: first %s is the name of a COPY option, e.g. FORCE_NOT_NULL */
    1681              :                          errmsg("%s column \"%s\" not referenced by COPY",
    1682              :                                 "FORCE_NULL", NameStr(attr->attname))));
    1683           17 :             cstate->opts.force_null_flags[attnum - 1] = true;
    1684              :         }
    1685              :     }
    1686              : 
    1687              :     /* Convert convert_selectively name list to per-column flags */
    1688         1187 :     if (cstate->opts.convert_selectively)
    1689              :     {
    1690              :         List       *attnums;
    1691              :         ListCell   *cur;
    1692              : 
    1693            3 :         cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
    1694              : 
    1695            3 :         attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.convert_select);
    1696              : 
    1697            7 :         foreach(cur, attnums)
    1698              :         {
    1699            4 :             int         attnum = lfirst_int(cur);
    1700            4 :             Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
    1701              : 
    1702            4 :             if (!list_member_int(cstate->attnumlist, attnum))
    1703            0 :                 ereport(ERROR,
    1704              :                         (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1705              :                          errmsg_internal("selected column \"%s\" not referenced by COPY",
    1706              :                                          NameStr(attr->attname))));
    1707            4 :             cstate->convert_select_flags[attnum - 1] = true;
    1708              :         }
    1709              :     }
    1710              : 
    1711              :     /* Use client encoding when ENCODING option is not specified. */
    1712         1187 :     if (cstate->opts.file_encoding < 0)
    1713         1167 :         cstate->file_encoding = pg_get_client_encoding();
    1714              :     else
    1715           20 :         cstate->file_encoding = cstate->opts.file_encoding;
    1716              : 
    1717              :     /*
    1718              :      * Look up encoding conversion function, unless the file and database encodings match or either is SQL_ASCII.
    1719              :      */
    1720         1187 :     if (cstate->file_encoding == GetDatabaseEncoding() ||
    1721           52 :         cstate->file_encoding == PG_SQL_ASCII ||
    1722           24 :         GetDatabaseEncoding() == PG_SQL_ASCII)
    1723              :     {
    1724         1163 :         cstate->need_transcoding = false;
    1725              :     }
    1726              :     else
    1727              :     {
    1728           24 :         cstate->need_transcoding = true;
    1729           24 :         cstate->conversion_proc = FindDefaultConversionProc(cstate->file_encoding,
    1730              :                                                             GetDatabaseEncoding());
    1731           24 :         if (!OidIsValid(cstate->conversion_proc))
    1732            0 :             ereport(ERROR,
    1733              :                     (errcode(ERRCODE_UNDEFINED_FUNCTION),
    1734              :                      errmsg("default conversion function for encoding \"%s\" to \"%s\" does not exist",
    1735              :                             pg_encoding_to_char(cstate->file_encoding),
    1736              :                             pg_encoding_to_char(GetDatabaseEncoding()))));
    1737              :     }
    1738              : 
    1739         1187 :     cstate->copy_src = COPY_FILE;    /* default */
    1740              : 
    1741         1187 :     cstate->whereClause = whereClause;
    1742              : 
    1743              :     /* Initialize state variables */
    1744         1187 :     cstate->eol_type = EOL_UNKNOWN;
    1745         1187 :     cstate->cur_relname = RelationGetRelationName(cstate->rel);
    1746         1187 :     cstate->cur_lineno = 0;
    1747         1187 :     cstate->cur_attname = NULL;
    1748         1187 :     cstate->cur_attval = NULL;
    1749         1187 :     cstate->relname_only = false;
    1750         1187 :     cstate->simd_enabled = true;
    1751              : 
    1752              :     /*
    1753              :      * Allocate buffers for the input pipeline.
    1754              :      *
    1755              :      * attribute_buf and raw_buf are used in both text and binary modes, but
    1756              :      * input_buf and line_buf only in text mode.
    1757              :      */
    1758         1187 :     cstate->raw_buf = palloc(RAW_BUF_SIZE + 1);
    1759         1187 :     cstate->raw_buf_index = cstate->raw_buf_len = 0;
    1760         1187 :     cstate->raw_reached_eof = false;
    1761              : 
    1762         1187 :     initStringInfo(&cstate->attribute_buf);
    1763              : 
    1764              :     /* Assign range table and rteperminfos, we'll need them in CopyFrom. */
    1765         1187 :     if (pstate)
    1766              :     {
    1767         1149 :         cstate->range_table = pstate->p_rtable;
    1768         1149 :         cstate->rteperminfos = pstate->p_rteperminfos;
    1769              :     }
    1770              : 
    1771         1187 :     num_defaults = 0;
    1772         1187 :     volatile_defexprs = false;
    1773              : 
    1774              :     /*
    1775              :      * Pick up the required catalog information for each attribute in the
    1776              :      * relation, including the input function, the element type (to pass to
    1777              :      * the input function), and info about defaults and constraints. (Which
    1778              :      * input function we use depends on text/binary format choice.)
    1779              :      */
    1780         1187 :     in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
    1781         1187 :     typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
    1782         1187 :     defmap = (int *) palloc(num_phys_attrs * sizeof(int));
    1783         1187 :     defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
    1784              : 
    1785         4731 :     for (int attnum = 1; attnum <= num_phys_attrs; attnum++)
    1786              :     {
    1787         3553 :         Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
    1788              : 
    1789              :         /* We don't need info for dropped attributes */
    1790         3553 :         if (att->attisdropped)
    1791           82 :             continue;
    1792              : 
    1793              :         /* Fetch the input function and typioparam info */
    1794         3471 :         cstate->routine->CopyFromInFunc(cstate, att->atttypid,
    1795         3471 :                                         &in_functions[attnum - 1],
    1796         3471 :                                         &typioparams[attnum - 1]);
    1797              : 
    1798              :         /* Get default info if available */
    1799         3470 :         defexprs[attnum - 1] = NULL;
    1800              : 
    1801              :         /*
    1802              :          * We only need the default values for columns that do not appear in
    1803              :          * the column list, unless the DEFAULT option was given. We never need
    1804              :          * default values for generated columns.
    1805              :          */
    1806         3470 :         if ((cstate->opts.default_print != NULL ||
    1807         3388 :              !list_member_int(cstate->attnumlist, attnum)) &&
    1808          421 :             !att->attgenerated)
    1809              :         {
    1810          399 :             Expr       *defexpr = (Expr *) build_column_default(cstate->rel,
    1811              :                                                                 attnum);
    1812              : 
    1813          399 :             if (defexpr != NULL)
    1814              :             {
    1815              :                 /* Run the expression through planner */
    1816          167 :                 defexpr = expression_planner(defexpr);
    1817              : 
    1818              :                 /* Initialize executable expression in copycontext */
    1819          159 :                 defexprs[attnum - 1] = ExecInitExpr(defexpr, NULL);
    1820              : 
    1821              :                 /* if NOT copied from input */
    1822              :                 /* use default value if one exists */
    1823          159 :                 if (!list_member_int(cstate->attnumlist, attnum))
    1824              :                 {
    1825          107 :                     defmap[num_defaults] = attnum - 1;
    1826          107 :                     num_defaults++;
    1827              :                 }
    1828              : 
    1829              :                 /*
    1830              :                  * If a default expression looks at the table being loaded,
    1831              :                  * then it could give the wrong answer when using
    1832              :                  * multi-insert. Since database access can be dynamic this is
    1833              :                  * hard to test for exactly, so we use the much wider test of
    1834              :                  * whether the default expression is volatile. We allow for
    1835              :                  * the special case of when the default expression is the
    1836              :                  * nextval() of a sequence which in this specific case is
    1837              :                  * known to be safe for use with the multi-insert
    1838              :                  * optimization. Hence we use this special case function
    1839              :                  * checker rather than the standard check for
    1840              :                  * contain_volatile_functions().  Note also that we already
    1841              :                  * ran the expression through expression_planner().
    1842              :                  */
    1843          159 :                 if (!volatile_defexprs)
    1844          159 :                     volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
    1845              :             }
    1846              :         }
    1847              :     }
    1848              : 
    1849         1178 :     cstate->defaults = (bool *) palloc0(tupDesc->natts * sizeof(bool));
    1850              : 
    1851              :     /* initialize progress */
    1852         1178 :     pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
    1853         1178 :                                   cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
    1854         1178 :     cstate->bytes_processed = 0;
    1855              : 
    1856              :     /* We keep these variables in cstate. */
    1857         1178 :     cstate->in_functions = in_functions;
    1858         1178 :     cstate->typioparams = typioparams;
    1859         1178 :     cstate->defmap = defmap;
    1860         1178 :     cstate->defexprs = defexprs;
    1861         1178 :     cstate->volatile_defexprs = volatile_defexprs;
    1862         1178 :     cstate->num_defaults = num_defaults;
    1863         1178 :     cstate->is_program = is_program;
    1864              : 
    1865         1178 :     if (data_source_cb)
    1866              :     {
    1867          209 :         progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
    1868          209 :         cstate->copy_src = COPY_CALLBACK;
    1869          209 :         cstate->data_source_cb = data_source_cb;
    1870              :     }
    1871          969 :     else if (pipe)
    1872              :     {
    1873          681 :         progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
    1874              :         Assert(!is_program);    /* the grammar does not allow this */
    1875          681 :         if (whereToSendOutput == DestRemote)
    1876          681 :             ReceiveCopyBegin(cstate);
    1877              :         else
    1878            0 :             cstate->copy_file = stdin;
    1879              :     }
    1880              :     else
    1881              :     {
    1882          288 :         cstate->filename = pstrdup(filename);
    1883              : 
    1884          288 :         if (cstate->is_program)
    1885              :         {
    1886            0 :             progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
    1887            0 :             cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
    1888            0 :             if (cstate->copy_file == NULL)
    1889            0 :                 ereport(ERROR,
    1890              :                         (errcode_for_file_access(),
    1891              :                          errmsg("could not execute command \"%s\": %m",
    1892              :                                 cstate->filename)));
    1893              :         }
    1894              :         else
    1895              :         {
    1896              :             struct stat st;
    1897              : 
    1898          288 :             progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
    1899          288 :             cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
    1900          288 :             if (cstate->copy_file == NULL)
    1901              :             {
    1902              :                 /* copy errno because ereport subfunctions might change it */
    1903            0 :                 int         save_errno = errno;
    1904              : 
    1905            0 :                 ereport(ERROR,
    1906              :                         (errcode_for_file_access(),
    1907              :                          errmsg("could not open file \"%s\" for reading: %m",
    1908              :                                 cstate->filename),
    1909              :                          (save_errno == ENOENT || save_errno == EACCES) ?
    1910              :                          errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
    1911              :                                  "You may want a client-side facility such as psql's \\copy.") : 0));
    1912              :             }
    1913              : 
    1914          288 :             if (fstat(fileno(cstate->copy_file), &st))
    1915            0 :                 ereport(ERROR,
    1916              :                         (errcode_for_file_access(),
    1917              :                          errmsg("could not stat file \"%s\": %m",
    1918              :                                 cstate->filename)));
    1919              : 
    1920          288 :             if (S_ISDIR(st.st_mode))
    1921            0 :                 ereport(ERROR,
    1922              :                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1923              :                          errmsg("\"%s\" is a directory", cstate->filename)));
    1924              : 
    1925          288 :             progress_vals[2] = st.st_size;
    1926              :         }
    1927              :     }
    1928              : 
    1929         1178 :     pgstat_progress_update_multi_param(3, progress_cols, progress_vals);
    1930              : 
    1931         1178 :     cstate->routine->CopyFromStart(cstate, tupDesc);
    1932              : 
    1933         1178 :     MemoryContextSwitchTo(oldcontext);
    1934              : 
    1935         1178 :     return cstate;
    1936              : }
    1937              : 
    1938              : /*
    1939              :  * Clean up storage and release resources for COPY FROM.
    1940              :  */
    1941              : void
    1942          790 : EndCopyFrom(CopyFromState cstate)
    1943              : {
    1944              :     /* Invoke the end callback */
    1945          790 :     cstate->routine->CopyFromEnd(cstate);
    1946              : 
    1947              :     /* Close the program's pipe, or the input file if a file name was set. */
    1948          790 :     if (cstate->is_program)
    1949              :     {
    1950            0 :         ClosePipeFromProgram(cstate);
    1951              :     }
    1952              :     else
    1953              :     {
    1954          790 :         if (cstate->filename != NULL && FreeFile(cstate->copy_file))
    1955            0 :             ereport(ERROR,
    1956              :                     (errcode_for_file_access(),
    1957              :                      errmsg("could not close file \"%s\": %m",
    1958              :                             cstate->filename)));
    1959              :     }
    1960              : 
    1961          790 :     pgstat_progress_end_command();
    1962              : 
    1963          790 :     MemoryContextDelete(cstate->copycontext);
    1964          790 :     pfree(cstate);
    1965          790 : }
    1966              : 
    1967              : /*
    1968              :  * Closes the pipe from an external program, checking the pclose() return code.
    1969              :  */
    1970              : static void
    1971            0 : ClosePipeFromProgram(CopyFromState cstate)
    1972              : {
    1973              :     int         pclose_rc;
    1974              : 
    1975              :     Assert(cstate->is_program);
    1976              : 
    1977            0 :     pclose_rc = ClosePipeStream(cstate->copy_file);
    1978            0 :     if (pclose_rc == -1)
    1979            0 :         ereport(ERROR,
    1980              :                 (errcode_for_file_access(),
    1981              :                  errmsg("could not close pipe to external command: %m")));
    1982            0 :     else if (pclose_rc != 0)
    1983              :     {
    1984              :         /*
    1985              :          * If we ended a COPY FROM PROGRAM before reaching EOF, then it's
    1986              :          * expected that the called program may fail with SIGPIPE, and we
    1987              :          * should not report that as an error.  Otherwise, SIGPIPE indicates a
    1988              :          * problem.
    1989              :          */
    1990            0 :         if (!cstate->raw_reached_eof &&
    1991            0 :             wait_result_is_signal(pclose_rc, SIGPIPE))
    1992            0 :             return;
    1993              : 
    1994            0 :         ereport(ERROR,
    1995              :                 (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
    1996              :                  errmsg("program \"%s\" failed",
    1997              :                         cstate->filename),
    1998              :                  errdetail_internal("%s", wait_result_to_str(pclose_rc))));
    1999              :     }
    2000              : }
        

Generated by: LCOV version 2.0-1