LCOV - code coverage report
Current view: top level - src/backend/commands - copyfrom.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 530 580 91.4 %
Date: 2025-04-01 14:15:22 Functions: 23 24 95.8 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * copyfrom.c
       4             :  *      COPY <table> FROM file/program/client
       5             :  *
       6             :  * This file contains routines needed to efficiently load tuples into a
       7             :  * table.  That includes looking up the correct partition, firing triggers,
       8             :  * calling the table AM function to insert the data, and updating indexes.
       9             :  * Reading data from the input file or client and parsing it into Datums
      10             :  * is handled in copyfromparse.c.
      11             :  *
      12             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
      13             :  * Portions Copyright (c) 1994, Regents of the University of California
      14             :  *
      15             :  *
      16             :  * IDENTIFICATION
      17             :  *    src/backend/commands/copyfrom.c
      18             :  *
      19             :  *-------------------------------------------------------------------------
      20             :  */
      21             : #include "postgres.h"
      22             : 
      23             : #include <ctype.h>
      24             : #include <unistd.h>
      25             : #include <sys/stat.h>
      26             : 
      27             : #include "access/heapam.h"
      28             : #include "access/tableam.h"
      29             : #include "access/xact.h"
      30             : #include "catalog/namespace.h"
      31             : #include "commands/copyapi.h"
      32             : #include "commands/copyfrom_internal.h"
      33             : #include "commands/progress.h"
      34             : #include "commands/trigger.h"
      35             : #include "executor/execPartition.h"
      36             : #include "executor/executor.h"
      37             : #include "executor/nodeModifyTable.h"
      38             : #include "executor/tuptable.h"
      39             : #include "foreign/fdwapi.h"
      40             : #include "mb/pg_wchar.h"
      41             : #include "miscadmin.h"
      42             : #include "nodes/miscnodes.h"
      43             : #include "optimizer/optimizer.h"
      44             : #include "pgstat.h"
      45             : #include "rewrite/rewriteHandler.h"
      46             : #include "storage/fd.h"
      47             : #include "tcop/tcopprot.h"
      48             : #include "utils/lsyscache.h"
      49             : #include "utils/memutils.h"
      50             : #include "utils/portal.h"
      51             : #include "utils/rel.h"
      52             : #include "utils/snapmgr.h"
      53             : 
      54             : /*
      55             :  * No more than this many tuples per CopyMultiInsertBuffer (this is also the length of its slots[] and linenos[] arrays).
      56             :  *
      57             :  * Caution: Don't make this too big, as we could end up with this many
      58             :  * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's
      59             :  * multiInsertBuffers list.  Increasing this can cause quadratic growth in
      60             :  * memory requirements during copies into partitioned tables with a large
      61             :  * number of partitions.
      62             :  */
      63             : #define MAX_BUFFERED_TUPLES     1000
      64             : 
      65             : /*
      66             :  * Flush the buffers once the stored tuples amount to >= this many bytes,
      67             :  * as measured by their input size.
      68             :  */
      69             : #define MAX_BUFFERED_BYTES      65535
      70             : 
      71             : /*
      72             :  * After flushing, trim the list of buffers back down to this many entries.
      73             :  * This value must be >= 2.
      74             :  */
      75             : #define MAX_PARTITION_BUFFERS   32
      76             : 
      77             : /* Holds the tuples buffered for multi-insert into one relation during CopyFrom. */
      78             : typedef struct CopyMultiInsertBuffer
      79             : {
      80             :     TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Slots holding buffered tuples; all NULL after init */
      81             :     ResultRelInfo *resultRelInfo;   /* ResultRelInfo of the relation this buffer inserts into */
      82             :     BulkInsertState bistate;    /* BulkInsertState for this rel if plain
      83             :                                  * table; NULL if foreign table */
      84             :     int         nused;          /* number of 'slots' containing tuples */
      85             :     uint64      linenos[MAX_BUFFERED_TUPLES];   /* linenos[i] is the copy-stream line #
      86             :                                                  * of the tuple in slots[i] */
      87             : } CopyMultiInsertBuffer;
      88             : 
      89             : /*
      90             :  * Tracks one or many CopyMultiInsertBuffers together with the total number
      91             :  * and size of the tuples stored in them.  This allows multiple buffers to
      92             :  * exist at once when COPYing into a partitioned table.
      93             :  */
      94             : typedef struct CopyMultiInsertInfo
      95             : {
      96             :     List       *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */
      97             :     int         bufferedTuples; /* tuples buffered over all buffers; checked against MAX_BUFFERED_TUPLES */
      98             :     int         bufferedBytes;  /* bytes of all buffered tuples; checked against MAX_BUFFERED_BYTES */
      99             :     CopyFromState cstate;       /* Copy state for this CopyMultiInsertInfo */
     100             :     EState     *estate;         /* Executor state used for COPY */
     101             :     CommandId   mycid;          /* Command Id used for COPY; passed to table_multi_insert */
     102             :     int         ti_options;     /* table insert options; passed to table_multi_insert */
     103             : } CopyMultiInsertInfo;
     104             : 
     105             : 
     106             : /* prototypes for functions that are not exported from this file */
     107             : static void ClosePipeFromProgram(CopyFromState cstate);
     108             : 
     109             : /*
     110             :  * Built-in format-specific routines.  The one-row callbacks referenced by
     111             :  * the routine tables below are defined in copyfromparse.c.
     112             :  */
     113             : static void CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo,
     114             :                                    Oid *typioparam);
     115             : static void CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc);
     116             : static void CopyFromTextLikeEnd(CopyFromState cstate);
     117             : static void CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid,
     118             :                                  FmgrInfo *finfo, Oid *typioparam);
     119             : static void CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc);
     120             : static void CopyFromBinaryEnd(CopyFromState cstate);
     121             : 
     122             : 
     123             : /*
     124             :  * COPY FROM routine tables for the built-in formats.
     125             :  *
     126             :  * The CSV and text formats share the same TextLike callbacks; only their
     127             :  * one-row callback differs.
     128             :  */
     129             : 
     130             : /* text format (the default chosen by CopyFromGetRoutine) */
     131             : static const CopyFromRoutine CopyFromRoutineText = {
     132             :     .CopyFromInFunc = CopyFromTextLikeInFunc,
     133             :     .CopyFromStart = CopyFromTextLikeStart,
     134             :     .CopyFromOneRow = CopyFromTextOneRow,
     135             :     .CopyFromEnd = CopyFromTextLikeEnd,
     136             : };
     137             : 
     138             : /* CSV format: same as text apart from the one-row callback */
     139             : static const CopyFromRoutine CopyFromRoutineCSV = {
     140             :     .CopyFromInFunc = CopyFromTextLikeInFunc,
     141             :     .CopyFromStart = CopyFromTextLikeStart,
     142             :     .CopyFromOneRow = CopyFromCSVOneRow,
     143             :     .CopyFromEnd = CopyFromTextLikeEnd,
     144             : };
     145             : 
     146             : /* binary format: uses its own callbacks throughout */
     147             : static const CopyFromRoutine CopyFromRoutineBinary = {
     148             :     .CopyFromInFunc = CopyFromBinaryInFunc,
     149             :     .CopyFromStart = CopyFromBinaryStart,
     150             :     .CopyFromOneRow = CopyFromBinaryOneRow,
     151             :     .CopyFromEnd = CopyFromBinaryEnd,
     152             : };
     153             : 
     154             : /* Return a COPY FROM routine for the given options */
     155             : static const CopyFromRoutine *
     156        1820 : CopyFromGetRoutine(const CopyFormatOptions *opts)
     157             : {
     158        1820 :     if (opts->csv_mode)
     159         258 :         return &CopyFromRoutineCSV;
     160        1562 :     else if (opts->binary)
     161          16 :         return &CopyFromRoutineBinary;
     162             : 
     163             :     /* default is text */
     164        1546 :     return &CopyFromRoutineText;
     165             : }
     166             : 
     167             : /* Implementation of the start callback for text and CSV formats */
     168             : static void
     169        1780 : CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc)
     170             : {
     171             :     AttrNumber  attr_count;
     172             : 
     173             :     /*
     174             :      * If encoding conversion is needed, we need another buffer to hold the
     175             :      * converted input data.  Otherwise, we can just point input_buf to the
     176             :      * same buffer as raw_buf.
     177             :      */
     178        1780 :     if (cstate->need_transcoding)
     179             :     {
     180          24 :         cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
     181          24 :         cstate->input_buf_index = cstate->input_buf_len = 0;
     182             :     }
     183             :     else
     184        1756 :         cstate->input_buf = cstate->raw_buf;
     185        1780 :     cstate->input_reached_eof = false;
     186             : 
     187        1780 :     initStringInfo(&cstate->line_buf);
     188             : 
     189             :     /*
     190             :      * Create workspace for CopyReadAttributes results; used by CSV and text
     191             :      * format.
     192             :      */
     193        1780 :     attr_count = list_length(cstate->attnumlist);
     194        1780 :     cstate->max_fields = attr_count;
     195        1780 :     cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
     196        1780 : }
     197             : 
     198             : /*
     199             :  * Implementation of the infunc callback for text and CSV formats. Assign
     200             :  * the input function data to the given *finfo.
     201             :  */
     202             : static void
     203        4992 : CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo,
     204             :                        Oid *typioparam)
     205             : {
     206             :     Oid         func_oid;
     207             : 
     208        4992 :     getTypeInputInfo(atttypid, &func_oid, typioparam);
     209        4992 :     fmgr_info(func_oid, finfo);
     210        4992 : }
     211             : 
     212             : /* Implementation of the end callback for text and CSV formats */
     213             : static void
     214        1176 : CopyFromTextLikeEnd(CopyFromState cstate)
     215             : {
     216             :     /* nothing to do */
     217        1176 : }
     218             : 
     219             : /* Implementation of the start callback for binary format */
     220             : static void
     221          14 : CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc)
     222             : {
     223             :     /* Read and verify binary header */
     224          14 :     ReceiveCopyBinaryHeader(cstate);
     225          14 : }
     226             : 
     227             : /*
     228             :  * Implementation of the infunc callback for binary format. Assign
     229             :  * the binary input function to the given *finfo.
     230             :  */
     231             : static void
     232          62 : CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid,
     233             :                      FmgrInfo *finfo, Oid *typioparam)
     234             : {
     235             :     Oid         func_oid;
     236             : 
     237          62 :     getTypeBinaryInputInfo(atttypid, &func_oid, typioparam);
     238          60 :     fmgr_info(func_oid, finfo);
     239          60 : }
     240             : 
     241             : /* Implementation of the end callback for binary format */
     242             : static void
     243           6 : CopyFromBinaryEnd(CopyFromState cstate)
     244             : {
     245             :     /* nothing to do */
     246           6 : }
     247             : 
     248             : /*
     249             :  * error context callback for COPY FROM
     250             :  *
     251             :  * The argument for the error context must be CopyFromState.
     252             :  */
     253             : void
     254         314 : CopyFromErrorCallback(void *arg)
     255             : {
     256         314 :     CopyFromState cstate = (CopyFromState) arg;
     257             : 
     258         314 :     if (cstate->relname_only)
     259             :     {
     260          44 :         errcontext("COPY %s",
     261             :                    cstate->cur_relname);
     262          44 :         return;
     263             :     }
     264         270 :     if (cstate->opts.binary)
     265             :     {
     266             :         /* can't usefully display the data */
     267           2 :         if (cstate->cur_attname)
     268           2 :             errcontext("COPY %s, line %" PRIu64 ", column %s",
     269             :                        cstate->cur_relname,
     270             :                        cstate->cur_lineno,
     271             :                        cstate->cur_attname);
     272             :         else
     273           0 :             errcontext("COPY %s, line %" PRIu64,
     274             :                        cstate->cur_relname,
     275             :                        cstate->cur_lineno);
     276             :     }
     277             :     else
     278             :     {
     279         268 :         if (cstate->cur_attname && cstate->cur_attval)
     280          40 :         {
     281             :             /* error is relevant to a particular column */
     282             :             char       *attval;
     283             : 
     284          40 :             attval = CopyLimitPrintoutLength(cstate->cur_attval);
     285          40 :             errcontext("COPY %s, line %" PRIu64 ", column %s: \"%s\"",
     286             :                        cstate->cur_relname,
     287             :                        cstate->cur_lineno,
     288             :                        cstate->cur_attname,
     289             :                        attval);
     290          40 :             pfree(attval);
     291             :         }
     292         228 :         else if (cstate->cur_attname)
     293             :         {
     294             :             /* error is relevant to a particular column, value is NULL */
     295           6 :             errcontext("COPY %s, line %" PRIu64 ", column %s: null input",
     296             :                        cstate->cur_relname,
     297             :                        cstate->cur_lineno,
     298             :                        cstate->cur_attname);
     299             :         }
     300             :         else
     301             :         {
     302             :             /*
     303             :              * Error is relevant to a particular line.
     304             :              *
     305             :              * If line_buf still contains the correct line, print it.
     306             :              */
     307         222 :             if (cstate->line_buf_valid)
     308             :             {
     309             :                 char       *lineval;
     310             : 
     311         184 :                 lineval = CopyLimitPrintoutLength(cstate->line_buf.data);
     312         184 :                 errcontext("COPY %s, line %" PRIu64 ": \"%s\"",
     313             :                            cstate->cur_relname,
     314             :                            cstate->cur_lineno, lineval);
     315         184 :                 pfree(lineval);
     316             :             }
     317             :             else
     318             :             {
     319          38 :                 errcontext("COPY %s, line %" PRIu64,
     320             :                            cstate->cur_relname,
     321             :                            cstate->cur_lineno);
     322             :             }
     323             :         }
     324             :     }
     325             : }
     326             : 
     327             : /*
     328             :  * Make sure we don't print an unreasonable amount of COPY data in a message.
     329             :  *
     330             :  * Returns a pstrdup'd copy of the input.
     331             :  */
     332             : char *
     333         260 : CopyLimitPrintoutLength(const char *str)
     334             : {
     335             : #define MAX_COPY_DATA_DISPLAY 100
     336             : 
     337         260 :     int         slen = strlen(str);
     338             :     int         len;
     339             :     char       *res;
     340             : 
     341             :     /* Fast path if definitely okay */
     342         260 :     if (slen <= MAX_COPY_DATA_DISPLAY)
     343         260 :         return pstrdup(str);
     344             : 
     345             :     /* Apply encoding-dependent truncation */
     346           0 :     len = pg_mbcliplen(str, slen, MAX_COPY_DATA_DISPLAY);
     347             : 
     348             :     /*
     349             :      * Truncate, and add "..." to show we truncated the input.
     350             :      */
     351           0 :     res = (char *) palloc(len + 4);
     352           0 :     memcpy(res, str, len);
     353           0 :     strcpy(res + len, "...");
     354             : 
     355           0 :     return res;
     356             : }
     357             : 
     358             : /*
     359             :  * Allocate memory and initialize a new CopyMultiInsertBuffer for this
     360             :  * ResultRelInfo.
     361             :  */
     362             : static CopyMultiInsertBuffer *
     363        1550 : CopyMultiInsertBufferInit(ResultRelInfo *rri)
     364             : {
     365             :     CopyMultiInsertBuffer *buffer;
     366             : 
     367        1550 :     buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
     368        1550 :     memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
     369        1550 :     buffer->resultRelInfo = rri;
     370        1550 :     buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL;
     371        1550 :     buffer->nused = 0;
     372             : 
     373        1550 :     return buffer;
     374             : }
     375             : 
     376             : /*
     377             :  * Make a new buffer for this ResultRelInfo.
     378             :  */
     379             : static inline void
     380        1550 : CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
     381             :                                ResultRelInfo *rri)
     382             : {
     383             :     CopyMultiInsertBuffer *buffer;
     384             : 
     385        1550 :     buffer = CopyMultiInsertBufferInit(rri);
     386             : 
     387             :     /* Setup back-link so we can easily find this buffer again */
     388        1550 :     rri->ri_CopyMultiInsertBuffer = buffer;
     389             :     /* Record that we're tracking this buffer */
     390        1550 :     miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
     391        1550 : }
     392             : 
     393             : /*
     394             :  * Initialize an already allocated CopyMultiInsertInfo.
     395             :  *
     396             :  * If rri is a non-partitioned table then a CopyMultiInsertBuffer is set up
     397             :  * for that table.
     398             :  */
     399             : static void
     400        1540 : CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
     401             :                         CopyFromState cstate, EState *estate, CommandId mycid,
     402             :                         int ti_options)
     403             : {
     404        1540 :     miinfo->multiInsertBuffers = NIL;
     405        1540 :     miinfo->bufferedTuples = 0;
     406        1540 :     miinfo->bufferedBytes = 0;
     407        1540 :     miinfo->cstate = cstate;
     408        1540 :     miinfo->estate = estate;
     409        1540 :     miinfo->mycid = mycid;
     410        1540 :     miinfo->ti_options = ti_options;
     411             : 
     412             :     /*
     413             :      * Only setup the buffer when not dealing with a partitioned table.
     414             :      * Buffers for partitioned tables will just be setup when we need to send
     415             :      * tuples their way for the first time.
     416             :      */
     417        1540 :     if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
     418        1464 :         CopyMultiInsertInfoSetupBuffer(miinfo, rri);
     419        1540 : }
     420             : 
     421             : /*
     422             :  * Returns true if the buffers are full
     423             :  */
     424             : static inline bool
     425     1195720 : CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
     426             : {
     427     1195720 :     if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
     428     1194726 :         miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
     429        1100 :         return true;
     430     1194620 :     return false;
     431             : }
     432             : 
     433             : /*
     434             :  * Returns true if we have no buffered tuples
     435             :  */
     436             : static inline bool
     437        1402 : CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
     438             : {
     439        1402 :     return miinfo->bufferedTuples == 0;
     440             : }
     441             : 
     442             : /*
     443             :  * Write the tuples stored in 'buffer' out to the table.
     444             :  */
     445             : static inline void
     446        2460 : CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
     447             :                            CopyMultiInsertBuffer *buffer,
     448             :                            int64 *processed)
     449             : {
     450        2460 :     CopyFromState cstate = miinfo->cstate;
     451        2460 :     EState     *estate = miinfo->estate;
     452        2460 :     int         nused = buffer->nused;
     453        2460 :     ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
     454        2460 :     TupleTableSlot **slots = buffer->slots;
     455             :     int         i;
     456             : 
     457        2460 :     if (resultRelInfo->ri_FdwRoutine)
     458             :     {
     459          14 :         int         batch_size = resultRelInfo->ri_BatchSize;
     460          14 :         int         sent = 0;
     461             : 
     462             :         Assert(buffer->bistate == NULL);
     463             : 
     464             :         /* Ensure that the FDW supports batching and it's enabled */
     465             :         Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert);
     466             :         Assert(batch_size > 1);
     467             : 
     468             :         /*
     469             :          * We suppress error context information other than the relation name,
     470             :          * if one of the operations below fails.
     471             :          */
     472             :         Assert(!cstate->relname_only);
     473          14 :         cstate->relname_only = true;
     474             : 
     475          38 :         while (sent < nused)
     476             :         {
     477          26 :             int         size = (batch_size < nused - sent) ? batch_size : (nused - sent);
     478          26 :             int         inserted = size;
     479             :             TupleTableSlot **rslots;
     480             : 
     481             :             /* insert into foreign table: let the FDW do it */
     482             :             rslots =
     483          26 :                 resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert(estate,
     484             :                                                                      resultRelInfo,
     485          26 :                                                                      &slots[sent],
     486             :                                                                      NULL,
     487             :                                                                      &inserted);
     488             : 
     489          24 :             sent += size;
     490             : 
     491             :             /* No need to do anything if there are no inserted rows */
     492          24 :             if (inserted <= 0)
     493           4 :                 continue;
     494             : 
     495             :             /* Triggers on foreign tables should not have transition tables */
     496             :             Assert(resultRelInfo->ri_TrigDesc == NULL ||
     497             :                    resultRelInfo->ri_TrigDesc->trig_insert_new_table == false);
     498             : 
     499             :             /* Run AFTER ROW INSERT triggers */
     500          20 :             if (resultRelInfo->ri_TrigDesc != NULL &&
     501           0 :                 resultRelInfo->ri_TrigDesc->trig_insert_after_row)
     502             :             {
     503           0 :                 Oid         relid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
     504             : 
     505           0 :                 for (i = 0; i < inserted; i++)
     506             :                 {
     507           0 :                     TupleTableSlot *slot = rslots[i];
     508             : 
     509             :                     /*
     510             :                      * AFTER ROW Triggers might reference the tableoid column,
     511             :                      * so (re-)initialize tts_tableOid before evaluating them.
     512             :                      */
     513           0 :                     slot->tts_tableOid = relid;
     514             : 
     515           0 :                     ExecARInsertTriggers(estate, resultRelInfo,
     516             :                                          slot, NIL,
     517             :                                          cstate->transition_capture);
     518             :                 }
     519             :             }
     520             : 
     521             :             /* Update the row counter and progress of the COPY command */
     522          20 :             *processed += inserted;
     523          20 :             pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
     524             :                                          *processed);
     525             :         }
     526             : 
     527          48 :         for (i = 0; i < nused; i++)
     528          36 :             ExecClearTuple(slots[i]);
     529             : 
     530             :         /* reset relname_only */
     531          12 :         cstate->relname_only = false;
     532             :     }
     533             :     else
     534             :     {
     535        2446 :         CommandId   mycid = miinfo->mycid;
     536        2446 :         int         ti_options = miinfo->ti_options;
     537        2446 :         bool        line_buf_valid = cstate->line_buf_valid;
     538        2446 :         uint64      save_cur_lineno = cstate->cur_lineno;
     539             :         MemoryContext oldcontext;
     540             : 
     541             :         Assert(buffer->bistate != NULL);
     542             : 
     543             :         /*
     544             :          * Print error context information correctly, if one of the operations
     545             :          * below fails.
     546             :          */
     547        2446 :         cstate->line_buf_valid = false;
     548             : 
     549             :         /*
     550             :          * table_multi_insert may leak memory, so switch to short-lived memory
     551             :          * context before calling it.
     552             :          */
     553        2446 :         oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
     554        2446 :         table_multi_insert(resultRelInfo->ri_RelationDesc,
     555             :                            slots,
     556             :                            nused,
     557             :                            mycid,
     558             :                            ti_options,
     559             :                            buffer->bistate);
     560        2446 :         MemoryContextSwitchTo(oldcontext);
     561             : 
     562     1197944 :         for (i = 0; i < nused; i++)
     563             :         {
     564             :             /*
     565             :              * If there are any indexes, update them for all the inserted
     566             :              * tuples, and run AFTER ROW INSERT triggers.
     567             :              */
     568     1195512 :             if (resultRelInfo->ri_NumIndices > 0)
     569             :             {
     570             :                 List       *recheckIndexes;
     571             : 
     572      202500 :                 cstate->cur_lineno = buffer->linenos[i];
     573             :                 recheckIndexes =
     574      202500 :                     ExecInsertIndexTuples(resultRelInfo,
     575             :                                           buffer->slots[i], estate, false,
     576             :                                           false, NULL, NIL, false);
     577      202486 :                 ExecARInsertTriggers(estate, resultRelInfo,
     578      202486 :                                      slots[i], recheckIndexes,
     579             :                                      cstate->transition_capture);
     580      202486 :                 list_free(recheckIndexes);
     581             :             }
     582             : 
     583             :             /*
     584             :              * There's no indexes, but see if we need to run AFTER ROW INSERT
     585             :              * triggers anyway.
     586             :              */
     587      993012 :             else if (resultRelInfo->ri_TrigDesc != NULL &&
     588          84 :                      (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
     589          66 :                       resultRelInfo->ri_TrigDesc->trig_insert_new_table))
     590             :             {
     591          36 :                 cstate->cur_lineno = buffer->linenos[i];
     592          36 :                 ExecARInsertTriggers(estate, resultRelInfo,
     593          36 :                                      slots[i], NIL,
     594             :                                      cstate->transition_capture);
     595             :             }
     596             : 
     597     1195498 :             ExecClearTuple(slots[i]);
     598             :         }
     599             : 
     600             :         /* Update the row counter and progress of the COPY command */
     601        2432 :         *processed += nused;
     602        2432 :         pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
     603             :                                      *processed);
     604             : 
     605             :         /* reset cur_lineno and line_buf_valid to what they were */
     606        2432 :         cstate->line_buf_valid = line_buf_valid;
     607        2432 :         cstate->cur_lineno = save_cur_lineno;
     608             :     }
     609             : 
     610             :     /* Mark that all slots are free */
     611        2444 :     buffer->nused = 0;
     612        2444 : }
     613             : 
     614             : /*
     615             :  * Release all resources owned by 'buffer' and then free the buffer itself.
     616             :  *
     617             :  * The buffer must be flushed before cleanup (asserted: nused == 0).
                     :  *
                     :  * For a non-FDW target (ri_FdwRoutine == NULL) this frees the buffer's
                     :  * BulkInsertState and, once the slots have been dropped, calls
                     :  * table_finish_bulk_insert() with miinfo->ti_options.  For an FDW target
                     :  * the buffer is expected to carry no BulkInsertState.
                     :  *
                     :  * The ResultRelInfo's ri_CopyMultiInsertBuffer pointer is reset to NULL.
                     :  * 'buffer' is not unlinked from miinfo->multiInsertBuffers here; callers
                     :  * are responsible for that (see CopyMultiInsertInfoFlush and
                     :  * CopyMultiInsertInfoCleanup).
     618             :  */
     619             : static inline void
     620        1368 : CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
     621             :                              CopyMultiInsertBuffer *buffer)
     622             : {
     623        1368 :     ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
     624             :     int         i;
     625             : 
     626             :     /* Ensure buffer was flushed: no tuples may still be pending */
     627             :     Assert(buffer->nused == 0);
     628             : 
     629             :     /* Remove the ResultRelInfo's back-link to this buffer */
     630        1368 :     resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
     631             : 
     632        1368 :     if (resultRelInfo->ri_FdwRoutine == NULL)
     633             :     {
     634             :         Assert(buffer->bistate != NULL);
     635        1356 :         FreeBulkInsertState(buffer->bistate);
     636             :     }
     637             :     else
     638             :         Assert(buffer->bistate == NULL);
     639             : 
     640             :     /* Slots are only created on demand, so stop at the first NULL entry. */
     641      235902 :     for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
     642      234534 :         ExecDropSingleTupleTableSlot(buffer->slots[i]);
     643             : 
     644        1368 :     if (resultRelInfo->ri_FdwRoutine == NULL)
     645        1356 :         table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
     646             :                                  miinfo->ti_options);
     647             : 
     648        1368 :     pfree(buffer);
     649        1368 : }
     650             : 
     651             : /*
     652             :  * Write all stored tuples in all buffers out to the tables.
     653             :  *
                     :  * Each buffer in miinfo->multiInsertBuffers is handed to
                     :  * CopyMultiInsertBufferFlush() together with 'processed'; afterwards the
                     :  * bufferedTuples and bufferedBytes counters in 'miinfo' are reset to zero.
                     :  *
     654             :  * Once flushed we also trim the tracked buffers list down to size by removing
     655             :  * the buffers created earliest first.
     656             :  *
     657             :  * Callers should pass 'curr_rri' as the ResultRelInfo that's currently being
     658             :  * used.  When cleaning up old buffers we'll never remove the one for
     659             :  * 'curr_rri'.
     660             :  */
     661             : static inline void
     662        2234 : CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri,
     663             :                          int64 *processed)
     664             : {
     665             :     ListCell   *lc;
     666             : 
     667        4678 :     foreach(lc, miinfo->multiInsertBuffers)
     668             :     {
     669        2460 :         CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc);
     670             : 
     671        2460 :         CopyMultiInsertBufferFlush(miinfo, buffer, processed);
     672             :     }
     673             : 
     674        2218 :     miinfo->bufferedTuples = 0;
     675        2218 :     miinfo->bufferedBytes = 0;
     676             : 
     677             :     /*
     678             :      * Trim the list of tracked buffers down if it exceeds the limit
                     :      * (MAX_PARTITION_BUFFERS).  Here we
     679             :      * remove buffers starting with the ones we created first.  It seems less
     680             :      * likely that these older ones will be needed than the ones that were
     681             :      * just created.
     682             :      */
     683        2218 :     while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS)
     684             :     {
     685             :         CopyMultiInsertBuffer *buffer;
     686             : 
     687           0 :         buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
     688             : 
     689             :         /*
     690             :          * We never want to remove the buffer that's currently being used, so
     691             :          * if we happen to find that then move it to the end of the list.
     692             :          */
     693           0 :         if (buffer->resultRelInfo == curr_rri)
     694             :         {
     695             :             /*
     696             :              * The code below would misbehave if we were trying to reduce the
     697             :              * list to less than two items: after the current buffer is moved
                     :              * to the tail, the new head must be a different buffer that
                     :              * is safe to clean up.
     698             :              */
     699             :             StaticAssertDecl(MAX_PARTITION_BUFFERS >= 2,
     700             :                              "MAX_PARTITION_BUFFERS must be >= 2");
     701             : 
     702           0 :             miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
     703           0 :             miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
     704           0 :             buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
     705             :         }
     706             : 
     707           0 :         CopyMultiInsertBufferCleanup(miinfo, buffer);
     708           0 :         miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
     709             :     }
     710        2218 : }
     711             : 
     712             : /*
     713             :  * Clean up all allocated buffers and free the list that tracks them.
                     :  *
                     :  * Runs CopyMultiInsertBufferCleanup() on every buffer tracked in
                     :  * miinfo->multiInsertBuffers, then frees the list itself.  Because that
                     :  * function asserts that each buffer is empty, all buffers must have been
                     :  * flushed before this is called.
     714             :  */
     715             : static inline void
     716        1358 : CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo)
     717             : {
     718             :     ListCell   *lc;
     719             : 
     720        2726 :     foreach(lc, miinfo->multiInsertBuffers)
     721        1368 :         CopyMultiInsertBufferCleanup(miinfo, lfirst(lc));
     722             : 
     723        1358 :     list_free(miinfo->multiInsertBuffers);
     724        1358 : }
     725             : 
     726             : /*
     727             :  * Get the next TupleTableSlot that the next tuple should be stored in.
                     :  *
                     :  * This is the slot at index buffer->nused of rri's multi-insert buffer.
                     :  * If that entry is still NULL a slot is created for rri's relation with
                     :  * table_slot_create(); otherwise the existing slot is returned.
                     :  * buffer->nused is not advanced here; CopyMultiInsertInfoStore() does that
                     :  * once the tuple has actually been stored.
     728             :  *
     729             :  * Callers must ensure that 'rri' already has a buffer and that the buffer
                     :  * is not full (both are asserted).
     730             :  *
     731             :  * Note: 'miinfo' is unused but has been included for consistency with the
     732             :  * other functions in this area.
     733             :  */
     734             : static inline TupleTableSlot *
     735     1197292 : CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
     736             :                                 ResultRelInfo *rri)
     737             : {
     738     1197292 :     CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
     739             :     int         nused;
     740             : 
     741             :     Assert(buffer != NULL);
     742             :     Assert(buffer->nused < MAX_BUFFERED_TUPLES);
     743             : 
     744     1197292 :     nused = buffer->nused;
     745             : 
     746     1197292 :     if (buffer->slots[nused] == NULL)
     747      234900 :         buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
     748     1197292 :     return buffer->slots[nused];
     749             : }
     750             : 
     751             : /*
     752             :  * Record the TupleTableSlot previously handed out by
     753             :  * CopyMultiInsertInfoNextFreeSlot as being consumed.
                     :  *
                     :  * 'slot' must be the buffer's slot at index buffer->nused (asserted).
                     :  * 'lineno' is remembered for that slot so that errors raised later can be
                     :  * reported against the right input line.  'tuplen' is added to the running
                     :  * miinfo->bufferedBytes total, and both buffer->nused and
                     :  * miinfo->bufferedTuples are incremented.
     754             :  */
     755             : static inline void
     756     1195720 : CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
     757             :                          TupleTableSlot *slot, int tuplen, uint64 lineno)
     758             : {
     759     1195720 :     CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
     760             : 
     761             :     Assert(buffer != NULL);
     762             :     Assert(slot == buffer->slots[buffer->nused]);
     763             : 
     764             :     /* Store the line number so we can properly report any errors later */
     765     1195720 :     buffer->linenos[buffer->nused] = lineno;
     766             : 
     767             :     /* Record this slot as being used */
     768     1195720 :     buffer->nused++;
     769             : 
     770             :     /* Update how many tuples are stored across all buffers and their size */
     771     1195720 :     miinfo->bufferedTuples++;
     772     1195720 :     miinfo->bufferedBytes += tuplen;
     773     1195720 : }
     774             : 
     775             : /*
     776             :  * Copy FROM file to relation.
     777             :  */
     778             : uint64
     779        1724 : CopyFrom(CopyFromState cstate)
     780             : {
     781             :     ResultRelInfo *resultRelInfo;
     782             :     ResultRelInfo *target_resultRelInfo;
     783        1724 :     ResultRelInfo *prevResultRelInfo = NULL;
     784        1724 :     EState     *estate = CreateExecutorState(); /* for ExecConstraints() */
     785             :     ModifyTableState *mtstate;
     786             :     ExprContext *econtext;
     787        1724 :     TupleTableSlot *singleslot = NULL;
     788        1724 :     MemoryContext oldcontext = CurrentMemoryContext;
     789             : 
     790        1724 :     PartitionTupleRouting *proute = NULL;
     791             :     ErrorContextCallback errcallback;
     792        1724 :     CommandId   mycid = GetCurrentCommandId(true);
     793        1724 :     int         ti_options = 0; /* start with default options for insert */
     794        1724 :     BulkInsertState bistate = NULL;
     795             :     CopyInsertMethod insertMethod;
     796        1724 :     CopyMultiInsertInfo multiInsertInfo = {0};  /* pacify compiler */
     797        1724 :     int64       processed = 0;
     798        1724 :     int64       excluded = 0;
     799             :     bool        has_before_insert_row_trig;
     800             :     bool        has_instead_insert_row_trig;
     801        1724 :     bool        leafpart_use_multi_insert = false;
     802             : 
     803             :     Assert(cstate->rel);
     804             :     Assert(list_length(cstate->range_table) == 1);
     805             : 
     806        1724 :     if (cstate->opts.on_error != COPY_ON_ERROR_STOP)
     807             :         Assert(cstate->escontext);
     808             : 
     809             :     /*
     810             :      * The target must be a plain, foreign, or partitioned relation, or have
     811             :      * an INSTEAD OF INSERT row trigger.  (Currently, such triggers are only
     812             :      * allowed on views, so we only hint about them in the view case.)
     813             :      */
     814        1724 :     if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
     815         168 :         cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
     816         124 :         cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
     817          18 :         !(cstate->rel->trigdesc &&
     818          12 :           cstate->rel->trigdesc->trig_insert_instead_row))
     819             :     {
     820           6 :         if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
     821           6 :             ereport(ERROR,
     822             :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     823             :                      errmsg("cannot copy to view \"%s\"",
     824             :                             RelationGetRelationName(cstate->rel)),
     825             :                      errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
     826           0 :         else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
     827           0 :             ereport(ERROR,
     828             :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     829             :                      errmsg("cannot copy to materialized view \"%s\"",
     830             :                             RelationGetRelationName(cstate->rel))));
     831           0 :         else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
     832           0 :             ereport(ERROR,
     833             :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     834             :                      errmsg("cannot copy to sequence \"%s\"",
     835             :                             RelationGetRelationName(cstate->rel))));
     836             :         else
     837           0 :             ereport(ERROR,
     838             :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     839             :                      errmsg("cannot copy to non-table relation \"%s\"",
     840             :                             RelationGetRelationName(cstate->rel))));
     841             :     }
     842             : 
     843             :     /*
     844             :      * If the target file is new-in-transaction, we assume that checking FSM
     845             :      * for free space is a waste of time.  This could possibly be wrong, but
     846             :      * it's unlikely.
     847             :      */
     848        1718 :     if (RELKIND_HAS_STORAGE(cstate->rel->rd_rel->relkind) &&
     849        1556 :         (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
     850        1544 :          cstate->rel->rd_firstRelfilelocatorSubid != InvalidSubTransactionId))
     851          88 :         ti_options |= TABLE_INSERT_SKIP_FSM;
     852             : 
     853             :     /*
     854             :      * Optimize if new relation storage was created in this subxact or one of
     855             :      * its committed children and we won't see those rows later as part of an
     856             :      * earlier scan or command. The subxact test ensures that if this subxact
     857             :      * aborts then the frozen rows won't be visible after xact cleanup.  Note
     858             :      * that the stronger test of exactly which subtransaction created it is
     859             :      * crucial for correctness of this optimization. The test for an earlier
     860             :      * scan or command tolerates false negatives. FREEZE causes other sessions
     861             :      * to see rows they would not see under MVCC, and a false negative merely
     862             :      * spreads that anomaly to the current session.
     863             :      */
     864        1718 :     if (cstate->opts.freeze)
     865             :     {
     866             :         /*
     867             :          * We currently disallow COPY FREEZE on partitioned tables.  The
     868             :          * reason for this is that we've simply not yet opened the partitions
     869             :          * to determine if the optimization can be applied to them.  We could
     870             :          * go and open them all here, but doing so may be quite a costly
     871             :          * overhead for small copies.  In any case, we may just end up routing
     872             :          * tuples to a small number of partitions.  It seems better just to
     873             :          * raise an ERROR for partitioned tables.
     874             :          */
     875          72 :         if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     876             :         {
     877           6 :             ereport(ERROR,
     878             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     879             :                      errmsg("cannot perform COPY FREEZE on a partitioned table")));
     880             :         }
     881             : 
     882             :         /* There's currently no support for COPY FREEZE on foreign tables. */
     883          66 :         if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
     884           6 :             ereport(ERROR,
     885             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     886             :                      errmsg("cannot perform COPY FREEZE on a foreign table")));
     887             : 
     888             :         /*
     889             :          * Tolerate one registration for the benefit of FirstXactSnapshot.
     890             :          * Scan-bearing queries generally create at least two registrations,
     891             :          * though relying on that is fragile, as is ignoring ActiveSnapshot.
     892             :          * Clear CatalogSnapshot to avoid counting its registration.  We'll
     893             :          * still detect ongoing catalog scans, each of which separately
     894             :          * registers the snapshot it uses.
     895             :          */
     896          60 :         InvalidateCatalogSnapshot();
     897          60 :         if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
     898           0 :             ereport(ERROR,
     899             :                     (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
     900             :                      errmsg("cannot perform COPY FREEZE because of prior transaction activity")));
     901             : 
     902         120 :         if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
     903          60 :             cstate->rel->rd_newRelfilelocatorSubid != GetCurrentSubTransactionId())
     904          18 :             ereport(ERROR,
     905             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     906             :                      errmsg("cannot perform COPY FREEZE because the table was not created or truncated in the current subtransaction")));
     907             : 
     908          42 :         ti_options |= TABLE_INSERT_FROZEN;
     909             :     }
     910             : 
     911             :     /*
     912             :      * We need a ResultRelInfo so we can use the regular executor's
     913             :      * index-entry-making machinery.  (There used to be a huge amount of code
     914             :      * here that basically duplicated execUtils.c ...)
     915             :      */
     916        1688 :     ExecInitRangeTable(estate, cstate->range_table, cstate->rteperminfos,
     917             :                        bms_make_singleton(1));
     918        1688 :     resultRelInfo = target_resultRelInfo = makeNode(ResultRelInfo);
     919        1688 :     ExecInitResultRelation(estate, resultRelInfo, 1);
     920             : 
     921             :     /* Verify the named relation is a valid target for INSERT */
     922        1688 :     CheckValidResultRel(resultRelInfo, CMD_INSERT, NIL);
     923             : 
     924        1686 :     ExecOpenIndices(resultRelInfo, false);
     925             : 
     926             :     /*
     927             :      * Set up a ModifyTableState so we can let FDW(s) init themselves for
     928             :      * foreign-table result relation(s).
     929             :      */
     930        1686 :     mtstate = makeNode(ModifyTableState);
     931        1686 :     mtstate->ps.plan = NULL;
     932        1686 :     mtstate->ps.state = estate;
     933        1686 :     mtstate->operation = CMD_INSERT;
     934        1686 :     mtstate->mt_nrels = 1;
     935        1686 :     mtstate->resultRelInfo = resultRelInfo;
     936        1686 :     mtstate->rootResultRelInfo = resultRelInfo;
     937             : 
     938        1686 :     if (resultRelInfo->ri_FdwRoutine != NULL &&
     939          36 :         resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
     940          36 :         resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
     941             :                                                          resultRelInfo);
     942             : 
     943             :     /*
     944             :      * Also, if the named relation is a foreign table, determine if the FDW
     945             :      * supports batch insert and determine the batch size (a FDW may support
     946             :      * batching, but it may be disabled for the server/table).
     947             :      *
     948             :      * If the FDW does not support batching, we set the batch size to 1.
     949             :      */
     950        1686 :     if (resultRelInfo->ri_FdwRoutine != NULL &&
     951          36 :         resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize &&
     952          36 :         resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert)
     953          36 :         resultRelInfo->ri_BatchSize =
     954          36 :             resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize(resultRelInfo);
     955             :     else
     956        1650 :         resultRelInfo->ri_BatchSize = 1;
     957             : 
     958             :     Assert(resultRelInfo->ri_BatchSize >= 1);
     959             : 
     960             :     /* Prepare to catch AFTER triggers. */
     961        1686 :     AfterTriggerBeginQuery();
     962             : 
     963             :     /*
     964             :      * If there are any triggers with transition tables on the named relation,
     965             :      * we need to be prepared to capture transition tuples.
     966             :      *
     967             :      * Because partition tuple routing would like to know about whether
     968             :      * transition capture is active, we also set it in mtstate, which is
     969             :      * passed to ExecFindPartition() below.
     970             :      */
     971        1686 :     cstate->transition_capture = mtstate->mt_transition_capture =
     972        1686 :         MakeTransitionCaptureState(cstate->rel->trigdesc,
     973        1686 :                                    RelationGetRelid(cstate->rel),
     974             :                                    CMD_INSERT);
     975             : 
     976             :     /*
     977             :      * If the named relation is a partitioned table, initialize state for
     978             :      * CopyFrom tuple routing.
     979             :      */
     980        1686 :     if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     981         100 :         proute = ExecSetupPartitionTupleRouting(estate, cstate->rel);
     982             : 
     983        1686 :     if (cstate->whereClause)
     984          18 :         cstate->qualexpr = ExecInitQual(castNode(List, cstate->whereClause),
     985             :                                         &mtstate->ps);
     986             : 
     987             :     /*
     988             :      * It's generally more efficient to prepare a bunch of tuples for
     989             :      * insertion, and insert them in one
     990             :      * table_multi_insert()/ExecForeignBatchInsert() call, than call
     991             :      * table_tuple_insert()/ExecForeignInsert() separately for every tuple.
     992             :      * However, there are a number of reasons why we might not be able to do
     993             :      * this.  These are explained below.
     994             :      */
     995        1686 :     if (resultRelInfo->ri_TrigDesc != NULL &&
     996         182 :         (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
     997          90 :          resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
     998             :     {
     999             :         /*
    1000             :          * Can't support multi-inserts when there are any BEFORE/INSTEAD OF
    1001             :          * triggers on the table. Such triggers might query the table we're
    1002             :          * inserting into and act differently if the tuples that have already
    1003             :          * been processed and prepared for insertion are not there.
    1004             :          */
    1005         104 :         insertMethod = CIM_SINGLE;
    1006             :     }
    1007        1582 :     else if (resultRelInfo->ri_FdwRoutine != NULL &&
    1008          28 :              resultRelInfo->ri_BatchSize == 1)
    1009             :     {
    1010             :         /*
    1011             :          * Can't support multi-inserts to a foreign table if the FDW does not
    1012             :          * support batching, or it's disabled for the server or foreign table.
    1013             :          */
    1014          18 :         insertMethod = CIM_SINGLE;
    1015             :     }
    1016        1564 :     else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL &&
    1017          26 :              resultRelInfo->ri_TrigDesc->trig_insert_new_table)
    1018             :     {
    1019             :         /*
    1020             :          * For partitioned tables we can't support multi-inserts when there
    1021             :          * are any statement level insert triggers. It might be possible to
    1022             :          * allow partitioned tables with such triggers in the future, but for
    1023             :          * now, CopyMultiInsertInfoFlush expects that any after row insert and
    1024             :          * statement level insert triggers are on the same relation.
    1025             :          */
    1026          18 :         insertMethod = CIM_SINGLE;
    1027             :     }
    1028        1546 :     else if (cstate->volatile_defexprs)
    1029             :     {
    1030             :         /*
    1031             :          * Can't support multi-inserts if there are any volatile default
    1032             :          * expressions in the table.  Similarly to the trigger case above,
    1033             :          * such expressions may query the table we're inserting into.
    1034             :          *
    1035             :          * Note: It does not matter if any partitions have any volatile
    1036             :          * default expressions as we use the defaults from the target of the
    1037             :          * COPY command.
    1038             :          */
    1039           6 :         insertMethod = CIM_SINGLE;
    1040             :     }
    1041        1540 :     else if (contain_volatile_functions(cstate->whereClause))
    1042             :     {
    1043             :         /*
    1044             :          * Can't support multi-inserts if there are any volatile function
    1045             :          * expressions in WHERE clause.  Similarly to the trigger case above,
    1046             :          * such expressions may query the table we're inserting into.
    1047             :          *
    1048             :          * Note: the whereClause was already preprocessed in DoCopy(), so it's
    1049             :          * okay to use contain_volatile_functions() directly.
    1050             :          */
    1051           0 :         insertMethod = CIM_SINGLE;
    1052             :     }
    1053             :     else
    1054             :     {
    1055             :         /*
    1056             :          * For partitioned tables, we may still be able to perform bulk
    1057             :          * inserts.  However, the possibility of this depends on which types
    1058             :          * of triggers exist on the partition.  We must disable bulk inserts
    1059             :          * if the partition is a foreign table that can't use batching or it
    1060             :          * has any before row insert or insert instead triggers (same as we
    1061             :          * checked above for the parent table).  Since the partition's
    1062             :          * resultRelInfos are initialized only when we actually need to insert
    1063             :          * the first tuple into them, we must have the intermediate insert
    1064             :          * method of CIM_MULTI_CONDITIONAL to flag that we must later
    1065             :          * determine if we can use bulk-inserts for the partition being
    1066             :          * inserted into.
    1067             :          */
    1068        1540 :         if (proute)
    1069          76 :             insertMethod = CIM_MULTI_CONDITIONAL;
    1070             :         else
    1071        1464 :             insertMethod = CIM_MULTI;
    1072             : 
    1073        1540 :         CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
    1074             :                                 estate, mycid, ti_options);
    1075             :     }
    1076             : 
    1077             :     /*
    1078             :      * If not using batch mode (which allocates slots as needed) set up a
    1079             :      * tuple slot too. When inserting into a partitioned table, we also need
    1080             :      * one, even if we might batch insert, to read the tuple in the root
    1081             :      * partition's form.
    1082             :      */
    1083        1686 :     if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
    1084             :     {
    1085         222 :         singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
    1086             :                                        &estate->es_tupleTable);
    1087         222 :         bistate = GetBulkInsertState();
    1088             :     }
    1089             : 
    1090        1868 :     has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
    1091         182 :                                   resultRelInfo->ri_TrigDesc->trig_insert_before_row);
    1092             : 
    1093        1868 :     has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
    1094         182 :                                    resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
    1095             : 
    1096             :     /*
    1097             :      * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
    1098             :      * should do this for COPY, since it's not really an "INSERT" statement as
    1099             :      * such. However, executing these triggers maintains consistency with the
    1100             :      * EACH ROW triggers that we already fire on COPY.
    1101             :      */
    1102        1686 :     ExecBSInsertTriggers(estate, resultRelInfo);
    1103             : 
    1104        1686 :     econtext = GetPerTupleExprContext(estate);
    1105             : 
    1106             :     /* Set up callback to identify error line number */
    1107        1686 :     errcallback.callback = CopyFromErrorCallback;
    1108        1686 :     errcallback.arg = cstate;
    1109        1686 :     errcallback.previous = error_context_stack;
    1110        1686 :     error_context_stack = &errcallback;
    1111             : 
    1112             :     for (;;)
    1113     1256228 :     {
    1114             :         TupleTableSlot *myslot;
    1115             :         bool        skip_tuple;
    1116             : 
    1117     1257914 :         CHECK_FOR_INTERRUPTS();
    1118             : 
    1119             :         /*
    1120             :          * Reset the per-tuple exprcontext. We do this after every tuple, to
    1121             :          * clean-up after expression evaluations etc.
    1122             :          */
    1123     1257914 :         ResetPerTupleExprContext(estate);
    1124             : 
    1125             :         /* select slot to (initially) load row into */
    1126     1257914 :         if (insertMethod == CIM_SINGLE || proute)
    1127             :         {
    1128      274874 :             myslot = singleslot;
    1129      274874 :             Assert(myslot != NULL);
    1130             :         }
    1131             :         else
    1132             :         {
    1133             :             Assert(resultRelInfo == target_resultRelInfo);
    1134             :             Assert(insertMethod == CIM_MULTI);
    1135             : 
    1136      983040 :             myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
    1137             :                                                      resultRelInfo);
    1138             :         }
    1139             : 
    1140             :         /*
    1141             :          * Switch to per-tuple context before calling NextCopyFrom, which does
    1142             :          * evaluate default expressions etc. and requires per-tuple context.
    1143             :          */
    1144     1257914 :         MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    1145             : 
    1146     1257914 :         ExecClearTuple(myslot);
    1147             : 
    1148             :         /* Directly store the values/nulls array in the slot */
    1149     1257914 :         if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
    1150        1494 :             break;
    1151             : 
    1152     1256268 :         if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE &&
    1153         162 :             cstate->escontext->error_occurred)
    1154             :         {
    1155             :             /*
    1156             :              * Soft error occurred, skip this tuple and just make
    1157             :              * ErrorSaveContext ready for the next NextCopyFrom. Since we
    1158             :              * don't set details_wanted and error_data is not to be filled,
    1159             :              * just resetting error_occurred is enough.
    1160             :              */
    1161         108 :             cstate->escontext->error_occurred = false;
    1162             : 
    1163             :             /* Report that this tuple was skipped by the ON_ERROR clause */
    1164         108 :             pgstat_progress_update_param(PROGRESS_COPY_TUPLES_SKIPPED,
    1165         108 :                                          cstate->num_errors);
    1166             : 
    1167         108 :             if (cstate->opts.reject_limit > 0 &&
    1168          48 :                 cstate->num_errors > cstate->opts.reject_limit)
    1169           6 :                 ereport(ERROR,
    1170             :                         (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
    1171             :                          errmsg("skipped more than REJECT_LIMIT (%" PRId64 ") rows due to data type incompatibility",
    1172             :                                 cstate->opts.reject_limit)));
    1173             : 
    1174             :             /* Repeat NextCopyFrom() until no soft error occurs */
    1175         102 :             continue;
    1176             :         }
    1177             : 
    1178     1256160 :         ExecStoreVirtualTuple(myslot);
    1179             : 
    1180             :         /*
    1181             :          * Constraints and where clause might reference the tableoid column,
    1182             :          * so (re-)initialize tts_tableOid before evaluating them.
    1183             :          */
    1184     1256160 :         myslot->tts_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc);
    1185             : 
    1186             :         /* Triggers and stuff need to be invoked in query context. */
    1187     1256160 :         MemoryContextSwitchTo(oldcontext);
    1188             : 
    1189     1256160 :         if (cstate->whereClause)
    1190             :         {
    1191          66 :             econtext->ecxt_scantuple = myslot;
    1192             :             /* Skip items that don't match COPY's WHERE clause */
    1193          66 :             if (!ExecQual(cstate->qualexpr, econtext))
    1194             :             {
    1195             :                 /*
    1196             :                  * Report that this tuple was filtered out by the WHERE
    1197             :                  * clause.
    1198             :                  */
    1199          36 :                 pgstat_progress_update_param(PROGRESS_COPY_TUPLES_EXCLUDED,
    1200             :                                              ++excluded);
    1201          36 :                 continue;
    1202             :             }
    1203             :         }
    1204             : 
    1205             :         /* Determine the partition to insert the tuple into */
    1206     1256124 :         if (proute)
    1207             :         {
    1208             :             TupleConversionMap *map;
    1209             : 
    1210             :             /*
    1211             :              * Attempt to find a partition suitable for this tuple.
    1212             :              * ExecFindPartition() will raise an error if none can be found or
    1213             :              * if the found partition is not suitable for INSERTs.
    1214             :              */
    1215      274390 :             resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo,
    1216             :                                               proute, myslot, estate);
    1217             : 
    1218      274388 :             if (prevResultRelInfo != resultRelInfo)
    1219             :             {
    1220             :                 /* Determine which triggers exist on this partition */
    1221      161566 :                 has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
    1222          54 :                                               resultRelInfo->ri_TrigDesc->trig_insert_before_row);
    1223             : 
    1224      161566 :                 has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
    1225          54 :                                                resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
    1226             : 
    1227             :                 /*
    1228             :                  * Disable multi-inserts when the partition has BEFORE/INSTEAD
    1229             :                  * OF triggers, or if the partition is a foreign table that
    1230             :                  * can't use batching.
    1231             :                  */
    1232      262970 :                 leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
    1233      101458 :                     !has_before_insert_row_trig &&
    1234      364404 :                     !has_instead_insert_row_trig &&
    1235      101434 :                     (resultRelInfo->ri_FdwRoutine == NULL ||
    1236          12 :                      resultRelInfo->ri_BatchSize > 1);
    1237             : 
    1238             :                 /* Set the multi-insert buffer to use for this partition. */
    1239      161512 :                 if (leafpart_use_multi_insert)
    1240             :                 {
    1241      101430 :                     if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
    1242          86 :                         CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
    1243             :                                                        resultRelInfo);
    1244             :                 }
    1245       60082 :                 else if (insertMethod == CIM_MULTI_CONDITIONAL &&
    1246          28 :                          !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
    1247             :                 {
    1248             :                     /*
    1249             :                      * Flush pending inserts if this partition can't use
    1250             :                      * batching, so rows are visible to triggers etc.
    1251             :                      */
    1252           0 :                     CopyMultiInsertInfoFlush(&multiInsertInfo,
    1253             :                                              resultRelInfo,
    1254             :                                              &processed);
    1255             :                 }
    1256             : 
    1257      161512 :                 if (bistate != NULL)
    1258      161512 :                     ReleaseBulkInsertStatePin(bistate);
    1259      161512 :                 prevResultRelInfo = resultRelInfo;
    1260             :             }
    1261             : 
    1262             :             /*
    1263             :              * If we're capturing transition tuples, we might need to convert
    1264             :              * from the partition rowtype to root rowtype. But if there are no
    1265             :              * BEFORE triggers on the partition that could change the tuple,
    1266             :              * we can just remember the original unconverted tuple to avoid a
    1267             :              * needless round trip conversion.
    1268             :              */
    1269      274388 :             if (cstate->transition_capture != NULL)
    1270          54 :                 cstate->transition_capture->tcs_original_insert_tuple =
    1271          54 :                     !has_before_insert_row_trig ? myslot : NULL;
    1272             : 
    1273             :             /*
    1274             :              * We might need to convert from the root rowtype to the partition
    1275             :              * rowtype.
    1276             :              */
    1277      274388 :             map = ExecGetRootToChildMap(resultRelInfo, estate);
    1278      274388 :             if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert)
    1279             :             {
    1280             :                 /* non batch insert */
    1281       60136 :                 if (map != NULL)
    1282             :                 {
    1283             :                     TupleTableSlot *new_slot;
    1284             : 
    1285         110 :                     new_slot = resultRelInfo->ri_PartitionTupleSlot;
    1286         110 :                     myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot);
    1287             :                 }
    1288             :             }
    1289             :             else
    1290             :             {
    1291             :                 /*
    1292             :                  * Prepare to queue up tuple for later batch insert into
    1293             :                  * current partition.
    1294             :                  */
    1295             :                 TupleTableSlot *batchslot;
    1296             : 
    1297             :                 /* no other path available for partitioned table */
    1298             :                 Assert(insertMethod == CIM_MULTI_CONDITIONAL);
    1299             : 
    1300      214252 :                 batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
    1301             :                                                             resultRelInfo);
    1302             : 
    1303      214252 :                 if (map != NULL)
    1304       12200 :                     myslot = execute_attr_map_slot(map->attrMap, myslot,
    1305             :                                                    batchslot);
    1306             :                 else
    1307             :                 {
    1308             :                     /*
    1309             :                      * This looks more expensive than it is (Believe me, I
    1310             :                      * optimized it away. Twice.). The input is in virtual
    1311             :                      * form, and we'll materialize the slot below - for most
    1312             :                      * slot types the copy performs the work materialization
    1313             :                      * would later require anyway.
    1314             :                      */
    1315      202052 :                     ExecCopySlot(batchslot, myslot);
    1316      202052 :                     myslot = batchslot;
    1317             :                 }
    1318             :             }
    1319             : 
    1320             :             /* ensure that triggers etc see the right relation  */
    1321      274388 :             myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
    1322             :         }
    1323             : 
    1324     1256122 :         skip_tuple = false;
    1325             : 
    1326             :         /* BEFORE ROW INSERT Triggers */
    1327     1256122 :         if (has_before_insert_row_trig)
    1328             :         {
    1329         274 :             if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot))
    1330          16 :                 skip_tuple = true;  /* "do nothing" */
    1331             :         }
    1332             : 
    1333     1256122 :         if (!skip_tuple)
    1334             :         {
    1335             :             /*
    1336             :              * If there is an INSTEAD OF INSERT ROW trigger, let it handle the
    1337             :              * tuple.  Otherwise, proceed with inserting the tuple into the
    1338             :              * table or foreign table.
    1339             :              */
    1340     1256106 :             if (has_instead_insert_row_trig)
    1341             :             {
    1342          12 :                 ExecIRInsertTriggers(estate, resultRelInfo, myslot);
    1343             :             }
    1344             :             else
    1345             :             {
    1346             :                 /* Compute stored generated columns */
    1347     1256094 :                 if (resultRelInfo->ri_RelationDesc->rd_att->constr &&
    1348      461968 :                     resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored)
    1349          34 :                     ExecComputeStoredGenerated(resultRelInfo, estate, myslot,
    1350             :                                                CMD_INSERT);
    1351             : 
    1352             :                 /*
    1353             :                  * If the target is a plain table, check the constraints of
    1354             :                  * the tuple.
    1355             :                  */
    1356     1256094 :                 if (resultRelInfo->ri_FdwRoutine == NULL &&
    1357     1256006 :                     resultRelInfo->ri_RelationDesc->rd_att->constr)
    1358      461932 :                     ExecConstraints(resultRelInfo, myslot, estate);
    1359             : 
    1360             :                 /*
    1361             :                  * Also check the tuple against the partition constraint, if
    1362             :                  * there is one; except that if we got here via tuple-routing,
    1363             :                  * we don't need to if there's no BR trigger defined on the
    1364             :                  * partition.
    1365             :                  */
    1366     1256064 :                 if (resultRelInfo->ri_RelationDesc->rd_rel->relispartition &&
    1367      274376 :                     (proute == NULL || has_before_insert_row_trig))
    1368        2308 :                     ExecPartitionCheck(resultRelInfo, myslot, estate, true);
    1369             : 
    1370             :                 /* Store the slot in the multi-insert buffer, when enabled. */
    1371     1256064 :                 if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
    1372             :                 {
    1373             :                     /*
    1374             :                      * The slot previously might point into the per-tuple
    1375             :                      * context. For batching it needs to be longer lived.
    1376             :                      */
    1377     1195720 :                     ExecMaterializeSlot(myslot);
    1378             : 
    1379             :                     /* Add this tuple to the tuple buffer */
    1380     1195720 :                     CopyMultiInsertInfoStore(&multiInsertInfo,
    1381             :                                              resultRelInfo, myslot,
    1382             :                                              cstate->line_buf.len,
    1383             :                                              cstate->cur_lineno);
    1384             : 
    1385             :                     /*
    1386             :                      * If enough inserts have queued up, then flush all
    1387             :                      * buffers out to their tables.
    1388             :                      */
    1389     1195720 :                     if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
    1390        1100 :                         CopyMultiInsertInfoFlush(&multiInsertInfo,
    1391             :                                                  resultRelInfo,
    1392             :                                                  &processed);
    1393             : 
    1394             :                     /*
    1395             :                      * We delay updating the row counter and progress of the
    1396             :                      * COPY command until after writing the tuples stored in
    1397             :                      * the buffer out to the table, as in single insert mode.
    1398             :                      * See CopyMultiInsertBufferFlush().
    1399             :                      */
    1400     1195720 :                     continue;   /* next tuple please */
    1401             :                 }
    1402             :                 else
    1403             :                 {
    1404       60344 :                     List       *recheckIndexes = NIL;
    1405             : 
    1406             :                     /* OK, store the tuple */
    1407       60344 :                     if (resultRelInfo->ri_FdwRoutine != NULL)
    1408             :                     {
    1409          50 :                         myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
    1410             :                                                                                  resultRelInfo,
    1411             :                                                                                  myslot,
    1412             :                                                                                  NULL);
    1413             : 
    1414          48 :                         if (myslot == NULL) /* "do nothing" */
    1415           4 :                             continue;   /* next tuple please */
    1416             : 
    1417             :                         /*
    1418             :                          * AFTER ROW Triggers might reference the tableoid
    1419             :                          * column, so (re-)initialize tts_tableOid before
    1420             :                          * evaluating them.
    1421             :                          */
    1422          44 :                         myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
    1423             :                     }
    1424             :                     else
    1425             :                     {
    1426             :                         /* OK, store the tuple and create index entries for it */
    1427       60294 :                         table_tuple_insert(resultRelInfo->ri_RelationDesc,
    1428             :                                            myslot, mycid, ti_options, bistate);
    1429             : 
    1430       60294 :                         if (resultRelInfo->ri_NumIndices > 0)
    1431           0 :                             recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
    1432             :                                                                    myslot,
    1433             :                                                                    estate,
    1434             :                                                                    false,
    1435             :                                                                    false,
    1436             :                                                                    NULL,
    1437             :                                                                    NIL,
    1438             :                                                                    false);
    1439             :                     }
    1440             : 
    1441             :                     /* AFTER ROW INSERT Triggers */
    1442       60338 :                     ExecARInsertTriggers(estate, resultRelInfo, myslot,
    1443             :                                          recheckIndexes, cstate->transition_capture);
    1444             : 
    1445       60338 :                     list_free(recheckIndexes);
    1446             :                 }
    1447             :             }
    1448             : 
    1449             :             /*
    1450             :              * We count only tuples not suppressed by a BEFORE INSERT trigger
    1451             :              * or FDW; this is the same definition used by nodeModifyTable.c
    1452             :              * for counting tuples inserted by an INSERT command.  Update
    1453             :              * progress of the COPY command as well.
    1454             :              */
    1455       60350 :             pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
    1456             :                                          ++processed);
    1457             :         }
    1458             :     }
    1459             : 
    1460             :     /* Flush any remaining buffered tuples */
    1461        1494 :     if (insertMethod != CIM_SINGLE)
    1462             :     {
    1463        1374 :         if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
    1464        1134 :             CopyMultiInsertInfoFlush(&multiInsertInfo, NULL, &processed);
    1465             :     }
    1466             : 
    1467             :     /* Done, clean up */
    1468        1478 :     error_context_stack = errcallback.previous;
    1469             : 
    1470        1478 :     if (cstate->opts.on_error != COPY_ON_ERROR_STOP &&
    1471          30 :         cstate->num_errors > 0 &&
    1472          30 :         cstate->opts.log_verbosity >= COPY_LOG_VERBOSITY_DEFAULT)
    1473          24 :         ereport(NOTICE,
    1474             :                 errmsg_plural("%" PRIu64 " row was skipped due to data type incompatibility",
    1475             :                               "%" PRIu64 " rows were skipped due to data type incompatibility",
    1476             :                               cstate->num_errors,
    1477             :                               cstate->num_errors));
    1478             : 
    1479        1478 :     if (bistate != NULL)
    1480         194 :         FreeBulkInsertState(bistate);
    1481             : 
    1482        1478 :     MemoryContextSwitchTo(oldcontext);
    1483             : 
    1484             :     /* Execute AFTER STATEMENT insertion triggers */
    1485        1478 :     ExecASInsertTriggers(estate, target_resultRelInfo, cstate->transition_capture);
    1486             : 
    1487             :     /* Handle queued AFTER triggers */
    1488        1478 :     AfterTriggerEndQuery(estate);
    1489             : 
    1490        1478 :     ExecResetTupleTable(estate->es_tupleTable, false);
    1491             : 
    1492             :     /* Allow the FDW to shut down */
    1493        1478 :     if (target_resultRelInfo->ri_FdwRoutine != NULL &&
    1494          32 :         target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
    1495          32 :         target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
    1496             :                                                               target_resultRelInfo);
    1497             : 
    1498             :     /* Tear down the multi-insert buffer data */
    1499        1478 :     if (insertMethod != CIM_SINGLE)
    1500        1358 :         CopyMultiInsertInfoCleanup(&multiInsertInfo);
    1501             : 
    1502             :     /* Close all the partitioned tables, leaf partitions, and their indices */
    1503        1478 :     if (proute)
    1504          98 :         ExecCleanupTupleRouting(mtstate, proute);
    1505             : 
    1506             :     /* Close the result relations, including any trigger target relations */
    1507        1478 :     ExecCloseResultRelations(estate);
    1508        1478 :     ExecCloseRangeTableRelations(estate);
    1509             : 
    1510        1478 :     FreeExecutorState(estate);
    1511             : 
    1512        1478 :     return processed;
    1513             : }
    1514             : 
    1515             : /*
    1516             :  * Set up to read tuples from a file for COPY FROM.
    1517             :  *
    1518             :  * 'rel': Used as a template for the tuples
    1519             :  * 'whereClause': WHERE clause from the COPY FROM command
    1520             :  * 'filename': Name of server-local file to read, NULL for STDIN
    1521             :  * 'is_program': true if 'filename' is program to execute
    1522             :  * 'data_source_cb': callback that provides the input data
    1523             :  * 'attnamelist': List of char *, columns to include. NIL selects all cols.
    1524             :  * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
    1525             :  *
    1526             :  * Returns a CopyFromState, to be passed to NextCopyFrom and related functions.
    1527             :  */
    1528             : CopyFromState
    1529        2050 : BeginCopyFrom(ParseState *pstate,
    1530             :               Relation rel,
    1531             :               Node *whereClause,
    1532             :               const char *filename,
    1533             :               bool is_program,
    1534             :               copy_data_source_cb data_source_cb,
    1535             :               List *attnamelist,
    1536             :               List *options)
    1537             : {
    1538             :     CopyFromState cstate;
    1539        2050 :     bool        pipe = (filename == NULL);
    1540             :     TupleDesc   tupDesc;
    1541             :     AttrNumber  num_phys_attrs,
    1542             :                 num_defaults;
    1543             :     FmgrInfo   *in_functions;
    1544             :     Oid        *typioparams;
    1545             :     int        *defmap;
    1546             :     ExprState **defexprs;
    1547             :     MemoryContext oldcontext;
    1548             :     bool        volatile_defexprs;
    1549        2050 :     const int   progress_cols[] = {
    1550             :         PROGRESS_COPY_COMMAND,
    1551             :         PROGRESS_COPY_TYPE,
    1552             :         PROGRESS_COPY_BYTES_TOTAL
    1553             :     };
    1554        2050 :     int64       progress_vals[] = {
    1555             :         PROGRESS_COPY_COMMAND_FROM,
    1556             :         0,
    1557             :         0
    1558             :     };
    1559             : 
    1560             :     /* Allocate workspace and zero all fields */
    1561        2050 :     cstate = (CopyFromStateData *) palloc0(sizeof(CopyFromStateData));
    1562             : 
    1563             :     /*
    1564             :      * We allocate everything used by a cstate in a new memory context. This
    1565             :      * avoids memory leaks during repeated use of COPY in a query.
    1566             :      */
    1567        2050 :     cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
    1568             :                                                 "COPY",
    1569             :                                                 ALLOCSET_DEFAULT_SIZES);
    1570             : 
    1571        2050 :     oldcontext = MemoryContextSwitchTo(cstate->copycontext);
    1572             : 
    1573             :     /* Extract options from the statement node tree */
    1574        2050 :     ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options);
    1575             : 
    1576             :     /* Set the format routine */
    1577        1820 :     cstate->routine = CopyFromGetRoutine(&cstate->opts);
    1578             : 
    1579             :     /* Process the target relation */
    1580        1820 :     cstate->rel = rel;
    1581             : 
    1582        1820 :     tupDesc = RelationGetDescr(cstate->rel);
    1583             : 
    1584             :     /* process common options or initialization */
    1585             : 
    1586             :     /* Generate or convert list of attributes to process */
    1587        1820 :     cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
    1588             : 
    1589        1820 :     num_phys_attrs = tupDesc->natts;
    1590             : 
    1591             :     /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
    1592        1820 :     cstate->opts.force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
    1593        1820 :     if (cstate->opts.force_notnull_all)
    1594          12 :         MemSet(cstate->opts.force_notnull_flags, true, num_phys_attrs * sizeof(bool));
    1595        1808 :     else if (cstate->opts.force_notnull)
    1596             :     {
    1597             :         List       *attnums;
    1598             :         ListCell   *cur;
    1599             : 
    1600          26 :         attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_notnull);
    1601             : 
    1602          52 :         foreach(cur, attnums)
    1603             :         {
    1604          32 :             int         attnum = lfirst_int(cur);
    1605          32 :             Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
    1606             : 
    1607          32 :             if (!list_member_int(cstate->attnumlist, attnum))
    1608           6 :                 ereport(ERROR,
    1609             :                         (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1610             :                 /*- translator: first %s is the name of a COPY option, e.g. FORCE_NOT_NULL */
    1611             :                          errmsg("%s column \"%s\" not referenced by COPY",
    1612             :                                 "FORCE_NOT_NULL", NameStr(attr->attname))));
    1613          26 :             cstate->opts.force_notnull_flags[attnum - 1] = true;
    1614             :         }
    1615             :     }
    1616             : 
    1617             :     /* Set up soft error handler for ON_ERROR */
    1618        1814 :     if (cstate->opts.on_error != COPY_ON_ERROR_STOP)
    1619             :     {
    1620          64 :         cstate->escontext = makeNode(ErrorSaveContext);
    1621          64 :         cstate->escontext->type = T_ErrorSaveContext;
    1622          64 :         cstate->escontext->error_occurred = false;
    1623             : 
    1624             :         /*
    1625             :          * Currently we only support COPY_ON_ERROR_IGNORE. We'll add other
    1626             :          * options later
    1627             :          */
    1628          64 :         if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE)
    1629          64 :             cstate->escontext->details_wanted = false;
    1630             :     }
    1631             :     else
    1632        1750 :         cstate->escontext = NULL;
    1633             : 
    1634             :     /* Convert FORCE_NULL name list to per-column flags, check validity */
    1635        1814 :     cstate->opts.force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
    1636        1814 :     if (cstate->opts.force_null_all)
    1637          12 :         MemSet(cstate->opts.force_null_flags, true, num_phys_attrs * sizeof(bool));
    1638        1802 :     else if (cstate->opts.force_null)
    1639             :     {
    1640             :         List       *attnums;
    1641             :         ListCell   *cur;
    1642             : 
    1643          26 :         attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_null);
    1644             : 
    1645          52 :         foreach(cur, attnums)
    1646             :         {
    1647          32 :             int         attnum = lfirst_int(cur);
    1648          32 :             Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
    1649             : 
    1650          32 :             if (!list_member_int(cstate->attnumlist, attnum))
    1651           6 :                 ereport(ERROR,
    1652             :                         (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1653             :                 /*- translator: first %s is the name of a COPY option, e.g. FORCE_NOT_NULL */
    1654             :                          errmsg("%s column \"%s\" not referenced by COPY",
    1655             :                                 "FORCE_NULL", NameStr(attr->attname))));
    1656          26 :             cstate->opts.force_null_flags[attnum - 1] = true;
    1657             :         }
    1658             :     }
    1659             : 
    1660             :     /* Convert convert_selectively name list to per-column flags */
    1661        1808 :     if (cstate->opts.convert_selectively)
    1662             :     {
    1663             :         List       *attnums;
    1664             :         ListCell   *cur;
    1665             : 
    1666           4 :         cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
    1667             : 
    1668           4 :         attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.convert_select);
    1669             : 
    1670           8 :         foreach(cur, attnums)
    1671             :         {
    1672           4 :             int         attnum = lfirst_int(cur);
    1673           4 :             Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
    1674             : 
    1675           4 :             if (!list_member_int(cstate->attnumlist, attnum))
    1676           0 :                 ereport(ERROR,
    1677             :                         (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1678             :                          errmsg_internal("selected column \"%s\" not referenced by COPY",
    1679             :                                          NameStr(attr->attname))));
    1680           4 :             cstate->convert_select_flags[attnum - 1] = true;
    1681             :         }
    1682             :     }
    1683             : 
    1684             :     /* Use client encoding when ENCODING option is not specified. */
    1685        1808 :     if (cstate->opts.file_encoding < 0)
    1686        1790 :         cstate->file_encoding = pg_get_client_encoding();
    1687             :     else
    1688          18 :         cstate->file_encoding = cstate->opts.file_encoding;
    1689             : 
    1690             :     /*
    1691             :      * Look up encoding conversion function.
    1692             :      */
    1693        1808 :     if (cstate->file_encoding == GetDatabaseEncoding() ||
    1694          54 :         cstate->file_encoding == PG_SQL_ASCII ||
    1695          24 :         GetDatabaseEncoding() == PG_SQL_ASCII)
    1696             :     {
    1697        1784 :         cstate->need_transcoding = false;
    1698             :     }
    1699             :     else
    1700             :     {
    1701          24 :         cstate->need_transcoding = true;
    1702          24 :         cstate->conversion_proc = FindDefaultConversionProc(cstate->file_encoding,
    1703             :                                                             GetDatabaseEncoding());
    1704          24 :         if (!OidIsValid(cstate->conversion_proc))
    1705           0 :             ereport(ERROR,
    1706             :                     (errcode(ERRCODE_UNDEFINED_FUNCTION),
    1707             :                      errmsg("default conversion function for encoding \"%s\" to \"%s\" does not exist",
    1708             :                             pg_encoding_to_char(cstate->file_encoding),
    1709             :                             pg_encoding_to_char(GetDatabaseEncoding()))));
    1710             :     }
    1711             : 
    1712        1808 :     cstate->copy_src = COPY_FILE;    /* default */
    1713             : 
    1714        1808 :     cstate->whereClause = whereClause;
    1715             : 
    1716             :     /* Initialize state variables */
    1717        1808 :     cstate->eol_type = EOL_UNKNOWN;
    1718        1808 :     cstate->cur_relname = RelationGetRelationName(cstate->rel);
    1719        1808 :     cstate->cur_lineno = 0;
    1720        1808 :     cstate->cur_attname = NULL;
    1721        1808 :     cstate->cur_attval = NULL;
    1722        1808 :     cstate->relname_only = false;
    1723             : 
    1724             :     /*
    1725             :      * Allocate buffers for the input pipeline.
    1726             :      *
    1727             :      * attribute_buf and raw_buf are used in both text and binary modes, but
    1728             :      * input_buf and line_buf only in text mode.
    1729             :      */
    1730        1808 :     cstate->raw_buf = palloc(RAW_BUF_SIZE + 1);
    1731        1808 :     cstate->raw_buf_index = cstate->raw_buf_len = 0;
    1732        1808 :     cstate->raw_reached_eof = false;
    1733             : 
    1734        1808 :     initStringInfo(&cstate->attribute_buf);
    1735             : 
    1736             :     /* Assign range table and rteperminfos, we'll need them in CopyFrom. */
    1737        1808 :     if (pstate)
    1738             :     {
    1739        1738 :         cstate->range_table = pstate->p_rtable;
    1740        1738 :         cstate->rteperminfos = pstate->p_rteperminfos;
    1741             :     }
    1742             : 
    1743        1808 :     num_defaults = 0;
    1744        1808 :     volatile_defexprs = false;
    1745             : 
    1746             :     /*
    1747             :      * Pick up the required catalog information for each attribute in the
    1748             :      * relation, including the input function, the element type (to pass to
    1749             :      * the input function), and info about defaults and constraints. (Which
    1750             :      * input function we use depends on text/binary format choice.)
    1751             :      */
    1752        1808 :     in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
    1753        1808 :     typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
    1754        1808 :     defmap = (int *) palloc(num_phys_attrs * sizeof(int));
    1755        1808 :     defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
    1756             : 
    1757        6972 :     for (int attnum = 1; attnum <= num_phys_attrs; attnum++)
    1758             :     {
    1759        5178 :         Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
    1760             : 
    1761             :         /* We don't need info for dropped attributes */
    1762        5178 :         if (att->attisdropped)
    1763         124 :             continue;
    1764             : 
    1765             :         /* Fetch the input function and typioparam info */
    1766        5054 :         cstate->routine->CopyFromInFunc(cstate, att->atttypid,
    1767        5054 :                                         &in_functions[attnum - 1],
    1768        5054 :                                         &typioparams[attnum - 1]);
    1769             : 
    1770             :         /* Get default info if available */
    1771        5052 :         defexprs[attnum - 1] = NULL;
    1772             : 
    1773             :         /*
    1774             :          * We only need the default values for columns that do not appear in
    1775             :          * the column list, unless the DEFAULT option was given. We never need
    1776             :          * default values for generated columns.
    1777             :          */
    1778        5052 :         if ((cstate->opts.default_print != NULL ||
    1779        4926 :              !list_member_int(cstate->attnumlist, attnum)) &&
    1780         670 :             !att->attgenerated)
    1781             :         {
    1782         634 :             Expr       *defexpr = (Expr *) build_column_default(cstate->rel,
    1783             :                                                                 attnum);
    1784             : 
    1785         634 :             if (defexpr != NULL)
    1786             :             {
    1787             :                 /* Run the expression through planner */
    1788         264 :                 defexpr = expression_planner(defexpr);
    1789             : 
    1790             :                 /* Initialize executable expression in copycontext */
    1791         252 :                 defexprs[attnum - 1] = ExecInitExpr(defexpr, NULL);
    1792             : 
    1793             :                 /* if NOT copied from input */
    1794             :                 /* use default value if one exists */
    1795         252 :                 if (!list_member_int(cstate->attnumlist, attnum))
    1796             :                 {
    1797         172 :                     defmap[num_defaults] = attnum - 1;
    1798         172 :                     num_defaults++;
    1799             :                 }
    1800             : 
    1801             :                 /*
    1802             :                  * If a default expression looks at the table being loaded,
    1803             :                  * then it could give the wrong answer when using
    1804             :                  * multi-insert. Since database access can be dynamic this is
    1805             :                  * hard to test for exactly, so we use the much wider test of
    1806             :                  * whether the default expression is volatile. We allow for
    1807             :                  * the special case of when the default expression is the
    1808             :                  * nextval() of a sequence which in this specific case is
    1809             :                  * known to be safe for use with the multi-insert
    1810             :                  * optimization. Hence we use this special case function
    1811             :                  * checker rather than the standard check for
    1812             :                  * contain_volatile_functions().  Note also that we already
    1813             :                  * ran the expression through expression_planner().
    1814             :                  */
    1815         252 :                 if (!volatile_defexprs)
    1816         252 :                     volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
    1817             :             }
    1818             :         }
    1819             :     }
    1820             : 
    1821        1794 :     cstate->defaults = (bool *) palloc0(tupDesc->natts * sizeof(bool));
    1822             : 
    1823             :     /* initialize progress */
    1824        1794 :     pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
    1825        1794 :                                   cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
    1826        1794 :     cstate->bytes_processed = 0;
    1827             : 
    1828             :     /* We keep those variables in cstate. */
    1829        1794 :     cstate->in_functions = in_functions;
    1830        1794 :     cstate->typioparams = typioparams;
    1831        1794 :     cstate->defmap = defmap;
    1832        1794 :     cstate->defexprs = defexprs;
    1833        1794 :     cstate->volatile_defexprs = volatile_defexprs;
    1834        1794 :     cstate->num_defaults = num_defaults;
    1835        1794 :     cstate->is_program = is_program;
    1836             : 
    1837        1794 :     if (data_source_cb)
    1838             :     {
    1839         376 :         progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
    1840         376 :         cstate->copy_src = COPY_CALLBACK;
    1841         376 :         cstate->data_source_cb = data_source_cb;
    1842             :     }
    1843        1418 :     else if (pipe)
    1844             :     {
    1845         980 :         progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
    1846             :         Assert(!is_program);    /* the grammar does not allow this */
    1847         980 :         if (whereToSendOutput == DestRemote)
    1848         980 :             ReceiveCopyBegin(cstate);
    1849             :         else
    1850           0 :             cstate->copy_file = stdin;
    1851             :     }
    1852             :     else
    1853             :     {
    1854         438 :         cstate->filename = pstrdup(filename);
    1855             : 
    1856         438 :         if (cstate->is_program)
    1857             :         {
    1858           0 :             progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
    1859           0 :             cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
    1860           0 :             if (cstate->copy_file == NULL)
    1861           0 :                 ereport(ERROR,
    1862             :                         (errcode_for_file_access(),
    1863             :                          errmsg("could not execute command \"%s\": %m",
    1864             :                                 cstate->filename)));
    1865             :         }
    1866             :         else
    1867             :         {
    1868             :             struct stat st;
    1869             : 
    1870         438 :             progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
    1871         438 :             cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
    1872         438 :             if (cstate->copy_file == NULL)
    1873             :             {
    1874             :                 /* copy errno because ereport subfunctions might change it */
    1875           0 :                 int         save_errno = errno;
    1876             : 
    1877           0 :                 ereport(ERROR,
    1878             :                         (errcode_for_file_access(),
    1879             :                          errmsg("could not open file \"%s\" for reading: %m",
    1880             :                                 cstate->filename),
    1881             :                          (save_errno == ENOENT || save_errno == EACCES) ?
    1882             :                          errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
    1883             :                                  "You may want a client-side facility such as psql's \\copy.") : 0));
    1884             :             }
    1885             : 
    1886         438 :             if (fstat(fileno(cstate->copy_file), &st))
    1887           0 :                 ereport(ERROR,
    1888             :                         (errcode_for_file_access(),
    1889             :                          errmsg("could not stat file \"%s\": %m",
    1890             :                                 cstate->filename)));
    1891             : 
    1892         438 :             if (S_ISDIR(st.st_mode))
    1893           0 :                 ereport(ERROR,
    1894             :                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1895             :                          errmsg("\"%s\" is a directory", cstate->filename)));
    1896             : 
    1897         438 :             progress_vals[2] = st.st_size;
    1898             :         }
    1899             :     }
    1900             : 
    1901        1794 :     pgstat_progress_update_multi_param(3, progress_cols, progress_vals);
    1902             : 
    1903        1794 :     cstate->routine->CopyFromStart(cstate, tupDesc);
    1904             : 
    1905        1794 :     MemoryContextSwitchTo(oldcontext);
    1906             : 
    1907        1794 :     return cstate;
    1908             : }
    1909             : 
    1910             : /*
    1911             :  * Clean up storage and release resources for COPY FROM.
                     :  *
                     :  * In order, this: runs the format routine's CopyFromEnd callback, closes
                     :  * the input source if this COPY opened one, ends the progress report
                     :  * started by BeginCopyFrom(), deletes copycontext, and finally frees
                     :  * cstate itself.  cstate needs its own pfree() because BeginCopyFrom()
                     :  * allocates it before copycontext is created, so it is not released by
                     :  * deleting that context.
                     :  *
                     :  * cstate must not be used after this function returns.
    1912             :  */
    1913             : void
    1914        1182 : EndCopyFrom(CopyFromState cstate)
    1915             : {
    1916             :     /* Let the format-specific routine do its own shutdown first */
    1917        1182 :     cstate->routine->CopyFromEnd(cstate);
    1918             : 
    1919             :     /*
                     :      * Close the input source.  A program's pipe is closed (and its exit
                     :      * status checked) by ClosePipeFromProgram().  Otherwise we close
                     :      * copy_file only when filename is set, i.e. when BeginCopyFrom()
                     :      * opened a file itself; in the callback and client/stdin cases
                     :      * filename is left NULL and there is nothing for us to close.
                     :      */
    1920        1182 :     if (cstate->is_program)
    1921             :     {
    1922           0 :         ClosePipeFromProgram(cstate);
    1923             :     }
    1924             :     else
    1925             :     {
    1926        1182 :         if (cstate->filename != NULL && FreeFile(cstate->copy_file))
    1927           0 :             ereport(ERROR,
    1928             :                     (errcode_for_file_access(),
    1929             :                      errmsg("could not close file \"%s\": %m",
    1930             :                             cstate->filename)));
    1931             :     }
    1932             : 
    1933        1182 :     pgstat_progress_end_command();
    1934             : 
    1935        1182 :     MemoryContextDelete(cstate->copycontext);
    1936        1182 :     pfree(cstate);
    1937        1182 : }
    1938             : 
    1939             : /*
    1940             :  * Closes the pipe from an external program, checking the pclose() return code.
                     :  *
                     :  * Must only be called when cstate->is_program is set (asserted below).
                     :  *
                     :  * Outcomes, by the value returned from ClosePipeStream():
                     :  *   -1        the close itself failed: raise a file-access ERROR.
                     :  *   0         the program exited successfully: return normally.
                     :  *   otherwise the program failed: raise an ERROR naming the command
                     :  *             (cstate->filename) with the decoded wait status as detail,
                     :  *             except for the SIGPIPE case described in the body.
    1941             :  */
    1942             : static void
    1943           0 : ClosePipeFromProgram(CopyFromState cstate)
    1944             : {
    1945             :     int         pclose_rc;
    1946             : 
    1947             :     Assert(cstate->is_program);
    1948             : 
    1949           0 :     pclose_rc = ClosePipeStream(cstate->copy_file);
    1950           0 :     if (pclose_rc == -1)
    1951           0 :         ereport(ERROR,
    1952             :                 (errcode_for_file_access(),
    1953             :                  errmsg("could not close pipe to external command: %m")));
    1954           0 :     else if (pclose_rc != 0)
    1955             :     {
    1956             :         /*
    1957             :          * If we ended a COPY FROM PROGRAM before reaching EOF (that is,
    1958             :          * raw_reached_eof is still false), then the called program is
    1959             :          * expected to die from SIGPIPE, and we should not report that as an
    1960             :          * error.  In every other case SIGPIPE indicates a real problem.
    1961             :          */
    1962           0 :         if (!cstate->raw_reached_eof &&
    1963           0 :             wait_result_is_signal(pclose_rc, SIGPIPE))
    1964           0 :             return;
    1965             : 
    1966           0 :         ereport(ERROR,
    1967             :                 (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
    1968             :                  errmsg("program \"%s\" failed",
    1969             :                         cstate->filename),
    1970             :                  errdetail_internal("%s", wait_result_to_str(pclose_rc))));
    1971             :     }
    1972             : }

Generated by: LCOV version 1.14