LCOV - code coverage report
Current view: top level - src/backend/commands - copyfrom.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 91.5 % 591 541
Test Date: 2026-03-12 19:14:48 Functions: 95.8 % 24 23
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * copyfrom.c
       4              :  *      COPY <table> FROM file/program/client
       5              :  *
       6              :  * This file contains routines needed to efficiently load tuples into a
       7              :  * table.  That includes looking up the correct partition, firing triggers,
       8              :  * calling the table AM function to insert the data, and updating indexes.
       9              :  * Reading data from the input file or client and parsing it into Datums
      10              :  * is handled in copyfromparse.c.
      11              :  *
      12              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
      13              :  * Portions Copyright (c) 1994, Regents of the University of California
      14              :  *
      15              :  *
      16              :  * IDENTIFICATION
      17              :  *    src/backend/commands/copyfrom.c
      18              :  *
      19              :  *-------------------------------------------------------------------------
      20              :  */
      21              : #include "postgres.h"
      22              : 
      23              : #include <ctype.h>
      24              : #include <unistd.h>
      25              : #include <sys/stat.h>
      26              : 
      27              : #include "access/heapam.h"
      28              : #include "access/tableam.h"
      29              : #include "access/xact.h"
      30              : #include "catalog/namespace.h"
      31              : #include "commands/copyapi.h"
      32              : #include "commands/copyfrom_internal.h"
      33              : #include "commands/progress.h"
      34              : #include "commands/trigger.h"
      35              : #include "executor/execPartition.h"
      36              : #include "executor/executor.h"
      37              : #include "executor/nodeModifyTable.h"
      38              : #include "executor/tuptable.h"
      39              : #include "foreign/fdwapi.h"
      40              : #include "mb/pg_wchar.h"
      41              : #include "miscadmin.h"
      42              : #include "nodes/miscnodes.h"
      43              : #include "optimizer/optimizer.h"
      44              : #include "pgstat.h"
      45              : #include "rewrite/rewriteHandler.h"
      46              : #include "storage/fd.h"
      47              : #include "tcop/tcopprot.h"
      48              : #include "utils/lsyscache.h"
      49              : #include "utils/memutils.h"
      50              : #include "utils/portal.h"
      51              : #include "utils/rel.h"
      52              : #include "utils/snapmgr.h"
      53              : #include "utils/typcache.h"
      54              : 
      55              : /*
      56              :  * No more than this many tuples per CopyMultiInsertBuffer
      57              :  *
      58              :  * Caution: Don't make this too big, as we could end up with this many
      59              :  * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's
      60              :  * multiInsertBuffers list.  Increasing this can cause quadratic growth in
      61              :  * memory requirements during copies into partitioned tables with a large
      62              :  * number of partitions.
      63              :  */
      64              : #define MAX_BUFFERED_TUPLES     1000
      65              : 
      66              : /*
      67              :  * Flush buffers once the stored tuples total >= this many bytes, as
      68              :  * measured by their input size.
      69              :  */
      70              : #define MAX_BUFFERED_BYTES      65535
      71              : 
      72              : /*
      73              :  * Trim the list of buffers back down to this number after flushing.  This
      74              :  * must be >= 2.
      75              :  */
      76              : #define MAX_PARTITION_BUFFERS   32
      77              : 
      78              : /* Stores multi-insert data related to a single relation in CopyFrom. */
      79              : typedef struct CopyMultiInsertBuffer
      80              : {
      81              :     TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
      82              :     ResultRelInfo *resultRelInfo;   /* ResultRelInfo for 'relid' */
      83              :     BulkInsertState bistate;    /* BulkInsertState for this rel if plain
      84              :                                  * table; NULL if foreign table */
      85              :     int         nused;          /* number of 'slots' containing tuples */
      86              :     uint64      linenos[MAX_BUFFERED_TUPLES];   /* Line # of tuple in copy
      87              :                                                  * stream */
      88              : } CopyMultiInsertBuffer;
      89              : 
      90              : /*
      91              :  * Stores one or many CopyMultiInsertBuffers and details about the size and
      92              :  * number of tuples which are stored in them.  This allows multiple buffers to
      93              :  * exist at once when COPYing into a partitioned table.
      94              :  */
      95              : typedef struct CopyMultiInsertInfo
      96              : {
      97              :     List       *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */
      98              :     int         bufferedTuples; /* number of tuples buffered over all buffers */
      99              :     int         bufferedBytes;  /* number of bytes from all buffered tuples */
     100              :     CopyFromState cstate;       /* Copy state for this CopyMultiInsertInfo */
     101              :     EState     *estate;         /* Executor state used for COPY */
     102              :     CommandId   mycid;          /* Command Id used for COPY */
     103              :     int         ti_options;     /* table insert options */
     104              : } CopyMultiInsertInfo;
     105              : 
     106              : 
     107              : /* non-export function prototypes */
     108              : static void ClosePipeFromProgram(CopyFromState cstate);
     109              : 
     110              : /*
     111              :  * Built-in format-specific routines. One-row callbacks are defined in
     112              :  * copyfromparse.c.
     113              :  */
     114              : static void CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo,
     115              :                                    Oid *typioparam);
     116              : static void CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc);
     117              : static void CopyFromTextLikeEnd(CopyFromState cstate);
     118              : static void CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid,
     119              :                                  FmgrInfo *finfo, Oid *typioparam);
     120              : static void CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc);
     121              : static void CopyFromBinaryEnd(CopyFromState cstate);
     122              : 
     123              : 
     124              : /*
     125              :  * COPY FROM routines for built-in formats.
     126              :  *
     127              :  * CSV and text formats share the same TextLike routines except for the
     128              :  * one-row callback.
     129              :  */
     130              : 
     131              : /* text format */
     132              : static const CopyFromRoutine CopyFromRoutineText = {
     133              :     .CopyFromInFunc = CopyFromTextLikeInFunc,
     134              :     .CopyFromStart = CopyFromTextLikeStart,
     135              :     .CopyFromOneRow = CopyFromTextOneRow,
     136              :     .CopyFromEnd = CopyFromTextLikeEnd,
     137              : };
     138              : 
     139              : /* CSV format */
     140              : static const CopyFromRoutine CopyFromRoutineCSV = {
     141              :     .CopyFromInFunc = CopyFromTextLikeInFunc,
     142              :     .CopyFromStart = CopyFromTextLikeStart,
     143              :     .CopyFromOneRow = CopyFromCSVOneRow,
     144              :     .CopyFromEnd = CopyFromTextLikeEnd,
     145              : };
     146              : 
     147              : /* binary format */
     148              : static const CopyFromRoutine CopyFromRoutineBinary = {
     149              :     .CopyFromInFunc = CopyFromBinaryInFunc,
     150              :     .CopyFromStart = CopyFromBinaryStart,
     151              :     .CopyFromOneRow = CopyFromBinaryOneRow,
     152              :     .CopyFromEnd = CopyFromBinaryEnd,
     153              : };
     154              : 
     155              : /* Return a COPY FROM routine for the given options */
     156              : static const CopyFromRoutine *
     157          996 : CopyFromGetRoutine(const CopyFormatOptions *opts)
     158              : {
     159          996 :     if (opts->csv_mode)
     160          149 :         return &CopyFromRoutineCSV;
     161          847 :     else if (opts->binary)
     162            8 :         return &CopyFromRoutineBinary;
     163              : 
     164              :     /* default is text */
     165          839 :     return &CopyFromRoutineText;
     166              : }
     167              : 
     168              : /* Implementation of the start callback for text and CSV formats */
     169              : static void
     170          976 : CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc)
     171              : {
     172              :     AttrNumber  attr_count;
     173              : 
     174              :     /*
     175              :      * If encoding conversion is needed, we need another buffer to hold the
     176              :      * converted input data.  Otherwise, we can just point input_buf to the
     177              :      * same buffer as raw_buf.
     178              :      */
     179          976 :     if (cstate->need_transcoding)
     180              :     {
     181           18 :         cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
     182           18 :         cstate->input_buf_index = cstate->input_buf_len = 0;
     183              :     }
     184              :     else
     185          958 :         cstate->input_buf = cstate->raw_buf;
     186          976 :     cstate->input_reached_eof = false;
     187              : 
     188          976 :     initStringInfo(&cstate->line_buf);
     189              : 
     190              :     /*
     191              :      * Create workspace for CopyReadAttributes results; used by CSV and text
     192              :      * format.
     193              :      */
     194          976 :     attr_count = list_length(cstate->attnumlist);
     195          976 :     cstate->max_fields = attr_count;
     196          976 :     cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
     197          976 : }
     198              : 
     199              : /*
     200              :  * Implementation of the infunc callback for text and CSV formats. Assign
     201              :  * the input function data to the given *finfo.
     202              :  */
     203              : static void
     204         2828 : CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo,
     205              :                        Oid *typioparam)
     206              : {
     207              :     Oid         func_oid;
     208              : 
     209         2828 :     getTypeInputInfo(atttypid, &func_oid, typioparam);
     210         2828 :     fmgr_info(func_oid, finfo);
     211         2828 : }
     212              : 
     213              : /* Implementation of the end callback for text and CSV formats */
     214              : static void
     215          635 : CopyFromTextLikeEnd(CopyFromState cstate)
     216              : {
     217              :     /* nothing to do */
     218          635 : }
     219              : 
     220              : /* Implementation of the start callback for binary format */
     221              : static void
     222            7 : CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc)
     223              : {
     224              :     /* Read and verify binary header */
     225            7 :     ReceiveCopyBinaryHeader(cstate);
     226            7 : }
     227              : 
     228              : /*
     229              :  * Implementation of the infunc callback for binary format. Assign
     230              :  * the binary input function to the given *finfo.
     231              :  */
     232              : static void
     233           31 : CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid,
     234              :                      FmgrInfo *finfo, Oid *typioparam)
     235              : {
     236              :     Oid         func_oid;
     237              : 
     238           31 :     getTypeBinaryInputInfo(atttypid, &func_oid, typioparam);
     239           30 :     fmgr_info(func_oid, finfo);
     240           30 : }
     241              : 
     242              : /* Implementation of the end callback for binary format */
     243              : static void
     244            3 : CopyFromBinaryEnd(CopyFromState cstate)
     245              : {
     246              :     /* nothing to do */
     247            3 : }
     248              : 
     249              : /*
     250              :  * error context callback for COPY FROM
     251              :  *
     252              :  * The argument for the error context must be CopyFromState.
     253              :  */
     254              : void
     255          193 : CopyFromErrorCallback(void *arg)
     256              : {
     257          193 :     CopyFromState cstate = (CopyFromState) arg;
     258              : 
     259          193 :     if (cstate->relname_only)
     260              :     {
     261           34 :         errcontext("COPY %s",
     262              :                    cstate->cur_relname);
     263           34 :         return;
     264              :     }
     265          159 :     if (cstate->opts.binary)
     266              :     {
     267              :         /* can't usefully display the data */
     268            1 :         if (cstate->cur_attname)
     269            1 :             errcontext("COPY %s, line %" PRIu64 ", column %s",
     270              :                        cstate->cur_relname,
     271              :                        cstate->cur_lineno,
     272              :                        cstate->cur_attname);
     273              :         else
     274            0 :             errcontext("COPY %s, line %" PRIu64,
     275              :                        cstate->cur_relname,
     276              :                        cstate->cur_lineno);
     277              :     }
     278              :     else
     279              :     {
     280          158 :         if (cstate->cur_attname && cstate->cur_attval)
     281           26 :         {
     282              :             /* error is relevant to a particular column */
     283              :             char       *attval;
     284              : 
     285           26 :             attval = CopyLimitPrintoutLength(cstate->cur_attval);
     286           26 :             errcontext("COPY %s, line %" PRIu64 ", column %s: \"%s\"",
     287              :                        cstate->cur_relname,
     288              :                        cstate->cur_lineno,
     289              :                        cstate->cur_attname,
     290              :                        attval);
     291           26 :             pfree(attval);
     292              :         }
     293          132 :         else if (cstate->cur_attname)
     294              :         {
     295              :             /* error is relevant to a particular column, value is NULL */
     296            6 :             errcontext("COPY %s, line %" PRIu64 ", column %s: null input",
     297              :                        cstate->cur_relname,
     298              :                        cstate->cur_lineno,
     299              :                        cstate->cur_attname);
     300              :         }
     301              :         else
     302              :         {
     303              :             /*
     304              :              * Error is relevant to a particular line.
     305              :              *
     306              :              * If line_buf still contains the correct line, print it.
     307              :              */
     308          126 :             if (cstate->line_buf_valid)
     309              :             {
     310              :                 char       *lineval;
     311              : 
     312          100 :                 lineval = CopyLimitPrintoutLength(cstate->line_buf.data);
     313          100 :                 errcontext("COPY %s, line %" PRIu64 ": \"%s\"",
     314              :                            cstate->cur_relname,
     315              :                            cstate->cur_lineno, lineval);
     316          100 :                 pfree(lineval);
     317              :             }
     318              :             else
     319              :             {
     320           26 :                 errcontext("COPY %s, line %" PRIu64,
     321              :                            cstate->cur_relname,
     322              :                            cstate->cur_lineno);
     323              :             }
     324              :         }
     325              :     }
     326              : }
     327              : 
     328              : /*
     329              :  * Make sure we don't print an unreasonable amount of COPY data in a message.
     330              :  *
     331              :  * Returns a palloc'd copy, truncated with "..." if over MAX_COPY_DATA_DISPLAY bytes.
     332              :  */
     333              : char *
     334          156 : CopyLimitPrintoutLength(const char *str)
     335              : {
     336              : #define MAX_COPY_DATA_DISPLAY 100
     337              : 
     338          156 :     int         slen = strlen(str);
     339              :     int         len;
     340              :     char       *res;
     341              : 
     342              :     /* Fast path if definitely okay */
     343          156 :     if (slen <= MAX_COPY_DATA_DISPLAY)
     344          156 :         return pstrdup(str);
     345              : 
     346              :     /* Apply encoding-dependent truncation */
     347            0 :     len = pg_mbcliplen(str, slen, MAX_COPY_DATA_DISPLAY);
     348              : 
     349              :     /*
     350              :      * Truncate, and add "..." to show we truncated the input.
     351              :      */
     352            0 :     res = (char *) palloc(len + 4);
     353            0 :     memcpy(res, str, len);
     354            0 :     strcpy(res + len, "...");
     355              : 
     356            0 :     return res;
     357              : }
     358              : 
     359              : /*
     360              :  * Allocate memory and initialize a new CopyMultiInsertBuffer for this
     361              :  * ResultRelInfo.
     362              :  */
     363              : static CopyMultiInsertBuffer *
     364          855 : CopyMultiInsertBufferInit(ResultRelInfo *rri)
     365              : {
     366              :     CopyMultiInsertBuffer *buffer;
     367              : 
     368          855 :     buffer = palloc_object(CopyMultiInsertBuffer);
     369          855 :     memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
     370          855 :     buffer->resultRelInfo = rri;
     371          855 :     buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL;
     372          855 :     buffer->nused = 0;
     373              : 
     374          855 :     return buffer;
     375              : }
     376              : 
     377              : /*
     378              :  * Make a new buffer for this ResultRelInfo.
     379              :  */
     380              : static inline void
     381          855 : CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
     382              :                                ResultRelInfo *rri)
     383              : {
     384              :     CopyMultiInsertBuffer *buffer;
     385              : 
     386          855 :     buffer = CopyMultiInsertBufferInit(rri);
     387              : 
     388              :     /* Set up back-link so we can easily find this buffer again */
     389          855 :     rri->ri_CopyMultiInsertBuffer = buffer;
     390              :     /* Record that we're tracking this buffer */
     391          855 :     miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
     392          855 : }
     393              : 
     394              : /*
     395              :  * Initialize an already allocated CopyMultiInsertInfo.
     396              :  *
     397              :  * If rri is a non-partitioned table then a CopyMultiInsertBuffer is set up
     398              :  * for that table.
     399              :  */
     400              : static void
     401          852 : CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
     402              :                         CopyFromState cstate, EState *estate, CommandId mycid,
     403              :                         int ti_options)
     404              : {
     405          852 :     miinfo->multiInsertBuffers = NIL;
     406          852 :     miinfo->bufferedTuples = 0;
     407          852 :     miinfo->bufferedBytes = 0;
     408          852 :     miinfo->cstate = cstate;
     409          852 :     miinfo->estate = estate;
     410          852 :     miinfo->mycid = mycid;
     411          852 :     miinfo->ti_options = ti_options;
     412              : 
     413              :     /*
     414              :      * Only set up the buffer when not dealing with a partitioned table.
     415              :      * Buffers for partitioned tables are set up lazily, the first time we
     416              :      * need to send tuples their way.
     417              :      */
     418          852 :     if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
     419          812 :         CopyMultiInsertInfoSetupBuffer(miinfo, rri);
     420          852 : }
     421              : 
     422              : /*
     423              :  * Returns true if the buffers are full
     424              :  */
     425              : static inline bool
     426       600499 : CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
     427              : {
     428       600499 :     if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
     429       600004 :         miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
     430          548 :         return true;
     431       599951 :     return false;
     432              : }
     433              : 
     434              : /*
     435              :  * Returns true if we have no buffered tuples
     436              :  */
     437              : static inline bool
     438          766 : CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
     439              : {
     440          766 :     return miinfo->bufferedTuples == 0;
     441              : }
     442              : 
     443              : /*
     444              :  * Write the tuples stored in 'buffer' out to the table.
     445              :  */
     446              : static inline void
     447         1249 : CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
     448              :                            CopyMultiInsertBuffer *buffer,
     449              :                            int64 *processed)
     450              : {
     451         1249 :     CopyFromState cstate = miinfo->cstate;
     452         1249 :     EState     *estate = miinfo->estate;
     453         1249 :     int         nused = buffer->nused;
     454         1249 :     ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
     455         1249 :     TupleTableSlot **slots = buffer->slots;
     456              :     int         i;
     457              : 
     458         1249 :     if (resultRelInfo->ri_FdwRoutine)
     459              :     {
     460            7 :         int         batch_size = resultRelInfo->ri_BatchSize;
     461            7 :         int         sent = 0;
     462              : 
     463              :         Assert(buffer->bistate == NULL);
     464              : 
     465              :         /* Ensure that the FDW supports batching and it's enabled */
     466              :         Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert);
     467              :         Assert(batch_size > 1);
     468              : 
     469              :         /*
     470              :          * We suppress error context information other than the relation name,
     471              :          * if one of the operations below fails.
     472              :          */
     473              :         Assert(!cstate->relname_only);
     474            7 :         cstate->relname_only = true;
     475              : 
     476           19 :         while (sent < nused)
     477              :         {
     478           13 :             int         size = (batch_size < nused - sent) ? batch_size : (nused - sent);
     479           13 :             int         inserted = size;
     480              :             TupleTableSlot **rslots;
     481              : 
     482              :             /* insert into foreign table: let the FDW do it */
     483              :             rslots =
     484           13 :                 resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert(estate,
     485              :                                                                      resultRelInfo,
     486           13 :                                                                      &slots[sent],
     487              :                                                                      NULL,
     488              :                                                                      &inserted);
     489              : 
     490           12 :             sent += size;
     491              : 
     492              :             /* No need to do anything if there are no inserted rows */
     493           12 :             if (inserted <= 0)
     494            2 :                 continue;
     495              : 
     496              :             /* Triggers on foreign tables should not have transition tables */
     497              :             Assert(resultRelInfo->ri_TrigDesc == NULL ||
     498              :                    resultRelInfo->ri_TrigDesc->trig_insert_new_table == false);
     499              : 
     500              :             /* Run AFTER ROW INSERT triggers */
     501           10 :             if (resultRelInfo->ri_TrigDesc != NULL &&
     502            0 :                 resultRelInfo->ri_TrigDesc->trig_insert_after_row)
     503              :             {
     504            0 :                 Oid         relid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
     505              : 
     506            0 :                 for (i = 0; i < inserted; i++)
     507              :                 {
     508            0 :                     TupleTableSlot *slot = rslots[i];
     509              : 
     510              :                     /*
     511              :                      * AFTER ROW Triggers might reference the tableoid column,
     512              :                      * so (re-)initialize tts_tableOid before evaluating them.
     513              :                      */
     514            0 :                     slot->tts_tableOid = relid;
     515              : 
     516            0 :                     ExecARInsertTriggers(estate, resultRelInfo,
     517              :                                          slot, NIL,
     518              :                                          cstate->transition_capture);
     519              :                 }
     520              :             }
     521              : 
     522              :             /* Update the row counter and progress of the COPY command */
     523           10 :             *processed += inserted;
     524           10 :             pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
     525              :                                          *processed);
     526              :         }
     527              : 
     528           24 :         for (i = 0; i < nused; i++)
     529           18 :             ExecClearTuple(slots[i]);
     530              : 
     531              :         /* reset relname_only */
     532            6 :         cstate->relname_only = false;
     533              :     }
     534              :     else
     535              :     {
     536         1242 :         CommandId   mycid = miinfo->mycid;
     537         1242 :         int         ti_options = miinfo->ti_options;
     538         1242 :         bool        line_buf_valid = cstate->line_buf_valid;
     539         1242 :         uint64      save_cur_lineno = cstate->cur_lineno;
     540              :         MemoryContext oldcontext;
     541              : 
     542              :         Assert(buffer->bistate != NULL);
     543              : 
     544              :         /*
     545              :          * Print error context information correctly, if one of the operations
     546              :          * below fails.
     547              :          */
     548         1242 :         cstate->line_buf_valid = false;
     549              : 
     550              :         /*
     551              :          * table_multi_insert may leak memory, so switch to short-lived memory
     552              :          * context before calling it.
     553              :          */
     554         1242 :         oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
     555         1242 :         table_multi_insert(resultRelInfo->ri_RelationDesc,
     556              :                            slots,
     557              :                            nused,
     558              :                            mycid,
     559              :                            ti_options,
     560         1242 :                            buffer->bistate);
     561         1242 :         MemoryContextSwitchTo(oldcontext);
     562              : 
     563       601606 :         for (i = 0; i < nused; i++)
     564              :         {
     565              :             /*
     566              :              * If there are any indexes, update them for all the inserted
     567              :              * tuples, and run AFTER ROW INSERT triggers.
     568              :              */
     569       600376 :             if (resultRelInfo->ri_NumIndices > 0)
     570              :             {
     571              :                 List       *recheckIndexes;
     572              : 
     573       100959 :                 cstate->cur_lineno = buffer->linenos[i];
     574              :                 recheckIndexes =
     575       100959 :                     ExecInsertIndexTuples(resultRelInfo,
     576              :                                           estate, 0, buffer->slots[i],
     577              :                                           NIL, NULL);
     578       100947 :                 ExecARInsertTriggers(estate, resultRelInfo,
     579       100947 :                                      slots[i], recheckIndexes,
     580              :                                      cstate->transition_capture);
     581       100947 :                 list_free(recheckIndexes);
     582              :             }
     583              : 
     584              :             /*
     585              :              * There are no indexes, but see if we need to run AFTER ROW INSERT
     586              :              * triggers anyway.
     587              :              */
     588       499417 :             else if (resultRelInfo->ri_TrigDesc != NULL &&
     589           42 :                      (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
     590           33 :                       resultRelInfo->ri_TrigDesc->trig_insert_new_table))
     591              :             {
     592           18 :                 cstate->cur_lineno = buffer->linenos[i];
     593           18 :                 ExecARInsertTriggers(estate, resultRelInfo,
     594           18 :                                      slots[i], NIL,
     595              :                                      cstate->transition_capture);
     596              :             }
     597              : 
     598       600364 :             ExecClearTuple(slots[i]);
     599              :         }
     600              : 
     601              :         /* Update the row counter and progress of the COPY command */
     602         1230 :         *processed += nused;
     603         1230 :         pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
     604              :                                      *processed);
     605              : 
     606              :         /* reset cur_lineno and line_buf_valid to what they were */
     607         1230 :         cstate->line_buf_valid = line_buf_valid;
     608         1230 :         cstate->cur_lineno = save_cur_lineno;
     609              :     }
     610              : 
     611              :     /* Mark that all slots are free */
     612         1236 :     buffer->nused = 0;
     613         1236 : }
     614              : 
     615              : /*
     616              :  * Release everything owned by a single multi-insert buffer and free it.
                         *
                         * This frees the buffer's bulk-insert state (when it has one), drops the
                         * tuple slots that were created for it, and finally pfree()s the buffer
                         * itself, so 'buffer' must not be touched after this returns.  The
                         * back-link from the ResultRelInfo to the buffer is cleared as well.
                         *
                         * 'miinfo' is only consulted for its ti_options.  Note that this function
                         * does not unlink 'buffer' from miinfo->multiInsertBuffers; the callers in
                         * this file take care of the list themselves.
     617              :  *
     618              :  * The buffer must be flushed before cleanup.
     619              :  */
     620              : static inline void
     621          742 : CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
     622              :                              CopyMultiInsertBuffer *buffer)
     623              : {
     624          742 :     ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
     625              :     int         i;
     626              : 
     627              :     /* Ensure buffer was flushed */
     628              :     Assert(buffer->nused == 0);
     629              : 
     630              :     /* Remove the back-link to ourselves, since the buffer is going away */
     631          742 :     resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
     632              : 
                            /*
                             * Buffers whose target has no FDW routine carry a BulkInsertState that
                             * must be released; buffers for FDW targets are expected to have none.
                             */
     633          742 :     if (resultRelInfo->ri_FdwRoutine == NULL)
     634              :     {
     635              :         Assert(buffer->bistate != NULL);
     636          736 :         FreeBulkInsertState(buffer->bistate);
     637              :     }
     638              :     else
     639              :         Assert(buffer->bistate == NULL);
     640              : 
     641              :     /*
                             * Since we only create slots on demand, just drop the non-null ones.
                             * The loop stops at the first NULL entry (or at the end of the array),
                             * so it relies on slots having been created in index order.
                             */
     642       123679 :     for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
     643       122937 :         ExecDropSingleTupleTableSlot(buffer->slots[i]);
     644              : 
                            /* Non-FDW targets only: let the table AM finish up the bulk insert. */
     645          742 :     if (resultRelInfo->ri_FdwRoutine == NULL)
     646          736 :         table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
     647              :                                  miinfo->ti_options);
     648              : 
                            /* Finally release the buffer struct itself. */
     649          742 :     pfree(buffer);
     650          742 : }
     651              : 
     652              : /*
     653              :  * Write out all stored tuples in all buffers to their tables.
     654              :  *
     655              :  * Once flushed we also trim the tracked buffers list down to size by removing
     656              :  * the buffers created earliest first.
     657              :  *
     658              :  * Callers should pass 'curr_rri' as the ResultRelInfo that's currently being
     659              :  * used.  When cleaning up old buffers we'll never remove the one for
     660              :  * 'curr_rri'.
                         *
                         * 'processed' is handed on to CopyMultiInsertBufferFlush() for each buffer,
                         * which updates the caller's row counter as tuples are written.  On return
                         * miinfo->bufferedTuples and miinfo->bufferedBytes are both zero.
     661              :  */
     662              : static inline void
     663         1136 : CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri,
     664              :                          int64 *processed)
     665              : {
     666              :     ListCell   *lc;
     667              : 
                            /* Flush every tracked buffer, not only the one belonging to curr_rri. */
     668         2372 :     foreach(lc, miinfo->multiInsertBuffers)
     669              :     {
     670         1249 :         CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc);
     671              : 
     672         1249 :         CopyMultiInsertBufferFlush(miinfo, buffer, processed);
     673              :     }
     674              : 
                            /* Every buffer has been flushed, so nothing is buffered any more. */
     675         1123 :     miinfo->bufferedTuples = 0;
     676         1123 :     miinfo->bufferedBytes = 0;
     677              : 
     678              :     /*
     679              :      * Trim the list of tracked buffers down if it exceeds the limit.  Here we
     680              :      * remove buffers starting with the ones we created first.  It seems less
     681              :      * likely that these older ones will be needed than the ones that were
     682              :      * just created.
     683              :      */
     684         1123 :     while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS)
     685              :     {
     686              :         CopyMultiInsertBuffer *buffer;
     687              : 
                                /* The head of the list is the oldest buffer still being tracked. */
     688            0 :         buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
     689              : 
     690              :         /*
     691              :          * We never want to remove the buffer that's currently being used, so
     692              :          * if we happen to find that then move it to the end of the list.
     693              :          */
     694            0 :         if (buffer->resultRelInfo == curr_rri)
     695              :         {
     696              :             /*
     697              :              * The code below would misbehave if we were trying to reduce the
     698              :              * list to less than two items.
     699              :              */
     700              :             StaticAssertDecl(MAX_PARTITION_BUFFERS >= 2,
     701              :                              "MAX_PARTITION_BUFFERS must be >= 2");
     702              : 
                                    /*
                                     * Rotate the in-use buffer from the head to the tail, then pick
                                     * whatever entry has become the new head as the one to remove.
                                     */
     703            0 :             miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
     704            0 :             miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
     705            0 :             buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
     706              :         }
     707              : 
                                /*
                                 * Free the chosen buffer first (cleanup pfree()s it), then unlink
                                 * its list entry, which is still at the head of the list.
                                 */
     708            0 :         CopyMultiInsertBufferCleanup(miinfo, buffer);
     709            0 :         miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
     710              :     }
     711         1123 : }
     712              : 
     713              : /*
     714              :  * Clean up all allocated buffers and free the list that tracks them.
                         *
                         * Runs CopyMultiInsertBufferCleanup() on every tracked buffer and then frees
                         * the list itself.  Every buffer must already have been flushed (the
                         * per-buffer cleanup asserts this).  miinfo->multiInsertBuffers is not reset
                         * here, so it is left pointing at freed memory and should not be used
                         * afterwards.
     715              :  */
     716              : static inline void
     717          739 : CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo)
     718              : {
     719              :     ListCell   *lc;
     720              : 
                            /* Release each buffer; this frees the buffers but not the list cells. */
     721         1481 :     foreach(lc, miinfo->multiInsertBuffers)
     722          742 :         CopyMultiInsertBufferCleanup(miinfo, lfirst(lc));
     723              : 
                            /* Now get rid of the list that was tracking them. */
     724          739 :     list_free(miinfo->multiInsertBuffers);
     725          739 : }
     726              : 
     727              : /*
     728              :  * Get the next TupleTableSlot that the next tuple should be stored in.
                         *
                         * The slot lives in the buffer attached to 'rri'.  It is created the first
                         * time its position is needed and is then kept in the buffer for reuse.
                         * This function does not advance buffer->nused: the slot only counts as
                         * occupied once CopyMultiInsertInfoStore() is called for it, and until then
                         * repeated calls return the same slot.
     729              :  *
     730              :  * Callers must ensure that the buffer is not full.
     731              :  *
     732              :  * Note: 'miinfo' is unused but has been included for consistency with the
     733              :  * other functions in this area.
     734              :  */
     735              : static inline TupleTableSlot *
     736       601365 : CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
     737              :                                 ResultRelInfo *rri)
     738              : {
     739       601365 :     CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
     740              :     int         nused;
     741              : 
                            /* 'rri' must already have a buffer, and that buffer must have room. */
     742              :     Assert(buffer != NULL);
     743              :     Assert(buffer->nused < MAX_BUFFERED_TUPLES);
     744              : 
     745       601365 :     nused = buffer->nused;
     746              : 
                            /*
                             * Lazily create the slot for this position.  Slots made here are
                             * dropped explicitly by CopyMultiInsertBufferCleanup().
                             */
     747       601365 :     if (buffer->slots[nused] == NULL)
     748       123166 :         buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
     749       601365 :     return buffer->slots[nused];
     750              : }
     751              : 
     752              : /*
     753              :  * Record the TupleTableSlot previously handed out by
     754              :  * CopyMultiInsertInfoNextFreeSlot as being consumed.
                         *
                         * 'slot' must be the slot at the current position of the buffer attached to
                         * 'rri'; it is only used to assert that.  'lineno' is remembered alongside
                         * the slot, and 'tuplen' is added to miinfo->bufferedBytes.  Both the
                         * buffer's own tuple count and miinfo's overall tuple count go up by one.
     755              :  */
     756              : static inline void
     757       600499 : CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
     758              :                          TupleTableSlot *slot, int tuplen, uint64 lineno)
     759              : {
     760       600499 :     CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
     761              : 
                            /* The caller must be storing into the slot we most recently handed out. */
     762              :     Assert(buffer != NULL);
     763              :     Assert(slot == buffer->slots[buffer->nused]);
     764              : 
     765              :     /* Store the line number so we can properly report any errors later */
     766       600499 :     buffer->linenos[buffer->nused] = lineno;
     767              : 
     768              :     /* Record this slot as being used */
     769       600499 :     buffer->nused++;
     770              : 
     771              :     /* Update how many tuples are stored and their size */
     772       600499 :     miinfo->bufferedTuples++;
     773       600499 :     miinfo->bufferedBytes += tuplen;
     774       600499 : }
     775              : 
     776              : /*
     777              :  * Copy FROM file to relation.
     778              :  */
     779              : uint64
     780          946 : CopyFrom(CopyFromState cstate)
     781              : {
     782              :     ResultRelInfo *resultRelInfo;
     783              :     ResultRelInfo *target_resultRelInfo;
     784          946 :     ResultRelInfo *prevResultRelInfo = NULL;
     785          946 :     EState     *estate = CreateExecutorState(); /* for ExecConstraints() */
     786              :     ModifyTableState *mtstate;
     787              :     ExprContext *econtext;
     788          946 :     TupleTableSlot *singleslot = NULL;
     789          946 :     MemoryContext oldcontext = CurrentMemoryContext;
     790              : 
     791          946 :     PartitionTupleRouting *proute = NULL;
     792              :     ErrorContextCallback errcallback;
     793          946 :     CommandId   mycid = GetCurrentCommandId(true);
     794          946 :     int         ti_options = 0; /* start with default options for insert */
     795          946 :     BulkInsertState bistate = NULL;
     796              :     CopyInsertMethod insertMethod;
     797          946 :     CopyMultiInsertInfo multiInsertInfo = {0};  /* pacify compiler */
     798          946 :     int64       processed = 0;
     799          946 :     int64       excluded = 0;
     800              :     bool        has_before_insert_row_trig;
     801              :     bool        has_instead_insert_row_trig;
     802          946 :     bool        leafpart_use_multi_insert = false;
     803              : 
     804              :     Assert(cstate->rel);
     805              :     Assert(list_length(cstate->range_table) == 1);
     806              : 
     807          946 :     if (cstate->opts.on_error != COPY_ON_ERROR_STOP)
     808              :         Assert(cstate->escontext);
     809              : 
     810              :     /*
     811              :      * The target must be a plain, foreign, or partitioned relation, or have
     812              :      * an INSTEAD OF INSERT row trigger.  (Currently, such triggers are only
     813              :      * allowed on views, so we only hint about them in the view case.)
     814              :      */
     815          946 :     if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
     816           88 :         cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
     817           66 :         cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
     818            9 :         !(cstate->rel->trigdesc &&
     819            6 :           cstate->rel->trigdesc->trig_insert_instead_row))
     820              :     {
     821            3 :         if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
     822            3 :             ereport(ERROR,
     823              :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     824              :                      errmsg("cannot copy to view \"%s\"",
     825              :                             RelationGetRelationName(cstate->rel)),
     826              :                      errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
     827            0 :         else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
     828            0 :             ereport(ERROR,
     829              :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     830              :                      errmsg("cannot copy to materialized view \"%s\"",
     831              :                             RelationGetRelationName(cstate->rel))));
     832            0 :         else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
     833            0 :             ereport(ERROR,
     834              :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     835              :                      errmsg("cannot copy to sequence \"%s\"",
     836              :                             RelationGetRelationName(cstate->rel))));
     837              :         else
     838            0 :             ereport(ERROR,
     839              :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     840              :                      errmsg("cannot copy to non-table relation \"%s\"",
     841              :                             RelationGetRelationName(cstate->rel))));
     842              :     }
     843              : 
     844              :     /*
     845              :      * If the target file is new-in-transaction, we assume that checking FSM
     846              :      * for free space is a waste of time.  This could possibly be wrong, but
     847              :      * it's unlikely.
     848              :      */
     849          943 :     if (RELKIND_HAS_STORAGE(cstate->rel->rd_rel->relkind) &&
     850          858 :         (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
     851          851 :          cstate->rel->rd_firstRelfilelocatorSubid != InvalidSubTransactionId))
     852           45 :         ti_options |= TABLE_INSERT_SKIP_FSM;
     853              : 
     854              :     /*
     855              :      * Optimize if new relation storage was created in this subxact or one of
     856              :      * its committed children and we won't see those rows later as part of an
     857              :      * earlier scan or command. The subxact test ensures that if this subxact
     858              :      * aborts then the frozen rows won't be visible after xact cleanup.  Note
     859              :      * that the stronger test of exactly which subtransaction created it is
     860              :      * crucial for correctness of this optimization. The test for an earlier
     861              :      * scan or command tolerates false negatives. FREEZE causes other sessions
     862              :      * to see rows they would not see under MVCC, and a false negative merely
     863              :      * spreads that anomaly to the current session.
     864              :      */
     865          943 :     if (cstate->opts.freeze)
     866              :     {
     867              :         /*
     868              :          * We currently disallow COPY FREEZE on partitioned tables.  The
     869              :          * reason for this is that we've simply not yet opened the partitions
     870              :          * to determine if the optimization can be applied to them.  We could
     871              :          * go and open them all here, but doing so may be quite a costly
     872              :          * overhead for small copies.  In any case, we may just end up routing
     873              :          * tuples to a small number of partitions.  It seems better just to
     874              :          * raise an ERROR for partitioned tables.
     875              :          */
     876           36 :         if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     877              :         {
     878            3 :             ereport(ERROR,
     879              :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     880              :                      errmsg("cannot perform COPY FREEZE on a partitioned table")));
     881              :         }
     882              : 
     883              :         /* There's currently no support for COPY FREEZE on foreign tables. */
     884           33 :         if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
     885            3 :             ereport(ERROR,
     886              :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     887              :                      errmsg("cannot perform COPY FREEZE on a foreign table")));
     888              : 
     889              :         /*
     890              :          * Tolerate one registration for the benefit of FirstXactSnapshot.
     891              :          * Scan-bearing queries generally create at least two registrations,
     892              :          * though relying on that is fragile, as is ignoring ActiveSnapshot.
     893              :          * Clear CatalogSnapshot to avoid counting its registration.  We'll
     894              :          * still detect ongoing catalog scans, each of which separately
     895              :          * registers the snapshot it uses.
     896              :          */
     897           30 :         InvalidateCatalogSnapshot();
     898           30 :         if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
     899            0 :             ereport(ERROR,
     900              :                     (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
     901              :                      errmsg("cannot perform COPY FREEZE because of prior transaction activity")));
     902              : 
     903           60 :         if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
     904           30 :             cstate->rel->rd_newRelfilelocatorSubid != GetCurrentSubTransactionId())
     905            9 :             ereport(ERROR,
     906              :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     907              :                      errmsg("cannot perform COPY FREEZE because the table was not created or truncated in the current subtransaction")));
     908              : 
     909           21 :         ti_options |= TABLE_INSERT_FROZEN;
     910              :     }
     911              : 
     912              :     /*
     913              :      * We need a ResultRelInfo so we can use the regular executor's
     914              :      * index-entry-making machinery.  (There used to be a huge amount of code
     915              :      * here that basically duplicated execUtils.c ...)
     916              :      */
     917          928 :     ExecInitRangeTable(estate, cstate->range_table, cstate->rteperminfos,
     918              :                        bms_make_singleton(1));
     919          928 :     resultRelInfo = target_resultRelInfo = makeNode(ResultRelInfo);
     920          928 :     ExecInitResultRelation(estate, resultRelInfo, 1);
     921              : 
     922              :     /* Verify the named relation is a valid target for INSERT */
     923          928 :     CheckValidResultRel(resultRelInfo, CMD_INSERT, ONCONFLICT_NONE, NIL);
     924              : 
     925          927 :     ExecOpenIndices(resultRelInfo, false);
     926              : 
     927              :     /*
     928              :      * Set up a ModifyTableState so we can let FDW(s) init themselves for
     929              :      * foreign-table result relation(s).
     930              :      */
     931          927 :     mtstate = makeNode(ModifyTableState);
     932          927 :     mtstate->ps.plan = NULL;
     933          927 :     mtstate->ps.state = estate;
     934          927 :     mtstate->operation = CMD_INSERT;
     935          927 :     mtstate->mt_nrels = 1;
     936          927 :     mtstate->resultRelInfo = resultRelInfo;
     937          927 :     mtstate->rootResultRelInfo = resultRelInfo;
     938              : 
     939          927 :     if (resultRelInfo->ri_FdwRoutine != NULL &&
     940           18 :         resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
     941           18 :         resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
     942              :                                                          resultRelInfo);
     943              : 
     944              :     /*
     945              :      * Also, if the named relation is a foreign table, determine if the FDW
     946              :      * supports batch insert and determine the batch size (a FDW may support
     947              :      * batching, but it may be disabled for the server/table).
     948              :      *
     949              :      * If the FDW does not support batching, we set the batch size to 1.
     950              :      */
     951          927 :     if (resultRelInfo->ri_FdwRoutine != NULL &&
     952           18 :         resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize &&
     953           18 :         resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert)
     954           18 :         resultRelInfo->ri_BatchSize =
     955           18 :             resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize(resultRelInfo);
     956              :     else
     957          909 :         resultRelInfo->ri_BatchSize = 1;
     958              : 
     959              :     Assert(resultRelInfo->ri_BatchSize >= 1);
     960              : 
     961              :     /* Prepare to catch AFTER triggers. */
     962          927 :     AfterTriggerBeginQuery();
     963              : 
     964              :     /*
     965              :      * If there are any triggers with transition tables on the named relation,
     966              :      * we need to be prepared to capture transition tuples.
     967              :      *
     968              :      * Because partition tuple routing would like to know about whether
     969              :      * transition capture is active, we also set it in mtstate, which is
     970              :      * passed to ExecFindPartition() below.
     971              :      */
     972          927 :     cstate->transition_capture = mtstate->mt_transition_capture =
     973          927 :         MakeTransitionCaptureState(cstate->rel->trigdesc,
     974          927 :                                    RelationGetRelid(cstate->rel),
     975              :                                    CMD_INSERT);
     976              : 
     977              :     /*
     978              :      * If the named relation is a partitioned table, initialize state for
     979              :      * CopyFrom tuple routing.
     980              :      */
     981          927 :     if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     982           54 :         proute = ExecSetupPartitionTupleRouting(estate, cstate->rel);
     983              : 
     984          927 :     if (cstate->whereClause)
     985            9 :         cstate->qualexpr = ExecInitQual(castNode(List, cstate->whereClause),
     986              :                                         &mtstate->ps);
     987              : 
     988              :     /*
     989              :      * It's generally more efficient to prepare a bunch of tuples for
     990              :      * insertion, and insert them in one
     991              :      * table_multi_insert()/ExecForeignBatchInsert() call, than call
     992              :      * table_tuple_insert()/ExecForeignInsert() separately for every tuple.
     993              :      * However, there are a number of reasons why we might not be able to do
     994              :      * this.  These are explained below.
     995              :      */
     996          927 :     if (resultRelInfo->ri_TrigDesc != NULL &&
     997           92 :         (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
     998           46 :          resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
     999              :     {
    1000              :         /*
    1001              :          * Can't support multi-inserts when there are any BEFORE/INSTEAD OF
    1002              :          * triggers on the table. Such triggers might query the table we're
    1003              :          * inserting into and act differently if the tuples that have already
    1004              :          * been processed and prepared for insertion are not there.
    1005              :          */
    1006           52 :         insertMethod = CIM_SINGLE;
    1007              :     }
    1008          875 :     else if (resultRelInfo->ri_FdwRoutine != NULL &&
    1009           14 :              resultRelInfo->ri_BatchSize == 1)
    1010              :     {
    1011              :         /*
    1012              :          * Can't support multi-inserts to a foreign table if the FDW does not
    1013              :          * support batching, or it's disabled for the server or foreign table.
    1014              :          */
    1015            9 :         insertMethod = CIM_SINGLE;
    1016              :     }
    1017          866 :     else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL &&
    1018           15 :              resultRelInfo->ri_TrigDesc->trig_insert_new_table)
    1019              :     {
    1020              :         /*
    1021              :          * For partitioned tables we can't support multi-inserts when there
    1022              :          * are any statement level insert triggers. It might be possible to
    1023              :          * allow partitioned tables with such triggers in the future, but for
    1024              :          * now, CopyMultiInsertInfoFlush expects that any after row insert and
    1025              :          * statement level insert triggers are on the same relation.
    1026              :          */
    1027           11 :         insertMethod = CIM_SINGLE;
    1028              :     }
    1029          855 :     else if (cstate->volatile_defexprs)
    1030              :     {
    1031              :         /*
    1032              :          * Can't support multi-inserts if there are any volatile default
    1033              :          * expressions in the table.  Similarly to the trigger case above,
    1034              :          * such expressions may query the table we're inserting into.
    1035              :          *
    1036              :          * Note: It does not matter if any partitions have any volatile
    1037              :          * default expressions as we use the defaults from the target of the
    1038              :          * COPY command.
    1039              :          */
    1040            3 :         insertMethod = CIM_SINGLE;
    1041              :     }
    1042          852 :     else if (contain_volatile_functions(cstate->whereClause))
    1043              :     {
    1044              :         /*
    1045              :          * Can't support multi-inserts if there are any volatile function
    1046              :          * expressions in WHERE clause.  Similarly to the trigger case above,
    1047              :          * such expressions may query the table we're inserting into.
    1048              :          *
    1049              :          * Note: the whereClause was already preprocessed in DoCopy(), so it's
    1050              :          * okay to use contain_volatile_functions() directly.
    1051              :          */
    1052            0 :         insertMethod = CIM_SINGLE;
    1053              :     }
    1054              :     else
    1055              :     {
    1056              :         /*
    1057              :          * For partitioned tables, we may still be able to perform bulk
    1058              :          * inserts.  However, the possibility of this depends on which types
    1059              :          * of triggers exist on the partition.  We must disable bulk inserts
    1060              :          * if the partition is a foreign table that can't use batching or it
    1061              :          * has any before row insert or insert instead triggers (same as we
    1062              :          * checked above for the parent table).  Since the partition's
    1063              :          * resultRelInfos are initialized only when we actually need to insert
    1064              :          * the first tuple into them, we must have the intermediate insert
    1065              :          * method of CIM_MULTI_CONDITIONAL to flag that we must later
    1066              :          * determine if we can use bulk-inserts for the partition being
    1067              :          * inserted into.
    1068              :          */
    1069          852 :         if (proute)
    1070           40 :             insertMethod = CIM_MULTI_CONDITIONAL;
    1071              :         else
    1072          812 :             insertMethod = CIM_MULTI;
    1073              : 
    1074          852 :         CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
    1075              :                                 estate, mycid, ti_options);
    1076              :     }
    1077              : 
    1078              :     /*
    1079              :      * If not using batch mode (which allocates slots as needed) set up a
    1080              :      * tuple slot too. When inserting into a partitioned table, we also need
    1081              :      * one, even if we might batch insert, to read the tuple in the root
    1082              :      * partition's form.
    1083              :      */
    1084          927 :     if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
    1085              :     {
    1086          115 :         singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
    1087              :                                        &estate->es_tupleTable);
    1088          115 :         bistate = GetBulkInsertState();
    1089              :     }
    1090              : 
    1091         1019 :     has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
    1092           92 :                                   resultRelInfo->ri_TrigDesc->trig_insert_before_row);
    1093              : 
    1094         1019 :     has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
    1095           92 :                                    resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
    1096              : 
    1097              :     /*
    1098              :      * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
    1099              :      * should do this for COPY, since it's not really an "INSERT" statement as
    1100              :      * such. However, executing these triggers maintains consistency with the
    1101              :      * EACH ROW triggers that we already fire on COPY.
    1102              :      */
    1103          927 :     ExecBSInsertTriggers(estate, resultRelInfo);
    1104              : 
    1105          927 :     econtext = GetPerTupleExprContext(estate);
    1106              : 
    1107              :     /* Set up callback to identify error line number */
    1108          927 :     errcallback.callback = CopyFromErrorCallback;
    1109          927 :     errcallback.arg = cstate;
    1110          927 :     errcallback.previous = error_context_stack;
    1111          927 :     error_context_stack = &errcallback;
    1112              : 
    1113              :     for (;;)
    1114       630753 :     {
    1115              :         TupleTableSlot *myslot;
    1116              :         bool        skip_tuple;
    1117              : 
    1118       631680 :         CHECK_FOR_INTERRUPTS();
    1119              : 
    1120              :         /*
    1121              :          * Reset the per-tuple exprcontext. We do this after every tuple, to
    1122              :          * clean-up after expression evaluations etc.
    1123              :          */
    1124       631680 :         ResetPerTupleExprContext(estate);
    1125              : 
    1126              :         /* select slot to (initially) load row into */
    1127       631680 :         if (insertMethod == CIM_SINGLE || proute)
    1128              :         {
    1129       137441 :             myslot = singleslot;
    1130       137441 :             Assert(myslot != NULL);
    1131              :         }
    1132              :         else
    1133              :         {
    1134              :             Assert(resultRelInfo == target_resultRelInfo);
    1135              :             Assert(insertMethod == CIM_MULTI);
    1136              : 
    1137       494239 :             myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
    1138              :                                                      resultRelInfo);
    1139              :         }
    1140              : 
    1141              :         /*
    1142              :          * Switch to per-tuple context before calling NextCopyFrom, which does
    1143              :          * evaluate default expressions etc. and requires per-tuple context.
    1144              :          */
    1145       631680 :         MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    1146              : 
    1147       631680 :         ExecClearTuple(myslot);
    1148              : 
    1149              :         /* Directly store the values/nulls array in the slot */
    1150       631680 :         if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
    1151          812 :             break;
    1152              : 
    1153       630775 :         if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE &&
    1154           81 :             cstate->escontext->error_occurred)
    1155              :         {
    1156              :             /*
    1157              :              * Soft error occurred, skip this tuple and just make
    1158              :              * ErrorSaveContext ready for the next NextCopyFrom. Since we
    1159              :              * don't set details_wanted and error_data is not to be filled,
    1160              :              * just resetting error_occurred is enough.
    1161              :              */
    1162           54 :             cstate->escontext->error_occurred = false;
    1163              : 
    1164              :             /* Report that this tuple was skipped by the ON_ERROR clause */
    1165           54 :             pgstat_progress_update_param(PROGRESS_COPY_TUPLES_SKIPPED,
    1166           54 :                                          cstate->num_errors);
    1167              : 
    1168           54 :             if (cstate->opts.reject_limit > 0 &&
    1169           24 :                 cstate->num_errors > cstate->opts.reject_limit)
    1170            3 :                 ereport(ERROR,
    1171              :                         (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
    1172              :                          errmsg("skipped more than REJECT_LIMIT (%" PRId64 ") rows due to data type incompatibility",
    1173              :                                 cstate->opts.reject_limit)));
    1174              : 
    1175              :             /* Repeat NextCopyFrom() until no soft error occurs */
    1176           51 :             continue;
    1177              :         }
    1178              : 
    1179       630721 :         ExecStoreVirtualTuple(myslot);
    1180              : 
    1181              :         /*
    1182              :          * Constraints and where clause might reference the tableoid column,
    1183              :          * so (re-)initialize tts_tableOid before evaluating them.
    1184              :          */
    1185       630721 :         myslot->tts_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc);
    1186              : 
    1187              :         /* Triggers and stuff need to be invoked in query context. */
    1188       630721 :         MemoryContextSwitchTo(oldcontext);
    1189              : 
    1190       630721 :         if (cstate->whereClause)
    1191              :         {
    1192           33 :             econtext->ecxt_scantuple = myslot;
    1193              :             /* Skip items that don't match COPY's WHERE clause */
    1194           33 :             if (!ExecQual(cstate->qualexpr, econtext))
    1195              :             {
    1196              :                 /*
    1197              :                  * Report that this tuple was filtered out by the WHERE
    1198              :                  * clause.
    1199              :                  */
    1200           18 :                 pgstat_progress_update_param(PROGRESS_COPY_TUPLES_EXCLUDED,
    1201              :                                              ++excluded);
    1202           18 :                 continue;
    1203              :             }
    1204              :         }
    1205              : 
    1206              :         /* Determine the partition to insert the tuple into */
    1207       630703 :         if (proute)
    1208              :         {
    1209              :             TupleConversionMap *map;
    1210              : 
    1211              :             /*
    1212              :              * Attempt to find a partition suitable for this tuple.
    1213              :              * ExecFindPartition() will raise an error if none can be found or
    1214              :              * if the found partition is not suitable for INSERTs.
    1215              :              */
    1216       137197 :             resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo,
    1217              :                                               proute, myslot, estate);
    1218              : 
    1219       137196 :             if (prevResultRelInfo != resultRelInfo)
    1220              :             {
    1221              :                 /* Determine which triggers exist on this partition */
    1222        80785 :                 has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
    1223           27 :                                               resultRelInfo->ri_TrigDesc->trig_insert_before_row);
    1224              : 
    1225        80785 :                 has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
    1226           27 :                                                resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
    1227              : 
    1228              :                 /*
    1229              :                  * Disable multi-inserts when the partition has BEFORE/INSTEAD
    1230              :                  * OF triggers, or if the partition is a foreign table that
    1231              :                  * can't use batching.
    1232              :                  */
    1233       131487 :                 leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
    1234        50729 :                     !has_before_insert_row_trig &&
    1235       182204 :                     !has_instead_insert_row_trig &&
    1236        50717 :                     (resultRelInfo->ri_FdwRoutine == NULL ||
    1237            6 :                      resultRelInfo->ri_BatchSize > 1);
    1238              : 
    1239              :                 /* Set the multi-insert buffer to use for this partition. */
    1240        80758 :                 if (leafpart_use_multi_insert)
    1241              :                 {
    1242        50715 :                     if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
    1243           43 :                         CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
    1244              :                                                        resultRelInfo);
    1245              :                 }
    1246        30043 :                 else if (insertMethod == CIM_MULTI_CONDITIONAL &&
    1247           14 :                          !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
    1248              :                 {
    1249              :                     /*
    1250              :                      * Flush pending inserts if this partition can't use
    1251              :                      * batching, so rows are visible to triggers etc.
    1252              :                      */
    1253            0 :                     CopyMultiInsertInfoFlush(&multiInsertInfo,
    1254              :                                              resultRelInfo,
    1255              :                                              &processed);
    1256              :                 }
    1257              : 
    1258        80758 :                 if (bistate != NULL)
    1259        80758 :                     ReleaseBulkInsertStatePin(bistate);
    1260        80758 :                 prevResultRelInfo = resultRelInfo;
    1261              :             }
    1262              : 
    1263              :             /*
    1264              :              * If we're capturing transition tuples, we might need to convert
    1265              :              * from the partition rowtype to root rowtype. But if there are no
    1266              :              * BEFORE triggers on the partition that could change the tuple,
    1267              :              * we can just remember the original unconverted tuple to avoid a
    1268              :              * needless round trip conversion.
    1269              :              */
    1270       137196 :             if (cstate->transition_capture != NULL)
    1271           29 :                 cstate->transition_capture->tcs_original_insert_tuple =
    1272           29 :                     !has_before_insert_row_trig ? myslot : NULL;
    1273              : 
    1274              :             /*
    1275              :              * We might need to convert from the root rowtype to the partition
    1276              :              * rowtype.
    1277              :              */
    1278       137196 :             map = ExecGetRootToChildMap(resultRelInfo, estate);
    1279       137196 :             if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert)
    1280              :             {
    1281              :                 /* non batch insert */
    1282        30070 :                 if (map != NULL)
    1283              :                 {
    1284              :                     TupleTableSlot *new_slot;
    1285              : 
    1286           55 :                     new_slot = resultRelInfo->ri_PartitionTupleSlot;
    1287           55 :                     myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot);
    1288              :                 }
    1289              :             }
    1290              :             else
    1291              :             {
    1292              :                 /*
    1293              :                  * Prepare to queue up tuple for later batch insert into
    1294              :                  * current partition.
    1295              :                  */
    1296              :                 TupleTableSlot *batchslot;
    1297              : 
    1298              :                 /* no other path available for partitioned table */
    1299              :                 Assert(insertMethod == CIM_MULTI_CONDITIONAL);
    1300              : 
    1301       107126 :                 batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
    1302              :                                                             resultRelInfo);
    1303              : 
    1304       107126 :                 if (map != NULL)
    1305         6100 :                     myslot = execute_attr_map_slot(map->attrMap, myslot,
    1306              :                                                    batchslot);
    1307              :                 else
    1308              :                 {
    1309              :                     /*
    1310              :                      * This looks more expensive than it is (Believe me, I
    1311              :                      * optimized it away. Twice.). The input is in virtual
    1312              :                      * form, and we'll materialize the slot below - for most
    1313              :                      * slot types the copy performs the work materialization
    1314              :                      * would later require anyway.
    1315              :                      */
    1316       101026 :                     ExecCopySlot(batchslot, myslot);
    1317       101026 :                     myslot = batchslot;
    1318              :                 }
    1319              :             }
    1320              : 
    1321              :             /* ensure that triggers etc see the right relation  */
    1322       137196 :             myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
    1323              :         }
    1324              : 
    1325       630702 :         skip_tuple = false;
    1326              : 
    1327              :         /* BEFORE ROW INSERT Triggers */
    1328       630702 :         if (has_before_insert_row_trig)
    1329              :         {
    1330          137 :             if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot))
    1331            8 :                 skip_tuple = true;  /* "do nothing" */
    1332              :         }
    1333              : 
    1334       630702 :         if (!skip_tuple)
    1335              :         {
    1336              :             /*
    1337              :              * If there is an INSTEAD OF INSERT ROW trigger, let it handle the
    1338              :              * tuple.  Otherwise, proceed with inserting the tuple into the
    1339              :              * table or foreign table.
    1340              :              */
    1341       630694 :             if (has_instead_insert_row_trig)
    1342              :             {
    1343            6 :                 ExecIRInsertTriggers(estate, resultRelInfo, myslot);
    1344              :             }
    1345              :             else
    1346              :             {
    1347              :                 /* Compute stored generated columns */
    1348       630688 :                 if (resultRelInfo->ri_RelationDesc->rd_att->constr &&
    1349       231012 :                     resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored)
    1350           17 :                     ExecComputeStoredGenerated(resultRelInfo, estate, myslot,
    1351              :                                                CMD_INSERT);
    1352              : 
    1353              :                 /*
    1354              :                  * If the target is a plain table, check the constraints of
    1355              :                  * the tuple.
    1356              :                  */
    1357       630688 :                 if (resultRelInfo->ri_FdwRoutine == NULL &&
    1358       630642 :                     resultRelInfo->ri_RelationDesc->rd_att->constr)
    1359       230994 :                     ExecConstraints(resultRelInfo, myslot, estate);
    1360              : 
    1361              :                 /*
    1362              :                  * Also check the tuple against the partition constraint, if
    1363              :                  * there is one; except that if we got here via tuple-routing,
    1364              :                  * we don't need to if there's no BR trigger defined on the
    1365              :                  * partition.
    1366              :                  */
    1367       630673 :                 if (resultRelInfo->ri_RelationDesc->rd_rel->relispartition &&
    1368       137190 :                     (proute == NULL || has_before_insert_row_trig))
    1369         1154 :                     ExecPartitionCheck(resultRelInfo, myslot, estate, true);
    1370              : 
    1371              :                 /* Store the slot in the multi-insert buffer, when enabled. */
    1372       630673 :                 if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
    1373              :                 {
    1374              :                     /*
    1375              :                      * The slot previously might point into the per-tuple
    1376              :                      * context. For batching it needs to be longer lived.
    1377              :                      */
    1378       600499 :                     ExecMaterializeSlot(myslot);
    1379              : 
    1380              :                     /* Add this tuple to the tuple buffer */
    1381       600499 :                     CopyMultiInsertInfoStore(&multiInsertInfo,
    1382              :                                              resultRelInfo, myslot,
    1383              :                                              cstate->line_buf.len,
    1384              :                                              cstate->cur_lineno);
    1385              : 
    1386              :                     /*
    1387              :                      * If enough inserts have queued up, then flush all
    1388              :                      * buffers out to their tables.
    1389              :                      */
    1390       600499 :                     if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
    1391          548 :                         CopyMultiInsertInfoFlush(&multiInsertInfo,
    1392              :                                                  resultRelInfo,
    1393              :                                                  &processed);
    1394              : 
    1395              :                     /*
    1396              :                      * We delay updating the row counter and progress of the
    1397              :                      * COPY command until after writing the tuples stored in
    1398              :                      * the buffer out to the table, as in single insert mode.
    1399              :                      * See CopyMultiInsertBufferFlush().
    1400              :                      */
    1401       600499 :                     continue;   /* next tuple please */
    1402              :                 }
    1403              :                 else
    1404              :                 {
    1405        30174 :                     List       *recheckIndexes = NIL;
    1406              : 
    1407              :                     /* OK, store the tuple */
    1408        30174 :                     if (resultRelInfo->ri_FdwRoutine != NULL)
    1409              :                     {
    1410           27 :                         myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
    1411              :                                                                                  resultRelInfo,
    1412              :                                                                                  myslot,
    1413              :                                                                                  NULL);
    1414              : 
    1415           26 :                         if (myslot == NULL) /* "do nothing" */
    1416            2 :                             continue;   /* next tuple please */
    1417              : 
    1418              :                         /*
    1419              :                          * AFTER ROW Triggers might reference the tableoid
    1420              :                          * column, so (re-)initialize tts_tableOid before
    1421              :                          * evaluating them.
    1422              :                          */
    1423           24 :                         myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
    1424              :                     }
    1425              :                     else
    1426              :                     {
    1427              :                         /* OK, store the tuple and create index entries for it */
    1428        30147 :                         table_tuple_insert(resultRelInfo->ri_RelationDesc,
    1429              :                                            myslot, mycid, ti_options, bistate);
    1430              : 
    1431        30147 :                         if (resultRelInfo->ri_NumIndices > 0)
    1432            0 :                             recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
    1433              :                                                                    estate, 0,
    1434              :                                                                    myslot, NIL,
    1435              :                                                                    NULL);
    1436              :                     }
    1437              : 
    1438              :                     /* AFTER ROW INSERT Triggers */
    1439        30171 :                     ExecARInsertTriggers(estate, resultRelInfo, myslot,
    1440              :                                          recheckIndexes, cstate->transition_capture);
    1441              : 
    1442        30169 :                     list_free(recheckIndexes);
    1443              :                 }
    1444              :             }
    1445              : 
    1446              :             /*
    1447              :              * We count only tuples not suppressed by a BEFORE INSERT trigger
    1448              :              * or FDW; this is the same definition used by nodeModifyTable.c
    1449              :              * for counting tuples inserted by an INSERT command.  Update
    1450              :              * progress of the COPY command as well.
    1451              :              */
    1452        30175 :             pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
    1453              :                                          ++processed);
    1454              :         }
    1455              :     }
    1456              : 
    1457              :     /* Flush any remaining buffered tuples */
    1458          812 :     if (insertMethod != CIM_SINGLE)
    1459              :     {
    1460          752 :         if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
    1461          588 :             CopyMultiInsertInfoFlush(&multiInsertInfo, NULL, &processed);
    1462              :     }
    1463              : 
    1464              :     /* Done, clean up */
    1465          799 :     error_context_stack = errcallback.previous;
    1466              : 
    1467          799 :     if (cstate->num_errors > 0 &&
    1468           18 :         cstate->opts.log_verbosity >= COPY_LOG_VERBOSITY_DEFAULT)
    1469              :     {
    1470           15 :         if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE)
    1471           12 :             ereport(NOTICE,
    1472              :                     errmsg_plural("%" PRIu64 " row was skipped due to data type incompatibility",
    1473              :                                   "%" PRIu64 " rows were skipped due to data type incompatibility",
    1474              :                                   cstate->num_errors,
    1475              :                                   cstate->num_errors));
    1476            3 :         else if (cstate->opts.on_error == COPY_ON_ERROR_SET_NULL)
    1477            3 :             ereport(NOTICE,
    1478              :                     errmsg_plural("in %" PRIu64 " row, columns were set to null due to data type incompatibility",
    1479              :                                   "in %" PRIu64 " rows, columns were set to null due to data type incompatibility",
    1480              :                                   cstate->num_errors,
    1481              :                                   cstate->num_errors));
    1482              :     }
    1483              : 
    1484          799 :     if (bistate != NULL)
    1485           99 :         FreeBulkInsertState(bistate);
    1486              : 
    1487          799 :     MemoryContextSwitchTo(oldcontext);
    1488              : 
    1489              :     /* Execute AFTER STATEMENT insertion triggers */
    1490          799 :     ExecASInsertTriggers(estate, target_resultRelInfo, cstate->transition_capture);
    1491              : 
    1492              :     /* Handle queued AFTER triggers */
    1493          799 :     AfterTriggerEndQuery(estate);
    1494              : 
    1495          799 :     ExecResetTupleTable(estate->es_tupleTable, false);
    1496              : 
    1497              :     /* Allow the FDW to shut down */
    1498          799 :     if (target_resultRelInfo->ri_FdwRoutine != NULL &&
    1499           16 :         target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
    1500           16 :         target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
    1501              :                                                               target_resultRelInfo);
    1502              : 
    1503              :     /* Tear down the multi-insert buffer data */
    1504          799 :     if (insertMethod != CIM_SINGLE)
    1505          739 :         CopyMultiInsertInfoCleanup(&multiInsertInfo);
    1506              : 
    1507              :     /* Close all the partitioned tables, leaf partitions, and their indices */
    1508          799 :     if (proute)
    1509           51 :         ExecCleanupTupleRouting(mtstate, proute);
    1510              : 
    1511              :     /* Close the result relations, including any trigger target relations */
    1512          799 :     ExecCloseResultRelations(estate);
    1513          799 :     ExecCloseRangeTableRelations(estate);
    1514              : 
    1515          799 :     FreeExecutorState(estate);
    1516              : 
    1517          799 :     return processed;
    1518              : }
    1519              : 
    1520              : /*
    1521              :  * Set up to read tuples from a file for COPY FROM.
    1522              :  *
    1523              :  * 'rel': Used as a template for the tuples
    1524              :  * 'whereClause': WHERE clause from the COPY FROM command
    1525              :  * 'filename': Name of server-local file to read, NULL for STDIN
    1526              :  * 'is_program': true if 'filename' is program to execute
    1527              :  * 'data_source_cb': callback that provides the input data
    1528              :  * 'attnamelist': List of char *, columns to include. NIL selects all cols.
    1529              :  * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
    1530              :  *
    1531              :  * Returns a CopyFromState, to be passed to NextCopyFrom and related functions.
    1532              :  */
    1533              : CopyFromState
    1534         1129 : BeginCopyFrom(ParseState *pstate,
    1535              :               Relation rel,
    1536              :               Node *whereClause,
    1537              :               const char *filename,
    1538              :               bool is_program,
    1539              :               copy_data_source_cb data_source_cb,
    1540              :               List *attnamelist,
    1541              :               List *options)
    1542              : {
    1543              :     CopyFromState cstate;
    1544         1129 :     bool        pipe = (filename == NULL);
    1545              :     TupleDesc   tupDesc;
    1546              :     AttrNumber  num_phys_attrs,
    1547              :                 num_defaults;
    1548              :     FmgrInfo   *in_functions;
    1549              :     Oid        *typioparams;
    1550              :     int        *defmap;
    1551              :     ExprState **defexprs;
    1552              :     MemoryContext oldcontext;
    1553              :     bool        volatile_defexprs;
    1554         1129 :     const int   progress_cols[] = {
    1555              :         PROGRESS_COPY_COMMAND,
    1556              :         PROGRESS_COPY_TYPE,
    1557              :         PROGRESS_COPY_BYTES_TOTAL
    1558              :     };
    1559         1129 :     int64       progress_vals[] = {
    1560              :         PROGRESS_COPY_COMMAND_FROM,
    1561              :         0,
    1562              :         0
    1563              :     };
    1564              : 
    1565              :     /* Allocate workspace and zero all fields */
    1566         1129 :     cstate = palloc0_object(CopyFromStateData);
    1567              : 
    1568              :     /*
    1569              :      * We allocate everything used by a cstate in a new memory context. This
    1570              :      * avoids memory leaks during repeated use of COPY in a query.
    1571              :      */
    1572         1129 :     cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
    1573              :                                                 "COPY",
    1574              :                                                 ALLOCSET_DEFAULT_SIZES);
    1575              : 
    1576         1129 :     oldcontext = MemoryContextSwitchTo(cstate->copycontext);
    1577              : 
    1578              :     /* Extract options from the statement node tree */
    1579         1129 :     ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options);
    1580              : 
    1581              :     /* Set the format routine */
    1582          996 :     cstate->routine = CopyFromGetRoutine(&cstate->opts);
    1583              : 
    1584              :     /* Process the target relation */
    1585          996 :     cstate->rel = rel;
    1586              : 
    1587          996 :     tupDesc = RelationGetDescr(cstate->rel);
    1588              : 
    1589              :     /* process common options or initialization */
    1590              : 
    1591              :     /* Generate or convert list of attributes to process */
    1592          996 :     cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
    1593              : 
    1594          996 :     num_phys_attrs = tupDesc->natts;
    1595              : 
    1596              :     /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
    1597          996 :     cstate->opts.force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
    1598          996 :     if (cstate->opts.force_notnull_all)
    1599            6 :         MemSet(cstate->opts.force_notnull_flags, true, num_phys_attrs * sizeof(bool));
    1600          990 :     else if (cstate->opts.force_notnull)
    1601              :     {
    1602              :         List       *attnums;
    1603              :         ListCell   *cur;
    1604              : 
    1605           13 :         attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_notnull);
    1606              : 
    1607           26 :         foreach(cur, attnums)
    1608              :         {
    1609           16 :             int         attnum = lfirst_int(cur);
    1610           16 :             Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
    1611              : 
    1612           16 :             if (!list_member_int(cstate->attnumlist, attnum))
    1613            3 :                 ereport(ERROR,
    1614              :                         (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1615              :                 /*- translator: first %s is the name of a COPY option, e.g. FORCE_NOT_NULL */
    1616              :                          errmsg("%s column \"%s\" not referenced by COPY",
    1617              :                                 "FORCE_NOT_NULL", NameStr(attr->attname))));
    1618           13 :             cstate->opts.force_notnull_flags[attnum - 1] = true;
    1619              :         }
    1620              :     }
    1621              : 
    1622              :     /* Set up soft error handler for ON_ERROR */
    1623          993 :     if (cstate->opts.on_error != COPY_ON_ERROR_STOP)
    1624              :     {
    1625           50 :         cstate->escontext = makeNode(ErrorSaveContext);
    1626           50 :         cstate->escontext->type = T_ErrorSaveContext;
    1627           50 :         cstate->escontext->error_occurred = false;
    1628              : 
    1629           50 :         if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE ||
    1630           18 :             cstate->opts.on_error == COPY_ON_ERROR_SET_NULL)
    1631           50 :             cstate->escontext->details_wanted = false;
    1632              :     }
    1633              :     else
    1634          943 :         cstate->escontext = NULL;
    1635              : 
    1636          993 :     if (cstate->opts.on_error == COPY_ON_ERROR_SET_NULL)
    1637              :     {
    1638           18 :         int         attr_count = list_length(cstate->attnumlist);
    1639              : 
    1640              :         /*
    1641              :          * When data type conversion fails and ON_ERROR is SET_NULL, we need to
    1642              :          * ensure that the input column allows null values.  ExecConstraints()
    1643              :          * will cover most of the cases, but it does not verify domain
    1644              :          * constraints.  Therefore, for constrained domains, the null value
    1645              :          * check must be performed during the initial string-to-datum
    1646              :          * conversion (see CopyFromTextLikeOneRow()).
    1647              :          */
    1648           18 :         cstate->domain_with_constraint = palloc0_array(bool, attr_count);
    1649              : 
    1650           90 :         foreach_int(attno, cstate->attnumlist)
    1651              :         {
    1652           54 :             int         i = foreach_current_index(attno);
    1653              : 
    1654           54 :             Form_pg_attribute att = TupleDescAttr(tupDesc, attno - 1);
    1655              : 
    1656           54 :             cstate->domain_with_constraint[i] = DomainHasConstraints(att->atttypid);
    1657              :         }
    1658              :     }
    1659              : 
    1660              :     /* Convert FORCE_NULL name list to per-column flags, check validity */
    1661          993 :     cstate->opts.force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
    1662          993 :     if (cstate->opts.force_null_all)
    1663            6 :         MemSet(cstate->opts.force_null_flags, true, num_phys_attrs * sizeof(bool));
    1664          987 :     else if (cstate->opts.force_null)
    1665              :     {
    1666              :         List       *attnums;
    1667              :         ListCell   *cur;
    1668              : 
    1669           13 :         attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_null);
    1670              : 
    1671           26 :         foreach(cur, attnums)
    1672              :         {
    1673           16 :             int         attnum = lfirst_int(cur);
    1674           16 :             Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
    1675              : 
    1676           16 :             if (!list_member_int(cstate->attnumlist, attnum))
    1677            3 :                 ereport(ERROR,
    1678              :                         (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1679              :                 /*- translator: first %s is the name of a COPY option, e.g. FORCE_NOT_NULL */
    1680              :                          errmsg("%s column \"%s\" not referenced by COPY",
    1681              :                                 "FORCE_NULL", NameStr(attr->attname))));
    1682           13 :             cstate->opts.force_null_flags[attnum - 1] = true;
    1683              :         }
    1684              :     }
    1685              : 
    1686              :     /* Convert convert_selectively name list to per-column flags */
    1687          990 :     if (cstate->opts.convert_selectively)
    1688              :     {
    1689              :         List       *attnums;
    1690              :         ListCell   *cur;
    1691              : 
    1692            3 :         cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
    1693              : 
    1694            3 :         attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.convert_select);
    1695              : 
    1696            7 :         foreach(cur, attnums)
    1697              :         {
    1698            4 :             int         attnum = lfirst_int(cur);
    1699            4 :             Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
    1700              : 
    1701            4 :             if (!list_member_int(cstate->attnumlist, attnum))
    1702            0 :                 ereport(ERROR,
    1703              :                         (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1704              :                          errmsg_internal("selected column \"%s\" not referenced by COPY",
    1705              :                                          NameStr(attr->attname))));
    1706            4 :             cstate->convert_select_flags[attnum - 1] = true;
    1707              :         }
    1708              :     }
    1709              : 
    1710              :     /* Use client encoding when ENCODING option is not specified. */
    1711          990 :     if (cstate->opts.file_encoding < 0)
    1712          975 :         cstate->file_encoding = pg_get_client_encoding();
    1713              :     else
    1714           15 :         cstate->file_encoding = cstate->opts.file_encoding;
    1715              : 
    1716              :     /*
    1717              :      * Look up encoding conversion function.
    1718              :      */
    1719          990 :     if (cstate->file_encoding == GetDatabaseEncoding() ||
    1720           39 :         cstate->file_encoding == PG_SQL_ASCII ||
    1721           18 :         GetDatabaseEncoding() == PG_SQL_ASCII)
    1722              :     {
    1723          972 :         cstate->need_transcoding = false;
    1724              :     }
    1725              :     else
    1726              :     {
    1727           18 :         cstate->need_transcoding = true;
    1728           18 :         cstate->conversion_proc = FindDefaultConversionProc(cstate->file_encoding,
    1729              :                                                             GetDatabaseEncoding());
    1730           18 :         if (!OidIsValid(cstate->conversion_proc))
    1731            0 :             ereport(ERROR,
    1732              :                     (errcode(ERRCODE_UNDEFINED_FUNCTION),
    1733              :                      errmsg("default conversion function for encoding \"%s\" to \"%s\" does not exist",
    1734              :                             pg_encoding_to_char(cstate->file_encoding),
    1735              :                             pg_encoding_to_char(GetDatabaseEncoding()))));
    1736              :     }
    1737              : 
    1738          990 :     cstate->copy_src = COPY_FILE;    /* default */
    1739              : 
    1740          990 :     cstate->whereClause = whereClause;
    1741              : 
    1742              :     /* Initialize state variables */
    1743          990 :     cstate->eol_type = EOL_UNKNOWN;
    1744          990 :     cstate->cur_relname = RelationGetRelationName(cstate->rel);
    1745          990 :     cstate->cur_lineno = 0;
    1746          990 :     cstate->cur_attname = NULL;
    1747          990 :     cstate->cur_attval = NULL;
    1748          990 :     cstate->relname_only = false;
    1749              : 
    1750              :     /*
    1751              :      * Allocate buffers for the input pipeline.
    1752              :      *
    1753              :      * attribute_buf and raw_buf are used in both text and binary modes, but
    1754              :      * input_buf and line_buf only in text mode.
    1755              :      */
    1756          990 :     cstate->raw_buf = palloc(RAW_BUF_SIZE + 1);
    1757          990 :     cstate->raw_buf_index = cstate->raw_buf_len = 0;
    1758          990 :     cstate->raw_reached_eof = false;
    1759              : 
    1760          990 :     initStringInfo(&cstate->attribute_buf);
    1761              : 
    1762              :     /* Assign range table and rteperminfos, we'll need them in CopyFrom. */
    1763          990 :     if (pstate)
    1764              :     {
    1765          953 :         cstate->range_table = pstate->p_rtable;
    1766          953 :         cstate->rteperminfos = pstate->p_rteperminfos;
    1767              :     }
    1768              : 
    1769          990 :     num_defaults = 0;
    1770          990 :     volatile_defexprs = false;
    1771              : 
    1772              :     /*
    1773              :      * Pick up the required catalog information for each attribute in the
    1774              :      * relation, including the input function, the element type (to pass to
    1775              :      * the input function), and info about defaults and constraints. (Which
    1776              :      * input function we use depends on text/binary format choice.)
    1777              :      */
    1778          990 :     in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
    1779          990 :     typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
    1780          990 :     defmap = (int *) palloc(num_phys_attrs * sizeof(int));
    1781          990 :     defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
    1782              : 
    1783         3904 :     for (int attnum = 1; attnum <= num_phys_attrs; attnum++)
    1784              :     {
    1785         2921 :         Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
    1786              : 
    1787              :         /* We don't need info for dropped attributes */
    1788         2921 :         if (att->attisdropped)
    1789           62 :             continue;
    1790              : 
    1791              :         /* Fetch the input function and typioparam info */
    1792         2859 :         cstate->routine->CopyFromInFunc(cstate, att->atttypid,
    1793         2859 :                                         &in_functions[attnum - 1],
    1794         2859 :                                         &typioparams[attnum - 1]);
    1795              : 
    1796              :         /* Get default info if available */
    1797         2858 :         defexprs[attnum - 1] = NULL;
    1798              : 
    1799              :         /*
    1800              :          * We only need the default values for columns that do not appear in
    1801              :          * the column list, unless the DEFAULT option was given. We never need
    1802              :          * default values for generated columns.
    1803              :          */
    1804         2858 :         if ((cstate->opts.default_print != NULL ||
    1805         2795 :              !list_member_int(cstate->attnumlist, attnum)) &&
    1806          341 :             !att->attgenerated)
    1807              :         {
    1808          323 :             Expr       *defexpr = (Expr *) build_column_default(cstate->rel,
    1809              :                                                                 attnum);
    1810              : 
    1811          323 :             if (defexpr != NULL)
    1812              :             {
    1813              :                 /* Run the expression through planner */
    1814          132 :                 defexpr = expression_planner(defexpr);
    1815              : 
    1816              :                 /* Initialize executable expression in copycontext */
    1817          126 :                 defexprs[attnum - 1] = ExecInitExpr(defexpr, NULL);
    1818              : 
    1819              :                 /* if NOT copied from input */
    1820              :                 /* use default value if one exists */
    1821          126 :                 if (!list_member_int(cstate->attnumlist, attnum))
    1822              :                 {
    1823           86 :                     defmap[num_defaults] = attnum - 1;
    1824           86 :                     num_defaults++;
    1825              :                 }
    1826              : 
    1827              :                 /*
    1828              :                  * If a default expression looks at the table being loaded,
    1829              :                  * then it could give the wrong answer when using
    1830              :                  * multi-insert. Since database access can be dynamic this is
    1831              :                  * hard to test for exactly, so we use the much wider test of
    1832              :                  * whether the default expression is volatile. We allow for
    1833              :                  * the special case of when the default expression is the
    1834              :                  * nextval() of a sequence which in this specific case is
    1835              :                  * known to be safe for use with the multi-insert
    1836              :                  * optimization. Hence we use this special case function
    1837              :                  * checker rather than the standard check for
    1838              :                  * contain_volatile_functions().  Note also that we already
    1839              :                  * ran the expression through expression_planner().
    1840              :                  */
    1841          126 :                 if (!volatile_defexprs)
    1842          126 :                     volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
    1843              :             }
    1844              :         }
    1845              :     }
    1846              : 
    1847          983 :     cstate->defaults = (bool *) palloc0(tupDesc->natts * sizeof(bool));
    1848              : 
    1849              :     /* initialize progress */
    1850          983 :     pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
    1851          983 :                                   cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
    1852          983 :     cstate->bytes_processed = 0;
    1853              : 
    1854              :     /* We keep those variables in cstate. */
    1855          983 :     cstate->in_functions = in_functions;
    1856          983 :     cstate->typioparams = typioparams;
    1857          983 :     cstate->defmap = defmap;
    1858          983 :     cstate->defexprs = defexprs;
    1859          983 :     cstate->volatile_defexprs = volatile_defexprs;
    1860          983 :     cstate->num_defaults = num_defaults;
    1861          983 :     cstate->is_program = is_program;
    1862              : 
    1863          983 :     if (data_source_cb)
    1864              :     {
    1865          208 :         progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
    1866          208 :         cstate->copy_src = COPY_CALLBACK;
    1867          208 :         cstate->data_source_cb = data_source_cb;
    1868              :     }
    1869          775 :     else if (pipe)
    1870              :     {
    1871          541 :         progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
    1872              :         Assert(!is_program);    /* the grammar does not allow this */
    1873          541 :         if (whereToSendOutput == DestRemote)
    1874          541 :             ReceiveCopyBegin(cstate);
    1875              :         else
    1876            0 :             cstate->copy_file = stdin;
    1877              :     }
    1878              :     else
    1879              :     {
    1880          234 :         cstate->filename = pstrdup(filename);
    1881              : 
    1882          234 :         if (cstate->is_program)
    1883              :         {
    1884            0 :             progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
    1885            0 :             cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
    1886            0 :             if (cstate->copy_file == NULL)
    1887            0 :                 ereport(ERROR,
    1888              :                         (errcode_for_file_access(),
    1889              :                          errmsg("could not execute command \"%s\": %m",
    1890              :                                 cstate->filename)));
    1891              :         }
    1892              :         else
    1893              :         {
    1894              :             struct stat st;
    1895              : 
    1896          234 :             progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
    1897          234 :             cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
    1898          234 :             if (cstate->copy_file == NULL)
    1899              :             {
    1900              :                 /* copy errno because ereport subfunctions might change it */
    1901            0 :                 int         save_errno = errno;
    1902              : 
    1903            0 :                 ereport(ERROR,
    1904              :                         (errcode_for_file_access(),
    1905              :                          errmsg("could not open file \"%s\" for reading: %m",
    1906              :                                 cstate->filename),
    1907              :                          (save_errno == ENOENT || save_errno == EACCES) ?
    1908              :                          errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
    1909              :                                  "You may want a client-side facility such as psql's \\copy.") : 0));
    1910              :             }
    1911              : 
    1912          234 :             if (fstat(fileno(cstate->copy_file), &st))
    1913            0 :                 ereport(ERROR,
    1914              :                         (errcode_for_file_access(),
    1915              :                          errmsg("could not stat file \"%s\": %m",
    1916              :                                 cstate->filename)));
    1917              : 
    1918          234 :             if (S_ISDIR(st.st_mode))
    1919            0 :                 ereport(ERROR,
    1920              :                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1921              :                          errmsg("\"%s\" is a directory", cstate->filename)));
    1922              : 
    1923          234 :             progress_vals[2] = st.st_size;
    1924              :         }
    1925              :     }
    1926              : 
    1927          983 :     pgstat_progress_update_multi_param(3, progress_cols, progress_vals);
    1928              : 
    1929          983 :     cstate->routine->CopyFromStart(cstate, tupDesc);
    1930              : 
    1931          983 :     MemoryContextSwitchTo(oldcontext);
    1932              : 
    1933          983 :     return cstate;
    1934              : }
    1935              : 
    1936              : /*
    1937              :  * Clean up storage and release resources for COPY FROM.
    1938              :  */
    1939              : void
    1940          638 : EndCopyFrom(CopyFromState cstate)
    1941              : {
    1942              :     /* Invoke the end callback */
    1943          638 :     cstate->routine->CopyFromEnd(cstate);
    1944              : 
    1945              :     /* Close the input program pipe or file, if we opened one. */
    1946          638 :     if (cstate->is_program)
    1947              :     {
    1948            0 :         ClosePipeFromProgram(cstate);
    1949              :     }
    1950              :     else
    1951              :     {
    1952          638 :         if (cstate->filename != NULL && FreeFile(cstate->copy_file))
    1953            0 :             ereport(ERROR,
    1954              :                     (errcode_for_file_access(),
    1955              :                      errmsg("could not close file \"%s\": %m",
    1956              :                             cstate->filename)));
    1957              :     }
    1958              : 
    1959          638 :     pgstat_progress_end_command();
    1960              : 
    1961          638 :     MemoryContextDelete(cstate->copycontext);
    1962          638 :     pfree(cstate);
    1963          638 : }
    1964              : 
    1965              : /*
    1966              :  * Closes the pipe from an external program, checking the pclose() return code.
    1967              :  */
    1968              : static void
    1969            0 : ClosePipeFromProgram(CopyFromState cstate)
    1970              : {
    1971              :     int         pclose_rc;
    1972              : 
    1973              :     Assert(cstate->is_program);
    1974              : 
    1975            0 :     pclose_rc = ClosePipeStream(cstate->copy_file);
    1976            0 :     if (pclose_rc == -1)
    1977            0 :         ereport(ERROR,
    1978              :                 (errcode_for_file_access(),
    1979              :                  errmsg("could not close pipe to external command: %m")));
    1980            0 :     else if (pclose_rc != 0)
    1981              :     {
    1982              :         /*
    1983              :          * If we ended a COPY FROM PROGRAM before reaching EOF, then it's
    1984              :          * expected for the called program to fail with SIGPIPE, and we
    1985              :          * should not report that as an error.  Otherwise, SIGPIPE indicates a
    1986              :          * problem.
    1987              :          */
    1988            0 :         if (!cstate->raw_reached_eof &&
    1989            0 :             wait_result_is_signal(pclose_rc, SIGPIPE))
    1990            0 :             return;
    1991              : 
    1992            0 :         ereport(ERROR,
    1993              :                 (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
    1994              :                  errmsg("program \"%s\" failed",
    1995              :                         cstate->filename),
    1996              :                  errdetail_internal("%s", wait_result_to_str(pclose_rc))));
    1997              :     }
    1998              : }
        

Generated by: LCOV version 2.0-1