LCOV - code coverage report
Current view: top level - src/backend/commands - copyfrom.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 91.4 % 581 531
Test Date: 2026-02-17 17:20:33 Functions: 95.8 % 24 23
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * copyfrom.c
       4              :  *      COPY <table> FROM file/program/client
       5              :  *
       6              :  * This file contains routines needed to efficiently load tuples into a
       7              :  * table.  That includes looking up the correct partition, firing triggers,
       8              :  * calling the table AM function to insert the data, and updating indexes.
       9              :  * Reading data from the input file or client and parsing it into Datums
      10              :  * is handled in copyfromparse.c.
      11              :  *
      12              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
      13              :  * Portions Copyright (c) 1994, Regents of the University of California
      14              :  *
      15              :  *
      16              :  * IDENTIFICATION
      17              :  *    src/backend/commands/copyfrom.c
      18              :  *
      19              :  *-------------------------------------------------------------------------
      20              :  */
      21              : #include "postgres.h"
      22              : 
      23              : #include <ctype.h>
      24              : #include <unistd.h>
      25              : #include <sys/stat.h>
      26              : 
      27              : #include "access/heapam.h"
      28              : #include "access/tableam.h"
      29              : #include "access/xact.h"
      30              : #include "catalog/namespace.h"
      31              : #include "commands/copyapi.h"
      32              : #include "commands/copyfrom_internal.h"
      33              : #include "commands/progress.h"
      34              : #include "commands/trigger.h"
      35              : #include "executor/execPartition.h"
      36              : #include "executor/executor.h"
      37              : #include "executor/nodeModifyTable.h"
      38              : #include "executor/tuptable.h"
      39              : #include "foreign/fdwapi.h"
      40              : #include "mb/pg_wchar.h"
      41              : #include "miscadmin.h"
      42              : #include "nodes/miscnodes.h"
      43              : #include "optimizer/optimizer.h"
      44              : #include "pgstat.h"
      45              : #include "rewrite/rewriteHandler.h"
      46              : #include "storage/fd.h"
      47              : #include "tcop/tcopprot.h"
      48              : #include "utils/lsyscache.h"
      49              : #include "utils/memutils.h"
      50              : #include "utils/portal.h"
      51              : #include "utils/rel.h"
      52              : #include "utils/snapmgr.h"
      53              : 
      54              : /*
      55              :  * No more than this many tuples per CopyMultiInsertBuffer
      56              :  *
      57              :  * Caution: Don't make this too big, as we could end up with this many
      58              :  * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's
      59              :  * multiInsertBuffers list.  Increasing this can cause quadratic growth in
      60              :  * memory requirements during copies into partitioned tables with a large
      61              :  * number of partitions.
      62              :  */
      63              : #define MAX_BUFFERED_TUPLES     1000
      64              : 
      65              : /*
      66              :  * Flush buffers if there are >= this many bytes, as counted by the input
      67              :  * size, of tuples stored.
      68              :  */
      69              : #define MAX_BUFFERED_BYTES      65535
      70              : 
      71              : /*
      72              :  * Trim the list of buffers back down to this number after flushing.  This
      73              :  * must be >= 2.
      74              :  */
      75              : #define MAX_PARTITION_BUFFERS   32
      76              : 
      77              : /* Stores multi-insert data related to a single relation in CopyFrom. */
      78              : typedef struct CopyMultiInsertBuffer
      79              : {
      80              :     TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
      81              :     ResultRelInfo *resultRelInfo;   /* ResultRelInfo for 'relid' */
      82              :     BulkInsertState bistate;    /* BulkInsertState for this rel if plain
      83              :                                  * table; NULL if foreign table */
      84              :     int         nused;          /* number of 'slots' containing tuples */
      85              :     uint64      linenos[MAX_BUFFERED_TUPLES];   /* Line # of tuple in copy
      86              :                                                  * stream */
      87              : } CopyMultiInsertBuffer;
      88              : 
      89              : /*
      90              :  * Stores one or many CopyMultiInsertBuffers and details about the size and
      91              :  * number of tuples which are stored in them.  This allows multiple buffers to
      92              :  * exist at once when COPYing into a partitioned table.
      93              :  */
      94              : typedef struct CopyMultiInsertInfo
      95              : {
      96              :     List       *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */
      97              :     int         bufferedTuples; /* number of tuples buffered over all buffers */
      98              :     int         bufferedBytes;  /* number of bytes from all buffered tuples */
      99              :     CopyFromState cstate;       /* Copy state for this CopyMultiInsertInfo */
     100              :     EState     *estate;         /* Executor state used for COPY */
     101              :     CommandId   mycid;          /* Command Id used for COPY */
     102              :     int         ti_options;     /* table insert options */
     103              : } CopyMultiInsertInfo;
     104              : 
     105              : 
     106              : /* non-export function prototypes */
     107              : static void ClosePipeFromProgram(CopyFromState cstate);
     108              : 
     109              : /*
     110              :  * Built-in format-specific routines. One-row callbacks are defined in
     111              :  * copyfromparse.c.
     112              :  */
     113              : static void CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo,
     114              :                                    Oid *typioparam);
     115              : static void CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc);
     116              : static void CopyFromTextLikeEnd(CopyFromState cstate);
     117              : static void CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid,
     118              :                                  FmgrInfo *finfo, Oid *typioparam);
     119              : static void CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc);
     120              : static void CopyFromBinaryEnd(CopyFromState cstate);
     121              : 
     122              : 
     123              : /*
     124              :  * COPY FROM routines for built-in formats.
     125              :  *
     126              :  * CSV and text formats share the same TextLike routines except for the
     127              :  * one-row callback.
     128              :  */
     129              : 
     130              : /* text format */
     131              : static const CopyFromRoutine CopyFromRoutineText = {
     132              :     .CopyFromInFunc = CopyFromTextLikeInFunc,
     133              :     .CopyFromStart = CopyFromTextLikeStart,
     134              :     .CopyFromOneRow = CopyFromTextOneRow,
     135              :     .CopyFromEnd = CopyFromTextLikeEnd,
     136              : };
     137              : 
     138              : /* CSV format */
     139              : static const CopyFromRoutine CopyFromRoutineCSV = {
     140              :     .CopyFromInFunc = CopyFromTextLikeInFunc,
     141              :     .CopyFromStart = CopyFromTextLikeStart,
     142              :     .CopyFromOneRow = CopyFromCSVOneRow,
     143              :     .CopyFromEnd = CopyFromTextLikeEnd,
     144              : };
     145              : 
     146              : /* binary format */
     147              : static const CopyFromRoutine CopyFromRoutineBinary = {
     148              :     .CopyFromInFunc = CopyFromBinaryInFunc,
     149              :     .CopyFromStart = CopyFromBinaryStart,
     150              :     .CopyFromOneRow = CopyFromBinaryOneRow,
     151              :     .CopyFromEnd = CopyFromBinaryEnd,
     152              : };
     153              : 
     154              : /* Return a COPY FROM routine for the given options */
     155              : static const CopyFromRoutine *
     156          964 : CopyFromGetRoutine(const CopyFormatOptions *opts)
     157              : {
     158          964 :     if (opts->csv_mode)
     159          149 :         return &CopyFromRoutineCSV;
     160          815 :     else if (opts->binary)
     161            8 :         return &CopyFromRoutineBinary;
     162              : 
     163              :     /* default is text */
     164          807 :     return &CopyFromRoutineText;
     165              : }
     166              : 
     167              : /* Implementation of the start callback for text and CSV formats */
     168              : static void
     169          944 : CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc)
     170              : {
     171              :     AttrNumber  attr_count;
     172              : 
     173              :     /*
     174              :      * If encoding conversion is needed, we need another buffer to hold the
     175              :      * converted input data.  Otherwise, we can just point input_buf to the
     176              :      * same buffer as raw_buf.
     177              :      */
     178          944 :     if (cstate->need_transcoding)
     179              :     {
     180           18 :         cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
     181           18 :         cstate->input_buf_index = cstate->input_buf_len = 0;
     182              :     }
     183              :     else
     184          926 :         cstate->input_buf = cstate->raw_buf;
     185          944 :     cstate->input_reached_eof = false;
     186              : 
     187          944 :     initStringInfo(&cstate->line_buf);
     188              : 
     189              :     /*
     190              :      * Create workspace for CopyReadAttributes results; used by CSV and text
     191              :      * format.
     192              :      */
     193          944 :     attr_count = list_length(cstate->attnumlist);
     194          944 :     cstate->max_fields = attr_count;
     195          944 :     cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
     196          944 : }
     197              : 
     198              : /*
     199              :  * Implementation of the infunc callback for text and CSV formats. Assign
     200              :  * the input function data to the given *finfo.
     201              :  */
     202              : static void
     203         2757 : CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo,
     204              :                        Oid *typioparam)
     205              : {
     206              :     Oid         func_oid;
     207              : 
     208         2757 :     getTypeInputInfo(atttypid, &func_oid, typioparam);
     209         2757 :     fmgr_info(func_oid, finfo);
     210         2757 : }
     211              : 
     212              : /* Implementation of the end callback for text and CSV formats */
     213              : static void
     214          631 : CopyFromTextLikeEnd(CopyFromState cstate)
     215              : {
     216              :     /* nothing to do */
     217          631 : }
     218              : 
     219              : /* Implementation of the start callback for binary format */
     220              : static void
     221            7 : CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc)
     222              : {
     223              :     /* Read and verify binary header */
     224            7 :     ReceiveCopyBinaryHeader(cstate);
     225            7 : }
     226              : 
     227              : /*
     228              :  * Implementation of the infunc callback for binary format. Assign
     229              :  * the binary input function to the given *finfo.
     230              :  */
     231              : static void
     232           31 : CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid,
     233              :                      FmgrInfo *finfo, Oid *typioparam)
     234              : {
     235              :     Oid         func_oid;
     236              : 
     237           31 :     getTypeBinaryInputInfo(atttypid, &func_oid, typioparam);
     238           30 :     fmgr_info(func_oid, finfo);
     239           30 : }
     240              : 
     241              : /* Implementation of the end callback for binary format */
     242              : static void
     243            3 : CopyFromBinaryEnd(CopyFromState cstate)
     244              : {
     245              :     /* nothing to do */
     246            3 : }
     247              : 
     248              : /*
     249              :  * error context callback for COPY FROM
     250              :  *
     251              :  * The argument for the error context must be CopyFromState.
     252              :  */
     253              : void
     254          162 : CopyFromErrorCallback(void *arg)
     255              : {
     256          162 :     CopyFromState cstate = (CopyFromState) arg;
     257              : 
     258          162 :     if (cstate->relname_only)
     259              :     {
     260           22 :         errcontext("COPY %s",
     261              :                    cstate->cur_relname);
     262           22 :         return;
     263              :     }
     264          140 :     if (cstate->opts.binary)
     265              :     {
     266              :         /* can't usefully display the data */
     267            1 :         if (cstate->cur_attname)
     268            1 :             errcontext("COPY %s, line %" PRIu64 ", column %s",
     269              :                        cstate->cur_relname,
     270              :                        cstate->cur_lineno,
     271              :                        cstate->cur_attname);
     272              :         else
     273            0 :             errcontext("COPY %s, line %" PRIu64,
     274              :                        cstate->cur_relname,
     275              :                        cstate->cur_lineno);
     276              :     }
     277              :     else
     278              :     {
     279          139 :         if (cstate->cur_attname && cstate->cur_attval)
     280           20 :         {
     281              :             /* error is relevant to a particular column */
     282              :             char       *attval;
     283              : 
     284           20 :             attval = CopyLimitPrintoutLength(cstate->cur_attval);
     285           20 :             errcontext("COPY %s, line %" PRIu64 ", column %s: \"%s\"",
     286              :                        cstate->cur_relname,
     287              :                        cstate->cur_lineno,
     288              :                        cstate->cur_attname,
     289              :                        attval);
     290           20 :             pfree(attval);
     291              :         }
     292          119 :         else if (cstate->cur_attname)
     293              :         {
     294              :             /* error is relevant to a particular column, value is NULL */
     295            3 :             errcontext("COPY %s, line %" PRIu64 ", column %s: null input",
     296              :                        cstate->cur_relname,
     297              :                        cstate->cur_lineno,
     298              :                        cstate->cur_attname);
     299              :         }
     300              :         else
     301              :         {
     302              :             /*
     303              :              * Error is relevant to a particular line.
     304              :              *
     305              :              * If line_buf still contains the correct line, print it.
     306              :              */
     307          116 :             if (cstate->line_buf_valid)
     308              :             {
     309              :                 char       *lineval;
     310              : 
     311           94 :                 lineval = CopyLimitPrintoutLength(cstate->line_buf.data);
     312           94 :                 errcontext("COPY %s, line %" PRIu64 ": \"%s\"",
     313              :                            cstate->cur_relname,
     314              :                            cstate->cur_lineno, lineval);
     315           94 :                 pfree(lineval);
     316              :             }
     317              :             else
     318              :             {
     319           22 :                 errcontext("COPY %s, line %" PRIu64,
     320              :                            cstate->cur_relname,
     321              :                            cstate->cur_lineno);
     322              :             }
     323              :         }
     324              :     }
     325              : }
     326              : 
     327              : /*
     328              :  * Make sure we don't print an unreasonable amount of COPY data in a message.
     329              :  *
     330              :  * Returns a pstrdup'd copy of the input.
     331              :  */
     332              : char *
     333          132 : CopyLimitPrintoutLength(const char *str)
     334              : {
     335              : #define MAX_COPY_DATA_DISPLAY 100
     336              : 
     337          132 :     int         slen = strlen(str);
     338              :     int         len;
     339              :     char       *res;
     340              : 
     341              :     /* Fast path if definitely okay */
     342          132 :     if (slen <= MAX_COPY_DATA_DISPLAY)
     343          132 :         return pstrdup(str);
     344              : 
     345              :     /* Apply encoding-dependent truncation */
     346            0 :     len = pg_mbcliplen(str, slen, MAX_COPY_DATA_DISPLAY);
     347              : 
     348              :     /*
     349              :      * Truncate, and add "..." to show we truncated the input.
     350              :      */
     351            0 :     res = (char *) palloc(len + 4);
     352            0 :     memcpy(res, str, len);
     353            0 :     strcpy(res + len, "...");
     354              : 
     355            0 :     return res;
     356              : }
     357              : 
     358              : /*
     359              :  * Allocate memory and initialize a new CopyMultiInsertBuffer for this
     360              :  * ResultRelInfo.
     361              :  */
     362              : static CopyMultiInsertBuffer *
     363          823 : CopyMultiInsertBufferInit(ResultRelInfo *rri)
     364              : {
     365              :     CopyMultiInsertBuffer *buffer;
     366              : 
     367          823 :     buffer = palloc_object(CopyMultiInsertBuffer);
     368          823 :     memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
     369          823 :     buffer->resultRelInfo = rri;
     370          823 :     buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL;
     371          823 :     buffer->nused = 0;
     372              : 
     373          823 :     return buffer;
     374              : }
     375              : 
     376              : /*
     377              :  * Make a new buffer for this ResultRelInfo.
     378              :  */
     379              : static inline void
     380          823 : CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
     381              :                                ResultRelInfo *rri)
     382              : {
     383              :     CopyMultiInsertBuffer *buffer;
     384              : 
     385          823 :     buffer = CopyMultiInsertBufferInit(rri);
     386              : 
     387              :     /* Setup back-link so we can easily find this buffer again */
     388          823 :     rri->ri_CopyMultiInsertBuffer = buffer;
     389              :     /* Record that we're tracking this buffer */
     390          823 :     miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
     391          823 : }
     392              : 
     393              : /*
     394              :  * Initialize an already allocated CopyMultiInsertInfo.
     395              :  *
     396              :  * If rri is a non-partitioned table then a CopyMultiInsertBuffer is set up
     397              :  * for that table.
     398              :  */
     399              : static void
     400          820 : CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
     401              :                         CopyFromState cstate, EState *estate, CommandId mycid,
     402              :                         int ti_options)
     403              : {
     404          820 :     miinfo->multiInsertBuffers = NIL;
     405          820 :     miinfo->bufferedTuples = 0;
     406          820 :     miinfo->bufferedBytes = 0;
     407          820 :     miinfo->cstate = cstate;
     408          820 :     miinfo->estate = estate;
     409          820 :     miinfo->mycid = mycid;
     410          820 :     miinfo->ti_options = ti_options;
     411              : 
     412              :     /*
     413              :      * Only setup the buffer when not dealing with a partitioned table.
     414              :      * Buffers for partitioned tables will just be setup when we need to send
     415              :      * tuples their way for the first time.
     416              :      */
     417          820 :     if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
     418          780 :         CopyMultiInsertInfoSetupBuffer(miinfo, rri);
     419          820 : }
     420              : 
     421              : /*
     422              :  * Returns true if the buffers are full
     423              :  */
     424              : static inline bool
     425       602446 : CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
     426              : {
     427       602446 :     if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
     428       601949 :         miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
     429          550 :         return true;
     430       601896 :     return false;
     431              : }
     432              : 
     433              : /*
     434              :  * Returns true if we have no buffered tuples
     435              :  */
     436              : static inline bool
     437          749 : CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
     438              : {
     439          749 :     return miinfo->bufferedTuples == 0;
     440              : }
     441              : 
     442              : /*
     443              :  * Write the tuples stored in 'buffer' out to the table.
     444              :  */
     445              : static inline void
     446         1241 : CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
     447              :                            CopyMultiInsertBuffer *buffer,
     448              :                            int64 *processed)
     449              : {
     450         1241 :     CopyFromState cstate = miinfo->cstate;
     451         1241 :     EState     *estate = miinfo->estate;
     452         1241 :     int         nused = buffer->nused;
     453         1241 :     ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
     454         1241 :     TupleTableSlot **slots = buffer->slots;
     455              :     int         i;
     456              : 
     457         1241 :     if (resultRelInfo->ri_FdwRoutine)
     458              :     {
     459            7 :         int         batch_size = resultRelInfo->ri_BatchSize;
     460            7 :         int         sent = 0;
     461              : 
     462              :         Assert(buffer->bistate == NULL);
     463              : 
     464              :         /* Ensure that the FDW supports batching and it's enabled */
     465              :         Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert);
     466              :         Assert(batch_size > 1);
     467              : 
     468              :         /*
     469              :          * We suppress error context information other than the relation name,
     470              :          * if one of the operations below fails.
     471              :          */
     472              :         Assert(!cstate->relname_only);
     473            7 :         cstate->relname_only = true;
     474              : 
     475           19 :         while (sent < nused)
     476              :         {
     477           13 :             int         size = (batch_size < nused - sent) ? batch_size : (nused - sent);
     478           13 :             int         inserted = size;
     479              :             TupleTableSlot **rslots;
     480              : 
     481              :             /* insert into foreign table: let the FDW do it */
     482              :             rslots =
     483           13 :                 resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert(estate,
     484              :                                                                      resultRelInfo,
     485           13 :                                                                      &slots[sent],
     486              :                                                                      NULL,
     487              :                                                                      &inserted);
     488              : 
     489           12 :             sent += size;
     490              : 
     491              :             /* No need to do anything if there are no inserted rows */
     492           12 :             if (inserted <= 0)
     493            2 :                 continue;
     494              : 
     495              :             /* Triggers on foreign tables should not have transition tables */
     496              :             Assert(resultRelInfo->ri_TrigDesc == NULL ||
     497              :                    resultRelInfo->ri_TrigDesc->trig_insert_new_table == false);
     498              : 
     499              :             /* Run AFTER ROW INSERT triggers */
     500           10 :             if (resultRelInfo->ri_TrigDesc != NULL &&
     501            0 :                 resultRelInfo->ri_TrigDesc->trig_insert_after_row)
     502              :             {
     503            0 :                 Oid         relid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
     504              : 
     505            0 :                 for (i = 0; i < inserted; i++)
     506              :                 {
     507            0 :                     TupleTableSlot *slot = rslots[i];
     508              : 
     509              :                     /*
     510              :                      * AFTER ROW Triggers might reference the tableoid column,
     511              :                      * so (re-)initialize tts_tableOid before evaluating them.
     512              :                      */
     513            0 :                     slot->tts_tableOid = relid;
     514              : 
     515            0 :                     ExecARInsertTriggers(estate, resultRelInfo,
     516              :                                          slot, NIL,
     517              :                                          cstate->transition_capture);
     518              :                 }
     519              :             }
     520              : 
     521              :             /* Update the row counter and progress of the COPY command */
     522           10 :             *processed += inserted;
     523           10 :             pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
     524              :                                          *processed);
     525              :         }
     526              : 
     527           24 :         for (i = 0; i < nused; i++)
     528           18 :             ExecClearTuple(slots[i]);
     529              : 
     530              :         /* reset relname_only */
     531            6 :         cstate->relname_only = false;
     532              :     }
     533              :     else
     534              :     {
     535         1234 :         CommandId   mycid = miinfo->mycid;
     536         1234 :         int         ti_options = miinfo->ti_options;
     537         1234 :         bool        line_buf_valid = cstate->line_buf_valid;
     538         1234 :         uint64      save_cur_lineno = cstate->cur_lineno;
     539              :         MemoryContext oldcontext;
     540              : 
     541              :         Assert(buffer->bistate != NULL);
     542              : 
     543              :         /*
     544              :          * Print error context information correctly, if one of the operations
     545              :          * below fails.
     546              :          */
     547         1234 :         cstate->line_buf_valid = false;
     548              : 
     549              :         /*
     550              :          * table_multi_insert may leak memory, so switch to short-lived memory
     551              :          * context before calling it.
     552              :          */
     553         1234 :         oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
     554         1234 :         table_multi_insert(resultRelInfo->ri_RelationDesc,
     555              :                            slots,
     556              :                            nused,
     557              :                            mycid,
     558              :                            ti_options,
     559         1234 :                            buffer->bistate);
     560         1234 :         MemoryContextSwitchTo(oldcontext);
     561              : 
     562       603587 :         for (i = 0; i < nused; i++)
     563              :         {
     564              :             /*
     565              :              * If there are any indexes, update them for all the inserted
     566              :              * tuples, and run AFTER ROW INSERT triggers.
     567              :              */
     568       602361 :             if (resultRelInfo->ri_NumIndices > 0)
     569              :             {
     570              :                 List       *recheckIndexes;
     571              : 
     572       100954 :                 cstate->cur_lineno = buffer->linenos[i];
     573              :                 recheckIndexes =
     574       100954 :                     ExecInsertIndexTuples(resultRelInfo,
     575              :                                           buffer->slots[i], estate, false,
     576              :                                           false, NULL, NIL, false);
     577       100946 :                 ExecARInsertTriggers(estate, resultRelInfo,
     578       100946 :                                      slots[i], recheckIndexes,
     579              :                                      cstate->transition_capture);
     580       100946 :                 list_free(recheckIndexes);
     581              :             }
     582              : 
     583              :             /*
     584              :              * There's no indexes, but see if we need to run AFTER ROW INSERT
     585              :              * triggers anyway.
     586              :              */
     587       501407 :             else if (resultRelInfo->ri_TrigDesc != NULL &&
     588           42 :                      (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
     589           33 :                       resultRelInfo->ri_TrigDesc->trig_insert_new_table))
     590              :             {
     591           18 :                 cstate->cur_lineno = buffer->linenos[i];
     592           18 :                 ExecARInsertTriggers(estate, resultRelInfo,
     593           18 :                                      slots[i], NIL,
     594              :                                      cstate->transition_capture);
     595              :             }
     596              : 
     597       602353 :             ExecClearTuple(slots[i]);
     598              :         }
     599              : 
     600              :         /* Update the row counter and progress of the COPY command */
     601         1226 :         *processed += nused;
     602         1226 :         pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
     603              :                                      *processed);
     604              : 
     605              :         /* reset cur_lineno and line_buf_valid to what they were */
     606         1226 :         cstate->line_buf_valid = line_buf_valid;
     607         1226 :         cstate->cur_lineno = save_cur_lineno;
     608              :     }
     609              : 
     610              :     /* Mark that all slots are free */
     611         1232 :     buffer->nused = 0;
     612         1232 : }
     613              : 
     614              : /*
     615              :  * Drop used slots and free member for this buffer.
     616              :  *
     617              :  * The buffer must be flushed before cleanup.
     618              :  */
     619              : static inline void
     620          729 : CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
     621              :                              CopyMultiInsertBuffer *buffer)
     622              : {
     623          729 :     ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
     624              :     int         i;
     625              : 
     626              :     /* Ensure buffer was flushed */
     627              :     Assert(buffer->nused == 0);
     628              : 
     629              :     /* Remove back-link to ourself */
     630          729 :     resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
     631              : 
     632          729 :     if (resultRelInfo->ri_FdwRoutine == NULL)
     633              :     {
     634              :         Assert(buffer->bistate != NULL);
     635          723 :         FreeBulkInsertState(buffer->bistate);
     636              :     }
     637              :     else
     638              :         Assert(buffer->bistate == NULL);
     639              : 
     640              :     /* Since we only create slots on demand, just drop the non-null ones. */
     641       122645 :     for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
     642       121916 :         ExecDropSingleTupleTableSlot(buffer->slots[i]);
     643              : 
     644          729 :     if (resultRelInfo->ri_FdwRoutine == NULL)
     645          723 :         table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
     646              :                                  miinfo->ti_options);
     647              : 
     648          729 :     pfree(buffer);
     649          729 : }
     650              : 
     651              : /*
     652              :  * Write out all stored tuples in all buffers out to the tables.
     653              :  *
     654              :  * Once flushed we also trim the tracked buffers list down to size by removing
     655              :  * the buffers created earliest first.
     656              :  *
     657              :  * Callers should pass 'curr_rri' as the ResultRelInfo that's currently being
     658              :  * used.  When cleaning up old buffers we'll never remove the one for
     659              :  * 'curr_rri'.
     660              :  */
     661              : static inline void
     662         1128 : CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri,
     663              :                          int64 *processed)
     664              : {
     665              :     ListCell   *lc;
     666              : 
     667         2360 :     foreach(lc, miinfo->multiInsertBuffers)
     668              :     {
     669         1241 :         CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc);
     670              : 
     671         1241 :         CopyMultiInsertBufferFlush(miinfo, buffer, processed);
     672              :     }
     673              : 
     674         1119 :     miinfo->bufferedTuples = 0;
     675         1119 :     miinfo->bufferedBytes = 0;
     676              : 
     677              :     /*
     678              :      * Trim the list of tracked buffers down if it exceeds the limit.  Here we
     679              :      * remove buffers starting with the ones we created first.  It seems less
     680              :      * likely that these older ones will be needed than the ones that were
     681              :      * just created.
     682              :      */
     683         1119 :     while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS)
     684              :     {
     685              :         CopyMultiInsertBuffer *buffer;
     686              : 
     687            0 :         buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
     688              : 
     689              :         /*
     690              :          * We never want to remove the buffer that's currently being used, so
     691              :          * if we happen to find that then move it to the end of the list.
     692              :          */
     693            0 :         if (buffer->resultRelInfo == curr_rri)
     694              :         {
     695              :             /*
     696              :              * The code below would misbehave if we were trying to reduce the
     697              :              * list to less than two items.
     698              :              */
     699              :             StaticAssertDecl(MAX_PARTITION_BUFFERS >= 2,
     700              :                              "MAX_PARTITION_BUFFERS must be >= 2");
     701              : 
     702            0 :             miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
     703            0 :             miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
     704            0 :             buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
     705              :         }
     706              : 
     707            0 :         CopyMultiInsertBufferCleanup(miinfo, buffer);
     708            0 :         miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
     709              :     }
     710         1119 : }
     711              : 
     712              : /*
     713              :  * Cleanup allocated buffers and free memory
     714              :  */
     715              : static inline void
     716          726 : CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo)
     717              : {
     718              :     ListCell   *lc;
     719              : 
     720         1455 :     foreach(lc, miinfo->multiInsertBuffers)
     721          729 :         CopyMultiInsertBufferCleanup(miinfo, lfirst(lc));
     722              : 
     723          726 :     list_free(miinfo->multiInsertBuffers);
     724          726 : }
     725              : 
     726              : /*
     727              :  * Get the next TupleTableSlot that the next tuple should be stored in.
     728              :  *
     729              :  * Callers must ensure that the buffer is not full.
     730              :  *
     731              :  * Note: 'miinfo' is unused but has been included for consistency with the
     732              :  * other functions in this area.
     733              :  */
     734              : static inline TupleTableSlot *
     735       603280 : CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
     736              :                                 ResultRelInfo *rri)
     737              : {
     738       603280 :     CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
     739              :     int         nused;
     740              : 
     741              :     Assert(buffer != NULL);
     742              :     Assert(buffer->nused < MAX_BUFFERED_TUPLES);
     743              : 
     744       603280 :     nused = buffer->nused;
     745              : 
     746       603280 :     if (buffer->slots[nused] == NULL)
     747       122084 :         buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
     748       603280 :     return buffer->slots[nused];
     749              : }
     750              : 
     751              : /*
     752              :  * Record the previously reserved TupleTableSlot that was reserved by
     753              :  * CopyMultiInsertInfoNextFreeSlot as being consumed.
     754              :  */
     755              : static inline void
     756       602446 : CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
     757              :                          TupleTableSlot *slot, int tuplen, uint64 lineno)
     758              : {
     759       602446 :     CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
     760              : 
     761              :     Assert(buffer != NULL);
     762              :     Assert(slot == buffer->slots[buffer->nused]);
     763              : 
     764              :     /* Store the line number so we can properly report any errors later */
     765       602446 :     buffer->linenos[buffer->nused] = lineno;
     766              : 
     767              :     /* Record this slot as being used */
     768       602446 :     buffer->nused++;
     769              : 
     770              :     /* Update how many tuples are stored and their size */
     771       602446 :     miinfo->bufferedTuples++;
     772       602446 :     miinfo->bufferedBytes += tuplen;
     773       602446 : }
     774              : 
     775              : /*
     776              :  * Copy FROM file to relation.
     777              :  */
     778              : uint64
     779          914 : CopyFrom(CopyFromState cstate)
     780              : {
     781              :     ResultRelInfo *resultRelInfo;
     782              :     ResultRelInfo *target_resultRelInfo;
     783          914 :     ResultRelInfo *prevResultRelInfo = NULL;
     784          914 :     EState     *estate = CreateExecutorState(); /* for ExecConstraints() */
     785              :     ModifyTableState *mtstate;
     786              :     ExprContext *econtext;
     787          914 :     TupleTableSlot *singleslot = NULL;
     788          914 :     MemoryContext oldcontext = CurrentMemoryContext;
     789              : 
     790          914 :     PartitionTupleRouting *proute = NULL;
     791              :     ErrorContextCallback errcallback;
     792          914 :     CommandId   mycid = GetCurrentCommandId(true);
     793          914 :     int         ti_options = 0; /* start with default options for insert */
     794          914 :     BulkInsertState bistate = NULL;
     795              :     CopyInsertMethod insertMethod;
     796          914 :     CopyMultiInsertInfo multiInsertInfo = {0};  /* pacify compiler */
     797          914 :     int64       processed = 0;
     798          914 :     int64       excluded = 0;
     799              :     bool        has_before_insert_row_trig;
     800              :     bool        has_instead_insert_row_trig;
     801          914 :     bool        leafpart_use_multi_insert = false;
     802              : 
     803              :     Assert(cstate->rel);
     804              :     Assert(list_length(cstate->range_table) == 1);
     805              : 
     806          914 :     if (cstate->opts.on_error != COPY_ON_ERROR_STOP)
     807              :         Assert(cstate->escontext);
     808              : 
     809              :     /*
     810              :      * The target must be a plain, foreign, or partitioned relation, or have
     811              :      * an INSTEAD OF INSERT row trigger.  (Currently, such triggers are only
     812              :      * allowed on views, so we only hint about them in the view case.)
     813              :      */
     814          914 :     if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
     815           88 :         cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
     816           66 :         cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
     817            9 :         !(cstate->rel->trigdesc &&
     818            6 :           cstate->rel->trigdesc->trig_insert_instead_row))
     819              :     {
     820            3 :         if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
     821            3 :             ereport(ERROR,
     822              :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     823              :                      errmsg("cannot copy to view \"%s\"",
     824              :                             RelationGetRelationName(cstate->rel)),
     825              :                      errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
     826            0 :         else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
     827            0 :             ereport(ERROR,
     828              :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     829              :                      errmsg("cannot copy to materialized view \"%s\"",
     830              :                             RelationGetRelationName(cstate->rel))));
     831            0 :         else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
     832            0 :             ereport(ERROR,
     833              :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     834              :                      errmsg("cannot copy to sequence \"%s\"",
     835              :                             RelationGetRelationName(cstate->rel))));
     836              :         else
     837            0 :             ereport(ERROR,
     838              :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     839              :                      errmsg("cannot copy to non-table relation \"%s\"",
     840              :                             RelationGetRelationName(cstate->rel))));
     841              :     }
     842              : 
     843              :     /*
     844              :      * If the target file is new-in-transaction, we assume that checking FSM
     845              :      * for free space is a waste of time.  This could possibly be wrong, but
     846              :      * it's unlikely.
     847              :      */
     848          911 :     if (RELKIND_HAS_STORAGE(cstate->rel->rd_rel->relkind) &&
     849          826 :         (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
     850          820 :          cstate->rel->rd_firstRelfilelocatorSubid != InvalidSubTransactionId))
     851           44 :         ti_options |= TABLE_INSERT_SKIP_FSM;
     852              : 
     853              :     /*
     854              :      * Optimize if new relation storage was created in this subxact or one of
     855              :      * its committed children and we won't see those rows later as part of an
     856              :      * earlier scan or command. The subxact test ensures that if this subxact
     857              :      * aborts then the frozen rows won't be visible after xact cleanup.  Note
     858              :      * that the stronger test of exactly which subtransaction created it is
     859              :      * crucial for correctness of this optimization. The test for an earlier
     860              :      * scan or command tolerates false negatives. FREEZE causes other sessions
     861              :      * to see rows they would not see under MVCC, and a false negative merely
     862              :      * spreads that anomaly to the current session.
     863              :      */
     864          911 :     if (cstate->opts.freeze)
     865              :     {
     866              :         /*
     867              :          * We currently disallow COPY FREEZE on partitioned tables.  The
     868              :          * reason for this is that we've simply not yet opened the partitions
     869              :          * to determine if the optimization can be applied to them.  We could
     870              :          * go and open them all here, but doing so may be quite a costly
     871              :          * overhead for small copies.  In any case, we may just end up routing
     872              :          * tuples to a small number of partitions.  It seems better just to
     873              :          * raise an ERROR for partitioned tables.
     874              :          */
     875           36 :         if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     876              :         {
     877            3 :             ereport(ERROR,
     878              :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     879              :                      errmsg("cannot perform COPY FREEZE on a partitioned table")));
     880              :         }
     881              : 
     882              :         /* There's currently no support for COPY FREEZE on foreign tables. */
     883           33 :         if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
     884            3 :             ereport(ERROR,
     885              :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     886              :                      errmsg("cannot perform COPY FREEZE on a foreign table")));
     887              : 
     888              :         /*
     889              :          * Tolerate one registration for the benefit of FirstXactSnapshot.
     890              :          * Scan-bearing queries generally create at least two registrations,
     891              :          * though relying on that is fragile, as is ignoring ActiveSnapshot.
     892              :          * Clear CatalogSnapshot to avoid counting its registration.  We'll
     893              :          * still detect ongoing catalog scans, each of which separately
     894              :          * registers the snapshot it uses.
     895              :          */
     896           30 :         InvalidateCatalogSnapshot();
     897           30 :         if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
     898            0 :             ereport(ERROR,
     899              :                     (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
     900              :                      errmsg("cannot perform COPY FREEZE because of prior transaction activity")));
     901              : 
     902           60 :         if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
     903           30 :             cstate->rel->rd_newRelfilelocatorSubid != GetCurrentSubTransactionId())
     904            9 :             ereport(ERROR,
     905              :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     906              :                      errmsg("cannot perform COPY FREEZE because the table was not created or truncated in the current subtransaction")));
     907              : 
     908           21 :         ti_options |= TABLE_INSERT_FROZEN;
     909              :     }
     910              : 
     911              :     /*
     912              :      * We need a ResultRelInfo so we can use the regular executor's
     913              :      * index-entry-making machinery.  (There used to be a huge amount of code
     914              :      * here that basically duplicated execUtils.c ...)
     915              :      */
     916          896 :     ExecInitRangeTable(estate, cstate->range_table, cstate->rteperminfos,
     917              :                        bms_make_singleton(1));
     918          896 :     resultRelInfo = target_resultRelInfo = makeNode(ResultRelInfo);
     919          896 :     ExecInitResultRelation(estate, resultRelInfo, 1);
     920              : 
     921              :     /* Verify the named relation is a valid target for INSERT */
     922          896 :     CheckValidResultRel(resultRelInfo, CMD_INSERT, ONCONFLICT_NONE, NIL);
     923              : 
     924          895 :     ExecOpenIndices(resultRelInfo, false);
     925              : 
     926              :     /*
     927              :      * Set up a ModifyTableState so we can let FDW(s) init themselves for
     928              :      * foreign-table result relation(s).
     929              :      */
     930          895 :     mtstate = makeNode(ModifyTableState);
     931          895 :     mtstate->ps.plan = NULL;
     932          895 :     mtstate->ps.state = estate;
     933          895 :     mtstate->operation = CMD_INSERT;
     934          895 :     mtstate->mt_nrels = 1;
     935          895 :     mtstate->resultRelInfo = resultRelInfo;
     936          895 :     mtstate->rootResultRelInfo = resultRelInfo;
     937              : 
     938          895 :     if (resultRelInfo->ri_FdwRoutine != NULL &&
     939           18 :         resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
     940           18 :         resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
     941              :                                                          resultRelInfo);
     942              : 
     943              :     /*
     944              :      * Also, if the named relation is a foreign table, determine if the FDW
     945              :      * supports batch insert and determine the batch size (a FDW may support
     946              :      * batching, but it may be disabled for the server/table).
     947              :      *
     948              :      * If the FDW does not support batching, we set the batch size to 1.
     949              :      */
     950          895 :     if (resultRelInfo->ri_FdwRoutine != NULL &&
     951           18 :         resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize &&
     952           18 :         resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert)
     953           18 :         resultRelInfo->ri_BatchSize =
     954           18 :             resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize(resultRelInfo);
     955              :     else
     956          877 :         resultRelInfo->ri_BatchSize = 1;
     957              : 
     958              :     Assert(resultRelInfo->ri_BatchSize >= 1);
     959              : 
     960              :     /* Prepare to catch AFTER triggers. */
     961          895 :     AfterTriggerBeginQuery();
     962              : 
     963              :     /*
     964              :      * If there are any triggers with transition tables on the named relation,
     965              :      * we need to be prepared to capture transition tuples.
     966              :      *
     967              :      * Because partition tuple routing would like to know about whether
     968              :      * transition capture is active, we also set it in mtstate, which is
     969              :      * passed to ExecFindPartition() below.
     970              :      */
     971          895 :     cstate->transition_capture = mtstate->mt_transition_capture =
     972          895 :         MakeTransitionCaptureState(cstate->rel->trigdesc,
     973          895 :                                    RelationGetRelid(cstate->rel),
     974              :                                    CMD_INSERT);
     975              : 
     976              :     /*
     977              :      * If the named relation is a partitioned table, initialize state for
     978              :      * CopyFrom tuple routing.
     979              :      */
     980          895 :     if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     981           54 :         proute = ExecSetupPartitionTupleRouting(estate, cstate->rel);
     982              : 
     983          895 :     if (cstate->whereClause)
     984            9 :         cstate->qualexpr = ExecInitQual(castNode(List, cstate->whereClause),
     985              :                                         &mtstate->ps);
     986              : 
     987              :     /*
     988              :      * It's generally more efficient to prepare a bunch of tuples for
     989              :      * insertion, and insert them in one
     990              :      * table_multi_insert()/ExecForeignBatchInsert() call, than call
     991              :      * table_tuple_insert()/ExecForeignInsert() separately for every tuple.
     992              :      * However, there are a number of reasons why we might not be able to do
     993              :      * this.  These are explained below.
     994              :      */
     995          895 :     if (resultRelInfo->ri_TrigDesc != NULL &&
     996           93 :         (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
     997           47 :          resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
     998              :     {
     999              :         /*
    1000              :          * Can't support multi-inserts when there are any BEFORE/INSTEAD OF
    1001              :          * triggers on the table. Such triggers might query the table we're
    1002              :          * inserting into and act differently if the tuples that have already
    1003              :          * been processed and prepared for insertion are not there.
    1004              :          */
    1005           52 :         insertMethod = CIM_SINGLE;
    1006              :     }
    1007          843 :     else if (resultRelInfo->ri_FdwRoutine != NULL &&
    1008           14 :              resultRelInfo->ri_BatchSize == 1)
    1009              :     {
    1010              :         /*
    1011              :          * Can't support multi-inserts to a foreign table if the FDW does not
    1012              :          * support batching, or it's disabled for the server or foreign table.
    1013              :          */
    1014            9 :         insertMethod = CIM_SINGLE;
    1015              :     }
    1016          834 :     else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL &&
    1017           16 :              resultRelInfo->ri_TrigDesc->trig_insert_new_table)
    1018              :     {
    1019              :         /*
    1020              :          * For partitioned tables we can't support multi-inserts when there
    1021              :          * are any statement level insert triggers. It might be possible to
    1022              :          * allow partitioned tables with such triggers in the future, but for
    1023              :          * now, CopyMultiInsertInfoFlush expects that any after row insert and
    1024              :          * statement level insert triggers are on the same relation.
    1025              :          */
    1026           11 :         insertMethod = CIM_SINGLE;
    1027              :     }
    1028          823 :     else if (cstate->volatile_defexprs)
    1029              :     {
    1030              :         /*
    1031              :          * Can't support multi-inserts if there are any volatile default
    1032              :          * expressions in the table.  Similarly to the trigger case above,
    1033              :          * such expressions may query the table we're inserting into.
    1034              :          *
    1035              :          * Note: It does not matter if any partitions have any volatile
    1036              :          * default expressions as we use the defaults from the target of the
    1037              :          * COPY command.
    1038              :          */
    1039            3 :         insertMethod = CIM_SINGLE;
    1040              :     }
    1041          820 :     else if (contain_volatile_functions(cstate->whereClause))
    1042              :     {
    1043              :         /*
    1044              :          * Can't support multi-inserts if there are any volatile function
    1045              :          * expressions in WHERE clause.  Similarly to the trigger case above,
    1046              :          * such expressions may query the table we're inserting into.
    1047              :          *
    1048              :          * Note: the whereClause was already preprocessed in DoCopy(), so it's
    1049              :          * okay to use contain_volatile_functions() directly.
    1050              :          */
    1051            0 :         insertMethod = CIM_SINGLE;
    1052              :     }
    1053              :     else
    1054              :     {
    1055              :         /*
    1056              :          * For partitioned tables, we may still be able to perform bulk
    1057              :          * inserts.  However, the possibility of this depends on which types
    1058              :          * of triggers exist on the partition.  We must disable bulk inserts
    1059              :          * if the partition is a foreign table that can't use batching or it
    1060              :          * has any before row insert or insert instead triggers (same as we
    1061              :          * checked above for the parent table).  Since the partition's
    1062              :          * resultRelInfos are initialized only when we actually need to insert
    1063              :          * the first tuple into them, we must have the intermediate insert
    1064              :          * method of CIM_MULTI_CONDITIONAL to flag that we must later
    1065              :          * determine if we can use bulk-inserts for the partition being
    1066              :          * inserted into.
    1067              :          */
    1068          820 :         if (proute)
    1069           40 :             insertMethod = CIM_MULTI_CONDITIONAL;
    1070              :         else
    1071          780 :             insertMethod = CIM_MULTI;
    1072              : 
    1073          820 :         CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
    1074              :                                 estate, mycid, ti_options);
    1075              :     }
    1076              : 
    1077              :     /*
    1078              :      * If not using batch mode (which allocates slots as needed) set up a
    1079              :      * tuple slot too. When inserting into a partitioned table, we also need
    1080              :      * one, even if we might batch insert, to read the tuple in the root
    1081              :      * partition's form.
    1082              :      */
    1083          895 :     if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
    1084              :     {
    1085          115 :         singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
    1086              :                                        &estate->es_tupleTable);
    1087          115 :         bistate = GetBulkInsertState();
    1088              :     }
    1089              : 
    1090          988 :     has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
    1091           93 :                                   resultRelInfo->ri_TrigDesc->trig_insert_before_row);
    1092              : 
    1093          988 :     has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
    1094           93 :                                    resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
    1095              : 
    1096              :     /*
    1097              :      * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
    1098              :      * should do this for COPY, since it's not really an "INSERT" statement as
    1099              :      * such. However, executing these triggers maintains consistency with the
    1100              :      * EACH ROW triggers that we already fire on COPY.
    1101              :      */
    1102          895 :     ExecBSInsertTriggers(estate, resultRelInfo);
    1103              : 
    1104          895 :     econtext = GetPerTupleExprContext(estate);
    1105              : 
    1106              :     /* Set up callback to identify error line number */
    1107          895 :     errcallback.callback = CopyFromErrorCallback;
    1108          895 :     errcallback.arg = cstate;
    1109          895 :     errcallback.previous = error_context_stack;
    1110          895 :     error_context_stack = &errcallback;
    1111              : 
    1112              :     for (;;)
    1113       632700 :     {
    1114              :         TupleTableSlot *myslot;
    1115              :         bool        skip_tuple;
    1116              : 
    1117       633595 :         CHECK_FOR_INTERRUPTS();
    1118              : 
    1119              :         /*
    1120              :          * Reset the per-tuple exprcontext. We do this after every tuple, to
    1121              :          * clean-up after expression evaluations etc.
    1122              :          */
    1123       633595 :         ResetPerTupleExprContext(estate);
    1124              : 
    1125              :         /* select slot to (initially) load row into */
    1126       633595 :         if (insertMethod == CIM_SINGLE || proute)
    1127              :         {
    1128       137441 :             myslot = singleslot;
    1129       137441 :             Assert(myslot != NULL);
    1130              :         }
    1131              :         else
    1132              :         {
    1133              :             Assert(resultRelInfo == target_resultRelInfo);
    1134              :             Assert(insertMethod == CIM_MULTI);
    1135              : 
    1136       496154 :             myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
    1137              :                                                      resultRelInfo);
    1138              :         }
    1139              : 
    1140              :         /*
    1141              :          * Switch to per-tuple context before calling NextCopyFrom, which does
    1142              :          * evaluate default expressions etc. and requires per-tuple context.
    1143              :          */
    1144       633595 :         MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    1145              : 
    1146       633595 :         ExecClearTuple(myslot);
    1147              : 
    1148              :         /* Directly store the values/nulls array in the slot */
    1149       633595 :         if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
    1150          795 :             break;
    1151              : 
    1152       632722 :         if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE &&
    1153           81 :             cstate->escontext->error_occurred)
    1154              :         {
    1155              :             /*
    1156              :              * Soft error occurred, skip this tuple and just make
    1157              :              * ErrorSaveContext ready for the next NextCopyFrom. Since we
    1158              :              * don't set details_wanted and error_data is not to be filled,
    1159              :              * just resetting error_occurred is enough.
    1160              :              */
    1161           54 :             cstate->escontext->error_occurred = false;
    1162              : 
    1163              :             /* Report that this tuple was skipped by the ON_ERROR clause */
    1164           54 :             pgstat_progress_update_param(PROGRESS_COPY_TUPLES_SKIPPED,
    1165           54 :                                          cstate->num_errors);
    1166              : 
    1167           54 :             if (cstate->opts.reject_limit > 0 &&
    1168           24 :                 cstate->num_errors > cstate->opts.reject_limit)
    1169            3 :                 ereport(ERROR,
    1170              :                         (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
    1171              :                          errmsg("skipped more than REJECT_LIMIT (%" PRId64 ") rows due to data type incompatibility",
    1172              :                                 cstate->opts.reject_limit)));
    1173              : 
    1174              :             /* Repeat NextCopyFrom() until no soft error occurs */
    1175           51 :             continue;
    1176              :         }
    1177              : 
    1178       632668 :         ExecStoreVirtualTuple(myslot);
    1179              : 
    1180              :         /*
    1181              :          * Constraints and where clause might reference the tableoid column,
    1182              :          * so (re-)initialize tts_tableOid before evaluating them.
    1183              :          */
    1184       632668 :         myslot->tts_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc);
    1185              : 
    1186              :         /* Triggers and stuff need to be invoked in query context. */
    1187       632668 :         MemoryContextSwitchTo(oldcontext);
    1188              : 
    1189       632668 :         if (cstate->whereClause)
    1190              :         {
    1191           33 :             econtext->ecxt_scantuple = myslot;
    1192              :             /* Skip items that don't match COPY's WHERE clause */
    1193           33 :             if (!ExecQual(cstate->qualexpr, econtext))
    1194              :             {
    1195              :                 /*
    1196              :                  * Report that this tuple was filtered out by the WHERE
    1197              :                  * clause.
    1198              :                  */
    1199           18 :                 pgstat_progress_update_param(PROGRESS_COPY_TUPLES_EXCLUDED,
    1200              :                                              ++excluded);
    1201           18 :                 continue;
    1202              :             }
    1203              :         }
    1204              : 
    1205              :         /* Determine the partition to insert the tuple into */
    1206       632650 :         if (proute)
    1207              :         {
    1208              :             TupleConversionMap *map;
    1209              : 
    1210              :             /*
    1211              :              * Attempt to find a partition suitable for this tuple.
    1212              :              * ExecFindPartition() will raise an error if none can be found or
    1213              :              * if the found partition is not suitable for INSERTs.
    1214              :              */
    1215       137197 :             resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo,
    1216              :                                               proute, myslot, estate);
    1217              : 
    1218       137196 :             if (prevResultRelInfo != resultRelInfo)
    1219              :             {
    1220              :                 /* Determine which triggers exist on this partition */
    1221        80785 :                 has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
    1222           27 :                                               resultRelInfo->ri_TrigDesc->trig_insert_before_row);
    1223              : 
    1224        80785 :                 has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
    1225           27 :                                                resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
    1226              : 
    1227              :                 /*
    1228              :                  * Disable multi-inserts when the partition has BEFORE/INSTEAD
    1229              :                  * OF triggers, or if the partition is a foreign table that
    1230              :                  * can't use batching.
    1231              :                  */
    1232       131487 :                 leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
    1233        50729 :                     !has_before_insert_row_trig &&
    1234       182204 :                     !has_instead_insert_row_trig &&
    1235        50717 :                     (resultRelInfo->ri_FdwRoutine == NULL ||
    1236            6 :                      resultRelInfo->ri_BatchSize > 1);
    1237              : 
    1238              :                 /* Set the multi-insert buffer to use for this partition. */
    1239        80758 :                 if (leafpart_use_multi_insert)
    1240              :                 {
    1241        50715 :                     if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
    1242           43 :                         CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
    1243              :                                                        resultRelInfo);
    1244              :                 }
    1245        30043 :                 else if (insertMethod == CIM_MULTI_CONDITIONAL &&
    1246           14 :                          !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
    1247              :                 {
    1248              :                     /*
    1249              :                      * Flush pending inserts if this partition can't use
    1250              :                      * batching, so rows are visible to triggers etc.
    1251              :                      */
    1252            0 :                     CopyMultiInsertInfoFlush(&multiInsertInfo,
    1253              :                                              resultRelInfo,
    1254              :                                              &processed);
    1255              :                 }
    1256              : 
    1257        80758 :                 if (bistate != NULL)
    1258        80758 :                     ReleaseBulkInsertStatePin(bistate);
    1259        80758 :                 prevResultRelInfo = resultRelInfo;
    1260              :             }
    1261              : 
    1262              :             /*
    1263              :              * If we're capturing transition tuples, we might need to convert
    1264              :              * from the partition rowtype to root rowtype. But if there are no
    1265              :              * BEFORE triggers on the partition that could change the tuple,
    1266              :              * we can just remember the original unconverted tuple to avoid a
    1267              :              * needless round trip conversion.
    1268              :              */
    1269       137196 :             if (cstate->transition_capture != NULL)
    1270           29 :                 cstate->transition_capture->tcs_original_insert_tuple =
    1271           29 :                     !has_before_insert_row_trig ? myslot : NULL;
    1272              : 
    1273              :             /*
    1274              :              * We might need to convert from the root rowtype to the partition
    1275              :              * rowtype.
    1276              :              */
    1277       137196 :             map = ExecGetRootToChildMap(resultRelInfo, estate);
    1278       137196 :             if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert)
    1279              :             {
    1280              :                 /* non batch insert */
    1281        30070 :                 if (map != NULL)
    1282              :                 {
    1283              :                     TupleTableSlot *new_slot;
    1284              : 
    1285           55 :                     new_slot = resultRelInfo->ri_PartitionTupleSlot;
    1286           55 :                     myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot);
    1287              :                 }
    1288              :             }
    1289              :             else
    1290              :             {
    1291              :                 /*
    1292              :                  * Prepare to queue up tuple for later batch insert into
    1293              :                  * current partition.
    1294              :                  */
    1295              :                 TupleTableSlot *batchslot;
    1296              : 
    1297              :                 /* no other path available for partitioned table */
    1298              :                 Assert(insertMethod == CIM_MULTI_CONDITIONAL);
    1299              : 
    1300       107126 :                 batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
    1301              :                                                             resultRelInfo);
    1302              : 
    1303       107126 :                 if (map != NULL)
    1304         6100 :                     myslot = execute_attr_map_slot(map->attrMap, myslot,
    1305              :                                                    batchslot);
    1306              :                 else
    1307              :                 {
    1308              :                     /*
    1309              :                      * This looks more expensive than it is (Believe me, I
    1310              :                      * optimized it away. Twice.). The input is in virtual
    1311              :                      * form, and we'll materialize the slot below - for most
    1312              :                      * slot types the copy performs the work materialization
    1313              :                      * would later require anyway.
    1314              :                      */
    1315       101026 :                     ExecCopySlot(batchslot, myslot);
    1316       101026 :                     myslot = batchslot;
    1317              :                 }
    1318              :             }
    1319              : 
    1320              :             /* ensure that triggers etc see the right relation  */
    1321       137196 :             myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
    1322              :         }
    1323              : 
    1324       632649 :         skip_tuple = false;
    1325              : 
    1326              :         /* BEFORE ROW INSERT Triggers */
    1327       632649 :         if (has_before_insert_row_trig)
    1328              :         {
    1329          137 :             if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot))
    1330            8 :                 skip_tuple = true;  /* "do nothing" */
    1331              :         }
    1332              : 
    1333       632649 :         if (!skip_tuple)
    1334              :         {
    1335              :             /*
    1336              :              * If there is an INSTEAD OF INSERT ROW trigger, let it handle the
    1337              :              * tuple.  Otherwise, proceed with inserting the tuple into the
    1338              :              * table or foreign table.
    1339              :              */
    1340       632641 :             if (has_instead_insert_row_trig)
    1341              :             {
    1342            6 :                 ExecIRInsertTriggers(estate, resultRelInfo, myslot);
    1343              :             }
    1344              :             else
    1345              :             {
    1346              :                 /* Compute stored generated columns */
    1347       632635 :                 if (resultRelInfo->ri_RelationDesc->rd_att->constr &&
    1348       230969 :                     resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored)
    1349           17 :                     ExecComputeStoredGenerated(resultRelInfo, estate, myslot,
    1350              :                                                CMD_INSERT);
    1351              : 
    1352              :                 /*
    1353              :                  * If the target is a plain table, check the constraints of
    1354              :                  * the tuple.
    1355              :                  */
    1356       632635 :                 if (resultRelInfo->ri_FdwRoutine == NULL &&
    1357       632589 :                     resultRelInfo->ri_RelationDesc->rd_att->constr)
    1358       230951 :                     ExecConstraints(resultRelInfo, myslot, estate);
    1359              : 
    1360              :                 /*
    1361              :                  * Also check the tuple against the partition constraint, if
    1362              :                  * there is one; except that if we got here via tuple-routing,
    1363              :                  * we don't need to if there's no BR trigger defined on the
    1364              :                  * partition.
    1365              :                  */
    1366       632620 :                 if (resultRelInfo->ri_RelationDesc->rd_rel->relispartition &&
    1367       137190 :                     (proute == NULL || has_before_insert_row_trig))
    1368         1154 :                     ExecPartitionCheck(resultRelInfo, myslot, estate, true);
    1369              : 
    1370              :                 /* Store the slot in the multi-insert buffer, when enabled. */
    1371       632620 :                 if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
    1372              :                 {
    1373              :                     /*
    1374              :                      * The slot previously might point into the per-tuple
    1375              :                      * context. For batching it needs to be longer lived.
    1376              :                      */
    1377       602446 :                     ExecMaterializeSlot(myslot);
    1378              : 
    1379              :                     /* Add this tuple to the tuple buffer */
    1380       602446 :                     CopyMultiInsertInfoStore(&multiInsertInfo,
    1381              :                                              resultRelInfo, myslot,
    1382              :                                              cstate->line_buf.len,
    1383              :                                              cstate->cur_lineno);
    1384              : 
    1385              :                     /*
    1386              :                      * If enough inserts have queued up, then flush all
    1387              :                      * buffers out to their tables.
    1388              :                      */
    1389       602446 :                     if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
    1390          550 :                         CopyMultiInsertInfoFlush(&multiInsertInfo,
    1391              :                                                  resultRelInfo,
    1392              :                                                  &processed);
    1393              : 
    1394              :                     /*
    1395              :                      * We delay updating the row counter and progress of the
    1396              :                      * COPY command until after writing the tuples stored in
    1397              :                      * the buffer out to the table, as in single insert mode.
    1398              :                      * See CopyMultiInsertBufferFlush().
    1399              :                      */
    1400       602446 :                     continue;   /* next tuple please */
    1401              :                 }
    1402              :                 else
    1403              :                 {
    1404        30174 :                     List       *recheckIndexes = NIL;
    1405              : 
    1406              :                     /* OK, store the tuple */
    1407        30174 :                     if (resultRelInfo->ri_FdwRoutine != NULL)
    1408              :                     {
    1409           27 :                         myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
    1410              :                                                                                  resultRelInfo,
    1411              :                                                                                  myslot,
    1412              :                                                                                  NULL);
    1413              : 
    1414           26 :                         if (myslot == NULL) /* "do nothing" */
    1415            2 :                             continue;   /* next tuple please */
    1416              : 
    1417              :                         /*
    1418              :                          * AFTER ROW Triggers might reference the tableoid
    1419              :                          * column, so (re-)initialize tts_tableOid before
    1420              :                          * evaluating them.
    1421              :                          */
    1422           24 :                         myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
    1423              :                     }
    1424              :                     else
    1425              :                     {
    1426              :                         /* OK, store the tuple and create index entries for it */
    1427        30147 :                         table_tuple_insert(resultRelInfo->ri_RelationDesc,
    1428              :                                            myslot, mycid, ti_options, bistate);
    1429              : 
    1430        30147 :                         if (resultRelInfo->ri_NumIndices > 0)
    1431            0 :                             recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
    1432              :                                                                    myslot,
    1433              :                                                                    estate,
    1434              :                                                                    false,
    1435              :                                                                    false,
    1436              :                                                                    NULL,
    1437              :                                                                    NIL,
    1438              :                                                                    false);
    1439              :                     }
    1440              : 
    1441              :                     /* AFTER ROW INSERT Triggers */
    1442        30171 :                     ExecARInsertTriggers(estate, resultRelInfo, myslot,
    1443              :                                          recheckIndexes, cstate->transition_capture);
    1444              : 
    1445        30169 :                     list_free(recheckIndexes);
    1446              :                 }
    1447              :             }
    1448              : 
    1449              :             /*
    1450              :              * We count only tuples not suppressed by a BEFORE INSERT trigger
    1451              :              * or FDW; this is the same definition used by nodeModifyTable.c
    1452              :              * for counting tuples inserted by an INSERT command.  Update
    1453              :              * progress of the COPY command as well.
    1454              :              */
    1455        30175 :             pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
    1456              :                                          ++processed);
    1457              :         }
    1458              :     }
    1459              : 
    1460              :     /* Flush any remaining buffered tuples */
    1461          795 :     if (insertMethod != CIM_SINGLE)
    1462              :     {
    1463          735 :         if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
    1464          578 :             CopyMultiInsertInfoFlush(&multiInsertInfo, NULL, &processed);
    1465              :     }
    1466              : 
    1467              :     /* Done, clean up */
    1468          786 :     error_context_stack = errcallback.previous;
    1469              : 
    1470          786 :     if (cstate->opts.on_error != COPY_ON_ERROR_STOP &&
    1471           15 :         cstate->num_errors > 0 &&
    1472           15 :         cstate->opts.log_verbosity >= COPY_LOG_VERBOSITY_DEFAULT)
    1473           12 :         ereport(NOTICE,
    1474              :                 errmsg_plural("%" PRIu64 " row was skipped due to data type incompatibility",
    1475              :                               "%" PRIu64 " rows were skipped due to data type incompatibility",
    1476              :                               cstate->num_errors,
    1477              :                               cstate->num_errors));
    1478              : 
    1479          786 :     if (bistate != NULL)
    1480           99 :         FreeBulkInsertState(bistate);
    1481              : 
    1482          786 :     MemoryContextSwitchTo(oldcontext);
    1483              : 
    1484              :     /* Execute AFTER STATEMENT insertion triggers */
    1485          786 :     ExecASInsertTriggers(estate, target_resultRelInfo, cstate->transition_capture);
    1486              : 
    1487              :     /* Handle queued AFTER triggers */
    1488          786 :     AfterTriggerEndQuery(estate);
    1489              : 
    1490          786 :     ExecResetTupleTable(estate->es_tupleTable, false);
    1491              : 
    1492              :     /* Allow the FDW to shut down */
    1493          786 :     if (target_resultRelInfo->ri_FdwRoutine != NULL &&
    1494           16 :         target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
    1495           16 :         target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
    1496              :                                                               target_resultRelInfo);
    1497              : 
    1498              :     /* Tear down the multi-insert buffer data */
    1499          786 :     if (insertMethod != CIM_SINGLE)
    1500          726 :         CopyMultiInsertInfoCleanup(&multiInsertInfo);
    1501              : 
    1502              :     /* Close all the partitioned tables, leaf partitions, and their indices */
    1503          786 :     if (proute)
    1504           51 :         ExecCleanupTupleRouting(mtstate, proute);
    1505              : 
    1506              :     /* Close the result relations, including any trigger target relations */
    1507          786 :     ExecCloseResultRelations(estate);
    1508          786 :     ExecCloseRangeTableRelations(estate);
    1509              : 
    1510          786 :     FreeExecutorState(estate);
    1511              : 
    1512          786 :     return processed;
    1513              : }
    1514              : 
    1515              : /*
    1516              :  * Setup to read tuples from a file for COPY FROM.
    1517              :  *
    1518              :  * 'rel': Used as a template for the tuples
    1519              :  * 'whereClause': WHERE clause from the COPY FROM command
    1520              :  * 'filename': Name of server-local file to read, NULL for STDIN
    1521              :  * 'is_program': true if 'filename' is program to execute
    1522              :  * 'data_source_cb': callback that provides the input data
    1523              :  * 'attnamelist': List of char *, columns to include. NIL selects all cols.
    1524              :  * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
    1525              :  *
    1526              :  * Returns a CopyFromState, to be passed to NextCopyFrom and related functions.
    1527              :  */
    1528              : CopyFromState
    1529         1088 : BeginCopyFrom(ParseState *pstate,
    1530              :               Relation rel,
    1531              :               Node *whereClause,
    1532              :               const char *filename,
    1533              :               bool is_program,
    1534              :               copy_data_source_cb data_source_cb,
    1535              :               List *attnamelist,
    1536              :               List *options)
    1537              : {
    1538              :     CopyFromState cstate;
    1539         1088 :     bool        pipe = (filename == NULL);
    1540              :     TupleDesc   tupDesc;
    1541              :     AttrNumber  num_phys_attrs,
    1542              :                 num_defaults;
    1543              :     FmgrInfo   *in_functions;
    1544              :     Oid        *typioparams;
    1545              :     int        *defmap;
    1546              :     ExprState **defexprs;
    1547              :     MemoryContext oldcontext;
    1548              :     bool        volatile_defexprs;
    1549         1088 :     const int   progress_cols[] = {
    1550              :         PROGRESS_COPY_COMMAND,
    1551              :         PROGRESS_COPY_TYPE,
    1552              :         PROGRESS_COPY_BYTES_TOTAL
    1553              :     };
    1554         1088 :     int64       progress_vals[] = {
    1555              :         PROGRESS_COPY_COMMAND_FROM,
    1556              :         0,
    1557              :         0
    1558              :     };
    1559              : 
    1560              :     /* Allocate workspace and zero all fields */
    1561         1088 :     cstate = palloc0_object(CopyFromStateData);
    1562              : 
    1563              :     /*
    1564              :      * We allocate everything used by a cstate in a new memory context. This
    1565              :      * avoids memory leaks during repeated use of COPY in a query.
    1566              :      */
    1567         1088 :     cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
    1568              :                                                 "COPY",
    1569              :                                                 ALLOCSET_DEFAULT_SIZES);
    1570              : 
    1571         1088 :     oldcontext = MemoryContextSwitchTo(cstate->copycontext);
    1572              : 
    1573              :     /* Extract options from the statement node tree */
    1574         1088 :     ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options);
    1575              : 
    1576              :     /* Set the format routine */
    1577          964 :     cstate->routine = CopyFromGetRoutine(&cstate->opts);
    1578              : 
    1579              :     /* Process the target relation */
    1580          964 :     cstate->rel = rel;
    1581              : 
    1582          964 :     tupDesc = RelationGetDescr(cstate->rel);
    1583              : 
    1584              :     /* process common options or initialization */
    1585              : 
    1586              :     /* Generate or convert list of attributes to process */
    1587          964 :     cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
    1588              : 
    1589          964 :     num_phys_attrs = tupDesc->natts;
    1590              : 
    1591              :     /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
    1592          964 :     cstate->opts.force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
    1593          964 :     if (cstate->opts.force_notnull_all)
    1594            6 :         MemSet(cstate->opts.force_notnull_flags, true, num_phys_attrs * sizeof(bool));
    1595          958 :     else if (cstate->opts.force_notnull)
    1596              :     {
    1597              :         List       *attnums;
    1598              :         ListCell   *cur;
    1599              : 
    1600           13 :         attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_notnull);
    1601              : 
    1602           26 :         foreach(cur, attnums)
    1603              :         {
    1604           16 :             int         attnum = lfirst_int(cur);
    1605           16 :             Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
    1606              : 
    1607           16 :             if (!list_member_int(cstate->attnumlist, attnum))
    1608            3 :                 ereport(ERROR,
    1609              :                         (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1610              :                 /*- translator: first %s is the name of a COPY option, e.g. FORCE_NOT_NULL */
    1611              :                          errmsg("%s column \"%s\" not referenced by COPY",
    1612              :                                 "FORCE_NOT_NULL", NameStr(attr->attname))));
    1613           13 :             cstate->opts.force_notnull_flags[attnum - 1] = true;
    1614              :         }
    1615              :     }
    1616              : 
    1617              :     /* Set up soft error handler for ON_ERROR */
    1618          961 :     if (cstate->opts.on_error != COPY_ON_ERROR_STOP)
    1619              :     {
    1620           32 :         cstate->escontext = makeNode(ErrorSaveContext);
    1621           32 :         cstate->escontext->type = T_ErrorSaveContext;
    1622           32 :         cstate->escontext->error_occurred = false;
    1623              : 
    1624              :         /*
    1625              :          * Currently we only support COPY_ON_ERROR_IGNORE. We'll add other
    1626              :          * options later
    1627              :          */
    1628           32 :         if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE)
    1629           32 :             cstate->escontext->details_wanted = false;
    1630              :     }
    1631              :     else
    1632          929 :         cstate->escontext = NULL;
    1633              : 
    1634              :     /* Convert FORCE_NULL name list to per-column flags, check validity */
    1635          961 :     cstate->opts.force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
    1636          961 :     if (cstate->opts.force_null_all)
    1637            6 :         MemSet(cstate->opts.force_null_flags, true, num_phys_attrs * sizeof(bool));
    1638          955 :     else if (cstate->opts.force_null)
    1639              :     {
    1640              :         List       *attnums;
    1641              :         ListCell   *cur;
    1642              : 
    1643           13 :         attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_null);
    1644              : 
    1645           26 :         foreach(cur, attnums)
    1646              :         {
    1647           16 :             int         attnum = lfirst_int(cur);
    1648           16 :             Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
    1649              : 
    1650           16 :             if (!list_member_int(cstate->attnumlist, attnum))
    1651            3 :                 ereport(ERROR,
    1652              :                         (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1653              :                 /*- translator: first %s is the name of a COPY option, e.g. FORCE_NOT_NULL */
    1654              :                          errmsg("%s column \"%s\" not referenced by COPY",
    1655              :                                 "FORCE_NULL", NameStr(attr->attname))));
    1656           13 :             cstate->opts.force_null_flags[attnum - 1] = true;
    1657              :         }
    1658              :     }
    1659              : 
    1660              :     /* Convert convert_selectively name list to per-column flags */
    1661          958 :     if (cstate->opts.convert_selectively)
    1662              :     {
    1663              :         List       *attnums;
    1664              :         ListCell   *cur;
    1665              : 
    1666            3 :         cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
    1667              : 
    1668            3 :         attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.convert_select);
    1669              : 
    1670            7 :         foreach(cur, attnums)
    1671              :         {
    1672            4 :             int         attnum = lfirst_int(cur);
    1673            4 :             Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
    1674              : 
    1675            4 :             if (!list_member_int(cstate->attnumlist, attnum))
    1676            0 :                 ereport(ERROR,
    1677              :                         (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1678              :                          errmsg_internal("selected column \"%s\" not referenced by COPY",
    1679              :                                          NameStr(attr->attname))));
    1680            4 :             cstate->convert_select_flags[attnum - 1] = true;
    1681              :         }
    1682              :     }
    1683              : 
    1684              :     /* Use client encoding when ENCODING option is not specified. */
    1685          958 :     if (cstate->opts.file_encoding < 0)
    1686          943 :         cstate->file_encoding = pg_get_client_encoding();
    1687              :     else
    1688           15 :         cstate->file_encoding = cstate->opts.file_encoding;
    1689              : 
    1690              :     /*
    1691              :      * Look up encoding conversion function.
    1692              :      */
    1693          958 :     if (cstate->file_encoding == GetDatabaseEncoding() ||
    1694           39 :         cstate->file_encoding == PG_SQL_ASCII ||
    1695           18 :         GetDatabaseEncoding() == PG_SQL_ASCII)
    1696              :     {
    1697          940 :         cstate->need_transcoding = false;
    1698              :     }
    1699              :     else
    1700              :     {
    1701           18 :         cstate->need_transcoding = true;
    1702           18 :         cstate->conversion_proc = FindDefaultConversionProc(cstate->file_encoding,
    1703              :                                                             GetDatabaseEncoding());
    1704           18 :         if (!OidIsValid(cstate->conversion_proc))
    1705            0 :             ereport(ERROR,
    1706              :                     (errcode(ERRCODE_UNDEFINED_FUNCTION),
    1707              :                      errmsg("default conversion function for encoding \"%s\" to \"%s\" does not exist",
    1708              :                             pg_encoding_to_char(cstate->file_encoding),
    1709              :                             pg_encoding_to_char(GetDatabaseEncoding()))));
    1710              :     }
    1711              : 
    1712          958 :     cstate->copy_src = COPY_FILE;    /* default */
    1713              : 
    1714          958 :     cstate->whereClause = whereClause;
    1715              : 
    1716              :     /* Initialize state variables */
    1717          958 :     cstate->eol_type = EOL_UNKNOWN;
    1718          958 :     cstate->cur_relname = RelationGetRelationName(cstate->rel);
    1719          958 :     cstate->cur_lineno = 0;
    1720          958 :     cstate->cur_attname = NULL;
    1721          958 :     cstate->cur_attval = NULL;
    1722          958 :     cstate->relname_only = false;
    1723              : 
    1724              :     /*
    1725              :      * Allocate buffers for the input pipeline.
    1726              :      *
    1727              :      * attribute_buf and raw_buf are used in both text and binary modes, but
    1728              :      * input_buf and line_buf only in text mode.
    1729              :      */
    1730          958 :     cstate->raw_buf = palloc(RAW_BUF_SIZE + 1);
    1731          958 :     cstate->raw_buf_index = cstate->raw_buf_len = 0;
    1732          958 :     cstate->raw_reached_eof = false;
    1733              : 
    1734          958 :     initStringInfo(&cstate->attribute_buf);
    1735              : 
    1736              :     /* Assign range table and rteperminfos, we'll need them in CopyFrom. */
    1737          958 :     if (pstate)
    1738              :     {
    1739          921 :         cstate->range_table = pstate->p_rtable;
    1740          921 :         cstate->rteperminfos = pstate->p_rteperminfos;
    1741              :     }
    1742              : 
    1743          958 :     num_defaults = 0;
    1744          958 :     volatile_defexprs = false;
    1745              : 
    1746              :     /*
    1747              :      * Pick up the required catalog information for each attribute in the
    1748              :      * relation, including the input function, the element type (to pass to
    1749              :      * the input function), and info about defaults and constraints. (Which
    1750              :      * input function we use depends on text/binary format choice.)
    1751              :      */
    1752          958 :     in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
    1753          958 :     typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
    1754          958 :     defmap = (int *) palloc(num_phys_attrs * sizeof(int));
    1755          958 :     defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
    1756              : 
    1757         3801 :     for (int attnum = 1; attnum <= num_phys_attrs; attnum++)
    1758              :     {
    1759         2850 :         Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
    1760              : 
    1761              :         /* We don't need info for dropped attributes */
    1762         2850 :         if (att->attisdropped)
    1763           62 :             continue;
    1764              : 
    1765              :         /* Fetch the input function and typioparam info */
    1766         2788 :         cstate->routine->CopyFromInFunc(cstate, att->atttypid,
    1767         2788 :                                         &in_functions[attnum - 1],
    1768         2788 :                                         &typioparams[attnum - 1]);
    1769              : 
    1770              :         /* Get default info if available */
    1771         2787 :         defexprs[attnum - 1] = NULL;
    1772              : 
    1773              :         /*
    1774              :          * We only need the default values for columns that do not appear in
    1775              :          * the column list, unless the DEFAULT option was given. We never need
    1776              :          * default values for generated columns.
    1777              :          */
    1778         2787 :         if ((cstate->opts.default_print != NULL ||
    1779         2724 :              !list_member_int(cstate->attnumlist, attnum)) &&
    1780          341 :             !att->attgenerated)
    1781              :         {
    1782          323 :             Expr       *defexpr = (Expr *) build_column_default(cstate->rel,
    1783              :                                                                 attnum);
    1784              : 
    1785          323 :             if (defexpr != NULL)
    1786              :             {
    1787              :                 /* Run the expression through planner */
    1788          132 :                 defexpr = expression_planner(defexpr);
    1789              : 
    1790              :                 /* Initialize executable expression in copycontext */
    1791          126 :                 defexprs[attnum - 1] = ExecInitExpr(defexpr, NULL);
    1792              : 
    1793              :                 /* if NOT copied from input */
    1794              :                 /* use default value if one exists */
    1795          126 :                 if (!list_member_int(cstate->attnumlist, attnum))
    1796              :                 {
    1797           86 :                     defmap[num_defaults] = attnum - 1;
    1798           86 :                     num_defaults++;
    1799              :                 }
    1800              : 
    1801              :                 /*
    1802              :                  * If a default expression looks at the table being loaded,
    1803              :                  * then it could give the wrong answer when using
    1804              :                  * multi-insert. Since database access can be dynamic this is
    1805              :                  * hard to test for exactly, so we use the much wider test of
    1806              :                  * whether the default expression is volatile. We allow for
    1807              :                  * the special case of when the default expression is the
    1808              :                  * nextval() of a sequence which in this specific case is
    1809              :                  * known to be safe for use with the multi-insert
    1810              :                  * optimization. Hence we use this special case function
    1811              :                  * checker rather than the standard check for
    1812              :                  * contain_volatile_functions().  Note also that we already
    1813              :                  * ran the expression through expression_planner().
    1814              :                  */
    1815          126 :                 if (!volatile_defexprs)
    1816          126 :                     volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
    1817              :             }
    1818              :         }
    1819              :     }
    1820              : 
    1821          951 :     cstate->defaults = (bool *) palloc0(tupDesc->natts * sizeof(bool));
    1822              : 
    1823              :     /* initialize progress */
    1824          951 :     pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
    1825          951 :                                   cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
    1826          951 :     cstate->bytes_processed = 0;
    1827              : 
    1828              :     /* We keep those variables in cstate. */
    1829          951 :     cstate->in_functions = in_functions;
    1830          951 :     cstate->typioparams = typioparams;
    1831          951 :     cstate->defmap = defmap;
    1832          951 :     cstate->defexprs = defexprs;
    1833          951 :     cstate->volatile_defexprs = volatile_defexprs;
    1834          951 :     cstate->num_defaults = num_defaults;
    1835          951 :     cstate->is_program = is_program;
    1836              : 
    1837          951 :     if (data_source_cb)
    1838              :     {
    1839          195 :         progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
    1840          195 :         cstate->copy_src = COPY_CALLBACK;
    1841          195 :         cstate->data_source_cb = data_source_cb;
    1842              :     }
    1843          756 :     else if (pipe)
    1844              :     {
    1845          523 :         progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
    1846              :         Assert(!is_program);    /* the grammar does not allow this */
    1847          523 :         if (whereToSendOutput == DestRemote)
    1848          523 :             ReceiveCopyBegin(cstate);
    1849              :         else
    1850            0 :             cstate->copy_file = stdin;
    1851              :     }
    1852              :     else
    1853              :     {
    1854          233 :         cstate->filename = pstrdup(filename);
    1855              : 
    1856          233 :         if (cstate->is_program)
    1857              :         {
    1858            0 :             progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
    1859            0 :             cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
    1860            0 :             if (cstate->copy_file == NULL)
    1861            0 :                 ereport(ERROR,
    1862              :                         (errcode_for_file_access(),
    1863              :                          errmsg("could not execute command \"%s\": %m",
    1864              :                                 cstate->filename)));
    1865              :         }
    1866              :         else
    1867              :         {
    1868              :             struct stat st;
    1869              : 
    1870          233 :             progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
    1871          233 :             cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
    1872          233 :             if (cstate->copy_file == NULL)
    1873              :             {
    1874              :                 /* copy errno because ereport subfunctions might change it */
    1875            0 :                 int         save_errno = errno;
    1876              : 
    1877            0 :                 ereport(ERROR,
    1878              :                         (errcode_for_file_access(),
    1879              :                          errmsg("could not open file \"%s\" for reading: %m",
    1880              :                                 cstate->filename),
    1881              :                          (save_errno == ENOENT || save_errno == EACCES) ?
    1882              :                          errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
    1883              :                                  "You may want a client-side facility such as psql's \\copy.") : 0));
    1884              :             }
    1885              : 
    1886          233 :             if (fstat(fileno(cstate->copy_file), &st))
    1887            0 :                 ereport(ERROR,
    1888              :                         (errcode_for_file_access(),
    1889              :                          errmsg("could not stat file \"%s\": %m",
    1890              :                                 cstate->filename)));
    1891              : 
    1892          233 :             if (S_ISDIR(st.st_mode))
    1893            0 :                 ereport(ERROR,
    1894              :                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1895              :                          errmsg("\"%s\" is a directory", cstate->filename)));
    1896              : 
    1897          233 :             progress_vals[2] = st.st_size;
    1898              :         }
    1899              :     }
    1900              : 
    1901          951 :     pgstat_progress_update_multi_param(3, progress_cols, progress_vals);
    1902              : 
    1903          951 :     cstate->routine->CopyFromStart(cstate, tupDesc);
    1904              : 
    1905          951 :     MemoryContextSwitchTo(oldcontext);
    1906              : 
    1907          951 :     return cstate;
    1908              : }
    1909              : 
    1910              : /*
    1911              :  * Clean up storage and release resources for COPY FROM.
    1912              :  */
    1913              : void
    1914          634 : EndCopyFrom(CopyFromState cstate)
    1915              : {
    1916              :     /* Invoke the end callback */
    1917          634 :     cstate->routine->CopyFromEnd(cstate);
    1918              : 
    1919              :     /* No COPY FROM related resources except memory. */
    1920          634 :     if (cstate->is_program)
    1921              :     {
    1922            0 :         ClosePipeFromProgram(cstate);
    1923              :     }
    1924              :     else
    1925              :     {
    1926          634 :         if (cstate->filename != NULL && FreeFile(cstate->copy_file))
    1927            0 :             ereport(ERROR,
    1928              :                     (errcode_for_file_access(),
    1929              :                      errmsg("could not close file \"%s\": %m",
    1930              :                             cstate->filename)));
    1931              :     }
    1932              : 
    1933          634 :     pgstat_progress_end_command();
    1934              : 
    1935          634 :     MemoryContextDelete(cstate->copycontext);
    1936          634 :     pfree(cstate);
    1937          634 : }
    1938              : 
    1939              : /*
    1940              :  * Closes the pipe from an external program, checking the pclose() return code.
    1941              :  */
    1942              : static void
    1943            0 : ClosePipeFromProgram(CopyFromState cstate)
    1944              : {
    1945              :     int         pclose_rc;
    1946              : 
    1947              :     Assert(cstate->is_program);
    1948              : 
    1949            0 :     pclose_rc = ClosePipeStream(cstate->copy_file);
    1950            0 :     if (pclose_rc == -1)
    1951            0 :         ereport(ERROR,
    1952              :                 (errcode_for_file_access(),
    1953              :                  errmsg("could not close pipe to external command: %m")));
    1954            0 :     else if (pclose_rc != 0)
    1955              :     {
    1956              :         /*
    1957              :          * If we ended a COPY FROM PROGRAM before reaching EOF, then it's
    1958              :          * expectable for the called program to fail with SIGPIPE, and we
    1959              :          * should not report that as an error.  Otherwise, SIGPIPE indicates a
    1960              :          * problem.
    1961              :          */
    1962            0 :         if (!cstate->raw_reached_eof &&
    1963            0 :             wait_result_is_signal(pclose_rc, SIGPIPE))
    1964            0 :             return;
    1965              : 
    1966            0 :         ereport(ERROR,
    1967              :                 (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
    1968              :                  errmsg("program \"%s\" failed",
    1969              :                         cstate->filename),
    1970              :                  errdetail_internal("%s", wait_result_to_str(pclose_rc))));
    1971              :     }
    1972              : }
        

Generated by: LCOV version 2.0-1