LCOV - code coverage report
Current view: top level - src/backend/commands - copyfrom.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 514 565 91.0 %
Date: 2025-01-18 04:15:08 Functions: 16 17 94.1 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * copyfrom.c
       4             :  *      COPY <table> FROM file/program/client
       5             :  *
       6             :  * This file contains routines needed to efficiently load tuples into a
       7             :  * table.  That includes looking up the correct partition, firing triggers,
       8             :  * calling the table AM function to insert the data, and updating indexes.
       9             :  * Reading data from the input file or client and parsing it into Datums
      10             :  * is handled in copyfromparse.c.
      11             :  *
      12             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
      13             :  * Portions Copyright (c) 1994, Regents of the University of California
      14             :  *
      15             :  *
      16             :  * IDENTIFICATION
      17             :  *    src/backend/commands/copyfrom.c
      18             :  *
      19             :  *-------------------------------------------------------------------------
      20             :  */
      21             : #include "postgres.h"
      22             : 
      23             : #include <ctype.h>
      24             : #include <unistd.h>
      25             : #include <sys/stat.h>
      26             : 
      27             : #include "access/heapam.h"
      28             : #include "access/tableam.h"
      29             : #include "access/xact.h"
      30             : #include "catalog/namespace.h"
      31             : #include "commands/copy.h"
      32             : #include "commands/copyfrom_internal.h"
      33             : #include "commands/progress.h"
      34             : #include "commands/trigger.h"
      35             : #include "executor/execPartition.h"
      36             : #include "executor/executor.h"
      37             : #include "executor/nodeModifyTable.h"
      38             : #include "executor/tuptable.h"
      39             : #include "foreign/fdwapi.h"
      40             : #include "mb/pg_wchar.h"
      41             : #include "miscadmin.h"
      42             : #include "nodes/miscnodes.h"
      43             : #include "optimizer/optimizer.h"
      44             : #include "pgstat.h"
      45             : #include "rewrite/rewriteHandler.h"
      46             : #include "storage/fd.h"
      47             : #include "tcop/tcopprot.h"
      48             : #include "utils/lsyscache.h"
      49             : #include "utils/memutils.h"
      50             : #include "utils/portal.h"
      51             : #include "utils/rel.h"
      52             : #include "utils/snapmgr.h"
      53             : 
      54             : /*
      55             :  * No more than this many tuples per CopyMultiInsertBuffer
      56             :  *
      57             :  * Caution: Don't make this too big, as we could end up with this many
      58             :  * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's
      59             :  * multiInsertBuffers list.  Increasing this can cause quadratic growth in
      60             :  * memory requirements during copies into partitioned tables with a large
      61             :  * number of partitions.
      62             :  */
      63             : #define MAX_BUFFERED_TUPLES     1000
      64             : 
      65             : /*
      66             :  * Flush buffers if there are >= this many bytes, as counted by the input
      67             :  * size, of tuples stored.
      68             :  */
      69             : #define MAX_BUFFERED_BYTES      65535
      70             : 
      71             : /*
      72             :  * Trim the list of buffers back down to this number after flushing.  This
      73             :  * must be >= 2.
      74             :  */
      75             : #define MAX_PARTITION_BUFFERS   32
      76             : 
      77             : /* Stores multi-insert data related to a single relation in CopyFrom. */
      78             : typedef struct CopyMultiInsertBuffer
      79             : {
      80             :     TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
      81             :     ResultRelInfo *resultRelInfo;   /* ResultRelInfo for 'relid' */
      82             :     BulkInsertState bistate;    /* BulkInsertState for this rel if plain
      83             :                                  * table; NULL if foreign table */
      84             :     int         nused;          /* number of 'slots' containing tuples */
      85             :     uint64      linenos[MAX_BUFFERED_TUPLES];   /* Line # of tuple in copy
      86             :                                                  * stream */
      87             : } CopyMultiInsertBuffer;
      88             : 
      89             : /*
      90             :  * Stores one or many CopyMultiInsertBuffers and details about the size and
      91             :  * number of tuples which are stored in them.  This allows multiple buffers to
      92             :  * exist at once when COPYing into a partitioned table.
      93             :  */
      94             : typedef struct CopyMultiInsertInfo
      95             : {
      96             :     List       *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */
      97             :     int         bufferedTuples; /* number of tuples buffered over all buffers */
      98             :     int         bufferedBytes;  /* number of bytes from all buffered tuples */
      99             :     CopyFromState cstate;       /* Copy state for this CopyMultiInsertInfo */
     100             :     EState     *estate;         /* Executor state used for COPY */
     101             :     CommandId   mycid;          /* Command Id used for COPY */
     102             :     int         ti_options;     /* table insert options */
     103             : } CopyMultiInsertInfo;
     104             : 
     105             : 
     106             : /* non-export function prototypes */
     107             : static void ClosePipeFromProgram(CopyFromState cstate);
     108             : 
     109             : /*
     110             :  * error context callback for COPY FROM
     111             :  *
     112             :  * The argument for the error context must be CopyFromState.
     113             :  */
     114             : void
     115         312 : CopyFromErrorCallback(void *arg)
     116             : {
     117         312 :     CopyFromState cstate = (CopyFromState) arg;
     118             : 
     119         312 :     if (cstate->relname_only)
     120             :     {
     121          44 :         errcontext("COPY %s",
     122             :                    cstate->cur_relname);
     123          44 :         return;
     124             :     }
     125         268 :     if (cstate->opts.binary)
     126             :     {
     127             :         /* can't usefully display the data */
     128           2 :         if (cstate->cur_attname)
     129           2 :             errcontext("COPY %s, line %llu, column %s",
     130             :                        cstate->cur_relname,
     131           2 :                        (unsigned long long) cstate->cur_lineno,
     132             :                        cstate->cur_attname);
     133             :         else
     134           0 :             errcontext("COPY %s, line %llu",
     135             :                        cstate->cur_relname,
     136           0 :                        (unsigned long long) cstate->cur_lineno);
     137             :     }
     138             :     else
     139             :     {
     140         266 :         if (cstate->cur_attname && cstate->cur_attval)
     141          40 :         {
     142             :             /* error is relevant to a particular column */
     143             :             char       *attval;
     144             : 
     145          40 :             attval = CopyLimitPrintoutLength(cstate->cur_attval);
     146          40 :             errcontext("COPY %s, line %llu, column %s: \"%s\"",
     147             :                        cstate->cur_relname,
     148          40 :                        (unsigned long long) cstate->cur_lineno,
     149             :                        cstate->cur_attname,
     150             :                        attval);
     151          40 :             pfree(attval);
     152             :         }
     153         226 :         else if (cstate->cur_attname)
     154             :         {
     155             :             /* error is relevant to a particular column, value is NULL */
     156           6 :             errcontext("COPY %s, line %llu, column %s: null input",
     157             :                        cstate->cur_relname,
     158           6 :                        (unsigned long long) cstate->cur_lineno,
     159             :                        cstate->cur_attname);
     160             :         }
     161             :         else
     162             :         {
     163             :             /*
     164             :              * Error is relevant to a particular line.
     165             :              *
     166             :              * If line_buf still contains the correct line, print it.
     167             :              */
     168         220 :             if (cstate->line_buf_valid)
     169             :             {
     170             :                 char       *lineval;
     171             : 
     172         184 :                 lineval = CopyLimitPrintoutLength(cstate->line_buf.data);
     173         184 :                 errcontext("COPY %s, line %llu: \"%s\"",
     174             :                            cstate->cur_relname,
     175         184 :                            (unsigned long long) cstate->cur_lineno, lineval);
     176         184 :                 pfree(lineval);
     177             :             }
     178             :             else
     179             :             {
     180          36 :                 errcontext("COPY %s, line %llu",
     181             :                            cstate->cur_relname,
     182          36 :                            (unsigned long long) cstate->cur_lineno);
     183             :             }
     184             :         }
     185             :     }
     186             : }
     187             : 
     188             : /*
     189             :  * Make sure we don't print an unreasonable amount of COPY data in a message.
     190             :  *
     191             :  * Returns a pstrdup'd copy of the input.
     192             :  */
     193             : char *
     194         260 : CopyLimitPrintoutLength(const char *str)
     195             : {
     196             : #define MAX_COPY_DATA_DISPLAY 100
     197             : 
     198         260 :     int         slen = strlen(str);
     199             :     int         len;
     200             :     char       *res;
     201             : 
     202             :     /* Fast path if definitely okay */
     203         260 :     if (slen <= MAX_COPY_DATA_DISPLAY)
     204         260 :         return pstrdup(str);
     205             : 
     206             :     /* Apply encoding-dependent truncation */
     207           0 :     len = pg_mbcliplen(str, slen, MAX_COPY_DATA_DISPLAY);
     208             : 
     209             :     /*
     210             :      * Truncate, and add "..." to show we truncated the input.
     211             :      */
     212           0 :     res = (char *) palloc(len + 4);
     213           0 :     memcpy(res, str, len);
     214           0 :     strcpy(res + len, "...");
     215             : 
     216           0 :     return res;
     217             : }
     218             : 
     219             : /*
     220             :  * Allocate memory and initialize a new CopyMultiInsertBuffer for this
     221             :  * ResultRelInfo.
     222             :  */
     223             : static CopyMultiInsertBuffer *
     224        1484 : CopyMultiInsertBufferInit(ResultRelInfo *rri)
     225             : {
     226             :     CopyMultiInsertBuffer *buffer;
     227             : 
     228        1484 :     buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
     229        1484 :     memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
     230        1484 :     buffer->resultRelInfo = rri;
     231        1484 :     buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL;
     232        1484 :     buffer->nused = 0;
     233             : 
     234        1484 :     return buffer;
     235             : }
     236             : 
     237             : /*
     238             :  * Make a new buffer for this ResultRelInfo.
     239             :  */
     240             : static inline void
     241        1484 : CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
     242             :                                ResultRelInfo *rri)
     243             : {
     244             :     CopyMultiInsertBuffer *buffer;
     245             : 
     246        1484 :     buffer = CopyMultiInsertBufferInit(rri);
     247             : 
     248             :     /* Setup back-link so we can easily find this buffer again */
     249        1484 :     rri->ri_CopyMultiInsertBuffer = buffer;
     250             :     /* Record that we're tracking this buffer */
     251        1484 :     miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
     252        1484 : }
     253             : 
     254             : /*
     255             :  * Initialize an already allocated CopyMultiInsertInfo.
     256             :  *
     257             :  * If rri is a non-partitioned table then a CopyMultiInsertBuffer is set up
     258             :  * for that table.
     259             :  */
     260             : static void
     261        1472 : CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
     262             :                         CopyFromState cstate, EState *estate, CommandId mycid,
     263             :                         int ti_options)
     264             : {
     265        1472 :     miinfo->multiInsertBuffers = NIL;
     266        1472 :     miinfo->bufferedTuples = 0;
     267        1472 :     miinfo->bufferedBytes = 0;
     268        1472 :     miinfo->cstate = cstate;
     269        1472 :     miinfo->estate = estate;
     270        1472 :     miinfo->mycid = mycid;
     271        1472 :     miinfo->ti_options = ti_options;
     272             : 
     273             :     /*
     274             :      * Only setup the buffer when not dealing with a partitioned table.
     275             :      * Buffers for partitioned tables will just be setup when we need to send
     276             :      * tuples their way for the first time.
     277             :      */
     278        1472 :     if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
     279        1398 :         CopyMultiInsertInfoSetupBuffer(miinfo, rri);
     280        1472 : }
     281             : 
     282             : /*
     283             :  * Returns true if the buffers are full
     284             :  */
     285             : static inline bool
     286     1195610 : CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
     287             : {
     288     1195610 :     if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
     289     1194616 :         miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
     290        1100 :         return true;
     291     1194510 :     return false;
     292             : }
     293             : 
     294             : /*
     295             :  * Returns true if we have no buffered tuples
     296             :  */
     297             : static inline bool
     298        1334 : CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
     299             : {
     300        1334 :     return miinfo->bufferedTuples == 0;
     301             : }
     302             : 
     303             : /*
     304             :  * Write the tuples stored in 'buffer' out to the table.
     305             :  */
     306             : static inline void
     307        2400 : CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
     308             :                            CopyMultiInsertBuffer *buffer,
     309             :                            int64 *processed)
     310             : {
     311        2400 :     CopyFromState cstate = miinfo->cstate;
     312        2400 :     EState     *estate = miinfo->estate;
     313        2400 :     int         nused = buffer->nused;
     314        2400 :     ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
     315        2400 :     TupleTableSlot **slots = buffer->slots;
     316             :     int         i;
     317             : 
     318        2400 :     if (resultRelInfo->ri_FdwRoutine)
     319             :     {
     320          14 :         int         batch_size = resultRelInfo->ri_BatchSize;
     321          14 :         int         sent = 0;
     322             : 
     323             :         Assert(buffer->bistate == NULL);
     324             : 
     325             :         /* Ensure that the FDW supports batching and it's enabled */
     326             :         Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert);
     327             :         Assert(batch_size > 1);
     328             : 
     329             :         /*
     330             :          * We suppress error context information other than the relation name,
     331             :          * if one of the operations below fails.
     332             :          */
     333             :         Assert(!cstate->relname_only);
     334          14 :         cstate->relname_only = true;
     335             : 
     336          38 :         while (sent < nused)
     337             :         {
     338          26 :             int         size = (batch_size < nused - sent) ? batch_size : (nused - sent);
     339          26 :             int         inserted = size;
     340             :             TupleTableSlot **rslots;
     341             : 
     342             :             /* insert into foreign table: let the FDW do it */
     343             :             rslots =
     344          26 :                 resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert(estate,
     345             :                                                                      resultRelInfo,
     346          26 :                                                                      &slots[sent],
     347             :                                                                      NULL,
     348             :                                                                      &inserted);
     349             : 
     350          24 :             sent += size;
     351             : 
     352             :             /* No need to do anything if there are no inserted rows */
     353          24 :             if (inserted <= 0)
     354           4 :                 continue;
     355             : 
     356             :             /* Triggers on foreign tables should not have transition tables */
     357             :             Assert(resultRelInfo->ri_TrigDesc == NULL ||
     358             :                    resultRelInfo->ri_TrigDesc->trig_insert_new_table == false);
     359             : 
     360             :             /* Run AFTER ROW INSERT triggers */
     361          20 :             if (resultRelInfo->ri_TrigDesc != NULL &&
     362           0 :                 resultRelInfo->ri_TrigDesc->trig_insert_after_row)
     363             :             {
     364           0 :                 Oid         relid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
     365             : 
     366           0 :                 for (i = 0; i < inserted; i++)
     367             :                 {
     368           0 :                     TupleTableSlot *slot = rslots[i];
     369             : 
     370             :                     /*
     371             :                      * AFTER ROW Triggers might reference the tableoid column,
     372             :                      * so (re-)initialize tts_tableOid before evaluating them.
     373             :                      */
     374           0 :                     slot->tts_tableOid = relid;
     375             : 
     376           0 :                     ExecARInsertTriggers(estate, resultRelInfo,
     377             :                                          slot, NIL,
     378             :                                          cstate->transition_capture);
     379             :                 }
     380             :             }
     381             : 
     382             :             /* Update the row counter and progress of the COPY command */
     383          20 :             *processed += inserted;
     384          20 :             pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
     385             :                                          *processed);
     386             :         }
     387             : 
     388          48 :         for (i = 0; i < nused; i++)
     389          36 :             ExecClearTuple(slots[i]);
     390             : 
     391             :         /* reset relname_only */
     392          12 :         cstate->relname_only = false;
     393             :     }
     394             :     else
     395             :     {
     396        2386 :         CommandId   mycid = miinfo->mycid;
     397        2386 :         int         ti_options = miinfo->ti_options;
     398        2386 :         bool        line_buf_valid = cstate->line_buf_valid;
     399        2386 :         uint64      save_cur_lineno = cstate->cur_lineno;
     400             :         MemoryContext oldcontext;
     401             : 
     402             :         Assert(buffer->bistate != NULL);
     403             : 
     404             :         /*
     405             :          * Print error context information correctly, if one of the operations
     406             :          * below fails.
     407             :          */
     408        2386 :         cstate->line_buf_valid = false;
     409             : 
     410             :         /*
     411             :          * table_multi_insert may leak memory, so switch to short-lived memory
     412             :          * context before calling it.
     413             :          */
     414        2386 :         oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
     415        2386 :         table_multi_insert(resultRelInfo->ri_RelationDesc,
     416             :                            slots,
     417             :                            nused,
     418             :                            mycid,
     419             :                            ti_options,
     420             :                            buffer->bistate);
     421        2386 :         MemoryContextSwitchTo(oldcontext);
     422             : 
     423     1197814 :         for (i = 0; i < nused; i++)
     424             :         {
     425             :             /*
     426             :              * If there are any indexes, update them for all the inserted
     427             :              * tuples, and run AFTER ROW INSERT triggers.
     428             :              */
     429     1195440 :             if (resultRelInfo->ri_NumIndices > 0)
     430             :             {
     431             :                 List       *recheckIndexes;
     432             : 
     433      201848 :                 cstate->cur_lineno = buffer->linenos[i];
     434             :                 recheckIndexes =
     435      201848 :                     ExecInsertIndexTuples(resultRelInfo,
     436             :                                           buffer->slots[i], estate, false,
     437             :                                           false, NULL, NIL, false);
     438      201836 :                 ExecARInsertTriggers(estate, resultRelInfo,
     439      201836 :                                      slots[i], recheckIndexes,
     440             :                                      cstate->transition_capture);
     441      201836 :                 list_free(recheckIndexes);
     442             :             }
     443             : 
     444             :             /*
     445             :              * There's no indexes, but see if we need to run AFTER ROW INSERT
     446             :              * triggers anyway.
     447             :              */
     448      993592 :             else if (resultRelInfo->ri_TrigDesc != NULL &&
     449          78 :                      (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
     450          60 :                       resultRelInfo->ri_TrigDesc->trig_insert_new_table))
     451             :             {
     452          36 :                 cstate->cur_lineno = buffer->linenos[i];
     453          36 :                 ExecARInsertTriggers(estate, resultRelInfo,
     454          36 :                                      slots[i], NIL,
     455             :                                      cstate->transition_capture);
     456             :             }
     457             : 
     458     1195428 :             ExecClearTuple(slots[i]);
     459             :         }
     460             : 
     461             :         /* Update the row counter and progress of the COPY command */
     462        2374 :         *processed += nused;
     463        2374 :         pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
     464             :                                      *processed);
     465             : 
     466             :         /* reset cur_lineno and line_buf_valid to what they were */
     467        2374 :         cstate->line_buf_valid = line_buf_valid;
     468        2374 :         cstate->cur_lineno = save_cur_lineno;
     469             :     }
     470             : 
     471             :     /* Mark that all slots are free */
     472        2386 :     buffer->nused = 0;
     473        2386 : }
     474             : 
     475             : /*
     476             :  * Drop used slots and free member for this buffer.
     477             :  *
     478             :  * The buffer must be flushed before cleanup.
     479             :  */
     480             : static inline void
     481        1304 : CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
     482             :                              CopyMultiInsertBuffer *buffer)
     483             : {
     484        1304 :     ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
     485             :     int         i;
     486             : 
     487             :     /* Ensure buffer was flushed */
     488             :     Assert(buffer->nused == 0);
     489             : 
     490             :     /* Remove back-link to ourself */
     491        1304 :     resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
     492             : 
     493        1304 :     if (resultRelInfo->ri_FdwRoutine == NULL)
     494             :     {
     495             :         Assert(buffer->bistate != NULL);
     496        1292 :         FreeBulkInsertState(buffer->bistate);
     497             :     }
     498             :     else
     499             :         Assert(buffer->bistate == NULL);
     500             : 
     501             :     /* Since we only create slots on demand, just drop the non-null ones. */
     502      235704 :     for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
     503      234400 :         ExecDropSingleTupleTableSlot(buffer->slots[i]);
     504             : 
     505        1304 :     if (resultRelInfo->ri_FdwRoutine == NULL)
     506        1292 :         table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
     507             :                                  miinfo->ti_options);
     508             : 
     509        1304 :     pfree(buffer);
     510        1304 : }
     511             : 
     512             : /*
     513             :  * Write out all stored tuples in all buffers out to the tables.
     514             :  *
     515             :  * Once flushed we also trim the tracked buffers list down to size by removing
     516             :  * the buffers created earliest first.
     517             :  *
     518             :  * Callers should pass 'curr_rri' as the ResultRelInfo that's currently being
     519             :  * used.  When cleaning up old buffers we'll never remove the one for
     520             :  * 'curr_rri'.
     521             :  */
     522             : static inline void
     523        2174 : CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri,
     524             :                          int64 *processed)
     525             : {
     526             :     ListCell   *lc;
     527             : 
     528        4560 :     foreach(lc, miinfo->multiInsertBuffers)
     529             :     {
     530        2400 :         CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc);
     531             : 
     532        2400 :         CopyMultiInsertBufferFlush(miinfo, buffer, processed);
     533             :     }
     534             : 
     535        2160 :     miinfo->bufferedTuples = 0;
     536        2160 :     miinfo->bufferedBytes = 0;
     537             : 
     538             :     /*
     539             :      * Trim the list of tracked buffers down if it exceeds the limit.  Here we
     540             :      * remove buffers starting with the ones we created first.  It seems less
     541             :      * likely that these older ones will be needed than the ones that were
     542             :      * just created.
     543             :      */
     544        2160 :     while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS)
     545             :     {
     546             :         CopyMultiInsertBuffer *buffer;
     547             : 
     548           0 :         buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
     549             : 
     550             :         /*
     551             :          * We never want to remove the buffer that's currently being used, so
     552             :          * if we happen to find that then move it to the end of the list.
     553             :          */
     554           0 :         if (buffer->resultRelInfo == curr_rri)
     555             :         {
     556             :             /*
     557             :              * The code below would misbehave if we were trying to reduce the
     558             :              * list to less than two items.
     559             :              */
     560             :             StaticAssertDecl(MAX_PARTITION_BUFFERS >= 2,
     561             :                              "MAX_PARTITION_BUFFERS must be >= 2");
     562             : 
     563           0 :             miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
     564           0 :             miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
     565           0 :             buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
     566             :         }
     567             : 
     568           0 :         CopyMultiInsertBufferCleanup(miinfo, buffer);
     569           0 :         miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
     570             :     }
     571        2160 : }
     572             : 
     573             : /*
     574             :  * Cleanup allocated buffers and free memory
     575             :  */
     576             : static inline void
     577        1292 : CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo)
     578             : {
     579             :     ListCell   *lc;
     580             : 
     581        2596 :     foreach(lc, miinfo->multiInsertBuffers)
     582        1304 :         CopyMultiInsertBufferCleanup(miinfo, lfirst(lc));
     583             : 
     584        1292 :     list_free(miinfo->multiInsertBuffers);
     585        1292 : }
     586             : 
     587             : /*
     588             :  * Get the next TupleTableSlot that the next tuple should be stored in.
     589             :  *
     590             :  * Callers must ensure that the buffer is not full.
     591             :  *
     592             :  * Note: 'miinfo' is unused but has been included for consistency with the
     593             :  * other functions in this area.
     594             :  */
     595             : static inline TupleTableSlot *
     596     1197104 : CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
     597             :                                 ResultRelInfo *rri)
     598             : {
     599     1197104 :     CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
     600             :     int         nused;
     601             : 
     602             :     Assert(buffer != NULL);
     603             :     Assert(buffer->nused < MAX_BUFFERED_TUPLES);
     604             : 
     605     1197104 :     nused = buffer->nused;
     606             : 
     607     1197104 :     if (buffer->slots[nused] == NULL)
     608      234724 :         buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
     609     1197104 :     return buffer->slots[nused];
     610             : }
     611             : 
     612             : /*
     613             :  * Record the previously reserved TupleTableSlot that was reserved by
     614             :  * CopyMultiInsertInfoNextFreeSlot as being consumed.
     615             :  */
     616             : static inline void
     617     1195610 : CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
     618             :                          TupleTableSlot *slot, int tuplen, uint64 lineno)
     619             : {
     620     1195610 :     CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
     621             : 
     622             :     Assert(buffer != NULL);
     623             :     Assert(slot == buffer->slots[buffer->nused]);
     624             : 
     625             :     /* Store the line number so we can properly report any errors later */
     626     1195610 :     buffer->linenos[buffer->nused] = lineno;
     627             : 
     628             :     /* Record this slot as being used */
     629     1195610 :     buffer->nused++;
     630             : 
     631             :     /* Update how many tuples are stored and their size */
     632     1195610 :     miinfo->bufferedTuples++;
     633     1195610 :     miinfo->bufferedBytes += tuplen;
     634     1195610 : }
     635             : 
     636             : /*
     637             :  * Copy FROM file to relation.
     638             :  */
     639             : uint64
     640        1650 : CopyFrom(CopyFromState cstate)
     641             : {
     642             :     ResultRelInfo *resultRelInfo;
     643             :     ResultRelInfo *target_resultRelInfo;
     644        1650 :     ResultRelInfo *prevResultRelInfo = NULL;
     645        1650 :     EState     *estate = CreateExecutorState(); /* for ExecConstraints() */
     646             :     ModifyTableState *mtstate;
     647             :     ExprContext *econtext;
     648        1650 :     TupleTableSlot *singleslot = NULL;
     649        1650 :     MemoryContext oldcontext = CurrentMemoryContext;
     650             : 
     651        1650 :     PartitionTupleRouting *proute = NULL;
     652             :     ErrorContextCallback errcallback;
     653        1650 :     CommandId   mycid = GetCurrentCommandId(true);
     654        1650 :     int         ti_options = 0; /* start with default options for insert */
     655        1650 :     BulkInsertState bistate = NULL;
     656             :     CopyInsertMethod insertMethod;
     657        1650 :     CopyMultiInsertInfo multiInsertInfo = {0};  /* pacify compiler */
     658        1650 :     int64       processed = 0;
     659        1650 :     int64       excluded = 0;
     660             :     bool        has_before_insert_row_trig;
     661             :     bool        has_instead_insert_row_trig;
     662        1650 :     bool        leafpart_use_multi_insert = false;
     663             : 
     664             :     Assert(cstate->rel);
     665             :     Assert(list_length(cstate->range_table) == 1);
     666             : 
     667        1650 :     if (cstate->opts.on_error != COPY_ON_ERROR_STOP)
     668             :         Assert(cstate->escontext);
     669             : 
     670             :     /*
     671             :      * The target must be a plain, foreign, or partitioned relation, or have
     672             :      * an INSTEAD OF INSERT row trigger.  (Currently, such triggers are only
     673             :      * allowed on views, so we only hint about them in the view case.)
     674             :      */
     675        1650 :     if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
     676         160 :         cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
     677         122 :         cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
     678          18 :         !(cstate->rel->trigdesc &&
     679          12 :           cstate->rel->trigdesc->trig_insert_instead_row))
     680             :     {
     681           6 :         if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
     682           6 :             ereport(ERROR,
     683             :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     684             :                      errmsg("cannot copy to view \"%s\"",
     685             :                             RelationGetRelationName(cstate->rel)),
     686             :                      errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
     687           0 :         else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
     688           0 :             ereport(ERROR,
     689             :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     690             :                      errmsg("cannot copy to materialized view \"%s\"",
     691             :                             RelationGetRelationName(cstate->rel))));
     692           0 :         else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
     693           0 :             ereport(ERROR,
     694             :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     695             :                      errmsg("cannot copy to sequence \"%s\"",
     696             :                             RelationGetRelationName(cstate->rel))));
     697             :         else
     698           0 :             ereport(ERROR,
     699             :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     700             :                      errmsg("cannot copy to non-table relation \"%s\"",
     701             :                             RelationGetRelationName(cstate->rel))));
     702             :     }
     703             : 
     704             :     /*
     705             :      * If the target file is new-in-transaction, we assume that checking FSM
     706             :      * for free space is a waste of time.  This could possibly be wrong, but
     707             :      * it's unlikely.
     708             :      */
     709        1644 :     if (RELKIND_HAS_STORAGE(cstate->rel->rd_rel->relkind) &&
     710        1490 :         (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
     711        1478 :          cstate->rel->rd_firstRelfilelocatorSubid != InvalidSubTransactionId))
     712          88 :         ti_options |= TABLE_INSERT_SKIP_FSM;
     713             : 
     714             :     /*
     715             :      * Optimize if new relation storage was created in this subxact or one of
     716             :      * its committed children and we won't see those rows later as part of an
     717             :      * earlier scan or command. The subxact test ensures that if this subxact
     718             :      * aborts then the frozen rows won't be visible after xact cleanup.  Note
     719             :      * that the stronger test of exactly which subtransaction created it is
     720             :      * crucial for correctness of this optimization. The test for an earlier
     721             :      * scan or command tolerates false negatives. FREEZE causes other sessions
     722             :      * to see rows they would not see under MVCC, and a false negative merely
     723             :      * spreads that anomaly to the current session.
     724             :      */
     725        1644 :     if (cstate->opts.freeze)
     726             :     {
     727             :         /*
     728             :          * We currently disallow COPY FREEZE on partitioned tables.  The
     729             :          * reason for this is that we've simply not yet opened the partitions
     730             :          * to determine if the optimization can be applied to them.  We could
     731             :          * go and open them all here, but doing so may be quite a costly
     732             :          * overhead for small copies.  In any case, we may just end up routing
     733             :          * tuples to a small number of partitions.  It seems better just to
     734             :          * raise an ERROR for partitioned tables.
     735             :          */
     736          66 :         if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     737             :         {
     738           6 :             ereport(ERROR,
     739             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     740             :                      errmsg("cannot perform COPY FREEZE on a partitioned table")));
     741             :         }
     742             : 
     743             :         /*
     744             :          * Tolerate one registration for the benefit of FirstXactSnapshot.
     745             :          * Scan-bearing queries generally create at least two registrations,
     746             :          * though relying on that is fragile, as is ignoring ActiveSnapshot.
     747             :          * Clear CatalogSnapshot to avoid counting its registration.  We'll
     748             :          * still detect ongoing catalog scans, each of which separately
     749             :          * registers the snapshot it uses.
     750             :          */
     751          60 :         InvalidateCatalogSnapshot();
     752          60 :         if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
     753           0 :             ereport(ERROR,
     754             :                     (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
     755             :                      errmsg("cannot perform COPY FREEZE because of prior transaction activity")));
     756             : 
     757         120 :         if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
     758          60 :             cstate->rel->rd_newRelfilelocatorSubid != GetCurrentSubTransactionId())
     759          18 :             ereport(ERROR,
     760             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     761             :                      errmsg("cannot perform COPY FREEZE because the table was not created or truncated in the current subtransaction")));
     762             : 
     763          42 :         ti_options |= TABLE_INSERT_FROZEN;
     764             :     }
     765             : 
     766             :     /*
     767             :      * We need a ResultRelInfo so we can use the regular executor's
     768             :      * index-entry-making machinery.  (There used to be a huge amount of code
     769             :      * here that basically duplicated execUtils.c ...)
     770             :      */
     771        1620 :     ExecInitRangeTable(estate, cstate->range_table, cstate->rteperminfos);
     772        1620 :     resultRelInfo = target_resultRelInfo = makeNode(ResultRelInfo);
     773        1620 :     ExecInitResultRelation(estate, resultRelInfo, 1);
     774             : 
     775             :     /* Verify the named relation is a valid target for INSERT */
     776        1620 :     CheckValidResultRel(resultRelInfo, CMD_INSERT, NIL);
     777             : 
     778        1618 :     ExecOpenIndices(resultRelInfo, false);
     779             : 
     780             :     /*
     781             :      * Set up a ModifyTableState so we can let FDW(s) init themselves for
     782             :      * foreign-table result relation(s).
     783             :      */
     784        1618 :     mtstate = makeNode(ModifyTableState);
     785        1618 :     mtstate->ps.plan = NULL;
     786        1618 :     mtstate->ps.state = estate;
     787        1618 :     mtstate->operation = CMD_INSERT;
     788        1618 :     mtstate->mt_nrels = 1;
     789        1618 :     mtstate->resultRelInfo = resultRelInfo;
     790        1618 :     mtstate->rootResultRelInfo = resultRelInfo;
     791             : 
     792        1618 :     if (resultRelInfo->ri_FdwRoutine != NULL &&
     793          36 :         resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
     794          36 :         resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
     795             :                                                          resultRelInfo);
     796             : 
     797             :     /*
     798             :      * Also, if the named relation is a foreign table, determine if the FDW
     799             :      * supports batch insert and determine the batch size (a FDW may support
     800             :      * batching, but it may be disabled for the server/table).
     801             :      *
     802             :      * If the FDW does not support batching, we set the batch size to 1.
     803             :      */
     804        1618 :     if (resultRelInfo->ri_FdwRoutine != NULL &&
     805          36 :         resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize &&
     806          36 :         resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert)
     807          36 :         resultRelInfo->ri_BatchSize =
     808          36 :             resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize(resultRelInfo);
     809             :     else
     810        1582 :         resultRelInfo->ri_BatchSize = 1;
     811             : 
     812             :     Assert(resultRelInfo->ri_BatchSize >= 1);
     813             : 
     814             :     /* Prepare to catch AFTER triggers. */
     815        1618 :     AfterTriggerBeginQuery();
     816             : 
     817             :     /*
     818             :      * If there are any triggers with transition tables on the named relation,
     819             :      * we need to be prepared to capture transition tuples.
     820             :      *
     821             :      * Because partition tuple routing would like to know about whether
     822             :      * transition capture is active, we also set it in mtstate, which is
     823             :      * passed to ExecFindPartition() below.
     824             :      */
     825        1618 :     cstate->transition_capture = mtstate->mt_transition_capture =
     826        1618 :         MakeTransitionCaptureState(cstate->rel->trigdesc,
     827        1618 :                                    RelationGetRelid(cstate->rel),
     828             :                                    CMD_INSERT);
     829             : 
     830             :     /*
     831             :      * If the named relation is a partitioned table, initialize state for
     832             :      * CopyFrom tuple routing.
     833             :      */
     834        1618 :     if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     835          98 :         proute = ExecSetupPartitionTupleRouting(estate, cstate->rel);
     836             : 
     837        1618 :     if (cstate->whereClause)
     838          18 :         cstate->qualexpr = ExecInitQual(castNode(List, cstate->whereClause),
     839             :                                         &mtstate->ps);
     840             : 
     841             :     /*
     842             :      * It's generally more efficient to prepare a bunch of tuples for
     843             :      * insertion, and insert them in one
     844             :      * table_multi_insert()/ExecForeignBatchInsert() call, than call
     845             :      * table_tuple_insert()/ExecForeignInsert() separately for every tuple.
     846             :      * However, there are a number of reasons why we might not be able to do
     847             :      * this.  These are explained below.
     848             :      */
     849        1618 :     if (resultRelInfo->ri_TrigDesc != NULL &&
     850         176 :         (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
     851          84 :          resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
     852             :     {
     853             :         /*
     854             :          * Can't support multi-inserts when there are any BEFORE/INSTEAD OF
     855             :          * triggers on the table. Such triggers might query the table we're
     856             :          * inserting into and act differently if the tuples that have already
     857             :          * been processed and prepared for insertion are not there.
     858             :          */
     859         104 :         insertMethod = CIM_SINGLE;
     860             :     }
     861        1514 :     else if (resultRelInfo->ri_FdwRoutine != NULL &&
     862          28 :              resultRelInfo->ri_BatchSize == 1)
     863             :     {
     864             :         /*
     865             :          * Can't support multi-inserts to a foreign table if the FDW does not
     866             :          * support batching, or it's disabled for the server or foreign table.
     867             :          */
     868          18 :         insertMethod = CIM_SINGLE;
     869             :     }
     870        1496 :     else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL &&
     871          26 :              resultRelInfo->ri_TrigDesc->trig_insert_new_table)
     872             :     {
     873             :         /*
     874             :          * For partitioned tables we can't support multi-inserts when there
     875             :          * are any statement level insert triggers. It might be possible to
     876             :          * allow partitioned tables with such triggers in the future, but for
     877             :          * now, CopyMultiInsertInfoFlush expects that any after row insert and
     878             :          * statement level insert triggers are on the same relation.
     879             :          */
     880          18 :         insertMethod = CIM_SINGLE;
     881             :     }
     882        1478 :     else if (cstate->volatile_defexprs)
     883             :     {
     884             :         /*
     885             :          * Can't support multi-inserts if there are any volatile default
     886             :          * expressions in the table.  Similarly to the trigger case above,
     887             :          * such expressions may query the table we're inserting into.
     888             :          *
     889             :          * Note: It does not matter if any partitions have any volatile
     890             :          * default expressions as we use the defaults from the target of the
     891             :          * COPY command.
     892             :          */
     893           6 :         insertMethod = CIM_SINGLE;
     894             :     }
     895        1472 :     else if (contain_volatile_functions(cstate->whereClause))
     896             :     {
     897             :         /*
     898             :          * Can't support multi-inserts if there are any volatile function
     899             :          * expressions in WHERE clause.  Similarly to the trigger case above,
     900             :          * such expressions may query the table we're inserting into.
     901             :          *
     902             :          * Note: the whereClause was already preprocessed in DoCopy(), so it's
     903             :          * okay to use contain_volatile_functions() directly.
     904             :          */
     905           0 :         insertMethod = CIM_SINGLE;
     906             :     }
     907             :     else
     908             :     {
     909             :         /*
     910             :          * For partitioned tables, we may still be able to perform bulk
     911             :          * inserts.  However, the possibility of this depends on which types
     912             :          * of triggers exist on the partition.  We must disable bulk inserts
     913             :          * if the partition is a foreign table that can't use batching or it
     914             :          * has any before row insert or insert instead triggers (same as we
     915             :          * checked above for the parent table).  Since the partition's
     916             :          * resultRelInfos are initialized only when we actually need to insert
     917             :          * the first tuple into them, we must have the intermediate insert
     918             :          * method of CIM_MULTI_CONDITIONAL to flag that we must later
     919             :          * determine if we can use bulk-inserts for the partition being
     920             :          * inserted into.
     921             :          */
     922        1472 :         if (proute)
     923          74 :             insertMethod = CIM_MULTI_CONDITIONAL;
     924             :         else
     925        1398 :             insertMethod = CIM_MULTI;
     926             : 
     927        1472 :         CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
     928             :                                 estate, mycid, ti_options);
     929             :     }
     930             : 
     931             :     /*
     932             :      * If not using batch mode (which allocates slots as needed) set up a
     933             :      * tuple slot too. When inserting into a partitioned table, we also need
     934             :      * one, even if we might batch insert, to read the tuple in the root
     935             :      * partition's form.
     936             :      */
     937        1618 :     if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
     938             :     {
     939         220 :         singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
     940             :                                        &estate->es_tupleTable);
     941         220 :         bistate = GetBulkInsertState();
     942             :     }
     943             : 
     944        1794 :     has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
     945         176 :                                   resultRelInfo->ri_TrigDesc->trig_insert_before_row);
     946             : 
     947        1794 :     has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
     948         176 :                                    resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
     949             : 
     950             :     /*
     951             :      * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
     952             :      * should do this for COPY, since it's not really an "INSERT" statement as
     953             :      * such. However, executing these triggers maintains consistency with the
     954             :      * EACH ROW triggers that we already fire on COPY.
     955             :      */
     956        1618 :     ExecBSInsertTriggers(estate, resultRelInfo);
     957             : 
     958        1618 :     econtext = GetPerTupleExprContext(estate);
     959             : 
     960             :     /* Set up callback to identify error line number */
     961        1618 :     errcallback.callback = CopyFromErrorCallback;
     962        1618 :     errcallback.arg = cstate;
     963        1618 :     errcallback.previous = error_context_stack;
     964        1618 :     error_context_stack = &errcallback;
     965             : 
     966             :     for (;;)
     967     1256106 :     {
     968             :         TupleTableSlot *myslot;
     969             :         bool        skip_tuple;
     970             : 
     971     1257724 :         CHECK_FOR_INTERRUPTS();
     972             : 
     973             :         /*
     974             :          * Reset the per-tuple exprcontext. We do this after every tuple, to
     975             :          * clean-up after expression evaluations etc.
     976             :          */
     977     1257724 :         ResetPerTupleExprContext(estate);
     978             : 
     979             :         /* select slot to (initially) load row into */
     980     1257724 :         if (insertMethod == CIM_SINGLE || proute)
     981             :         {
     982      274872 :             myslot = singleslot;
     983      274872 :             Assert(myslot != NULL);
     984             :         }
     985             :         else
     986             :         {
     987             :             Assert(resultRelInfo == target_resultRelInfo);
     988             :             Assert(insertMethod == CIM_MULTI);
     989             : 
     990      982852 :             myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
     991             :                                                      resultRelInfo);
     992             :         }
     993             : 
     994             :         /*
     995             :          * Switch to per-tuple context before calling NextCopyFrom, which does
     996             :          * evaluate default expressions etc. and requires per-tuple context.
     997             :          */
     998     1257724 :         MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
     999             : 
    1000     1257724 :         ExecClearTuple(myslot);
    1001             : 
    1002             :         /* Directly store the values/nulls array in the slot */
    1003     1257724 :         if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
    1004        1426 :             break;
    1005             : 
    1006     1256146 :         if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE &&
    1007         144 :             cstate->escontext->error_occurred)
    1008             :         {
    1009             :             /*
    1010             :              * Soft error occurred, skip this tuple and just make
    1011             :              * ErrorSaveContext ready for the next NextCopyFrom. Since we
    1012             :              * don't set details_wanted and error_data is not to be filled,
    1013             :              * just resetting error_occurred is enough.
    1014             :              */
    1015          96 :             cstate->escontext->error_occurred = false;
    1016             : 
    1017             :             /* Report that this tuple was skipped by the ON_ERROR clause */
    1018          96 :             pgstat_progress_update_param(PROGRESS_COPY_TUPLES_SKIPPED,
    1019          96 :                                          cstate->num_errors);
    1020             : 
    1021          96 :             if (cstate->opts.reject_limit > 0 &&
    1022          48 :                 cstate->num_errors > cstate->opts.reject_limit)
    1023           6 :                 ereport(ERROR,
    1024             :                         (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
    1025             :                          errmsg("skipped more than REJECT_LIMIT (%lld) rows due to data type incompatibility",
    1026             :                                 (long long) cstate->opts.reject_limit)));
    1027             : 
    1028             :             /* Repeat NextCopyFrom() until no soft error occurs */
    1029          90 :             continue;
    1030             :         }
    1031             : 
    1032     1256050 :         ExecStoreVirtualTuple(myslot);
    1033             : 
    1034             :         /*
    1035             :          * Constraints and where clause might reference the tableoid column,
    1036             :          * so (re-)initialize tts_tableOid before evaluating them.
    1037             :          */
    1038     1256050 :         myslot->tts_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc);
    1039             : 
    1040             :         /* Triggers and stuff need to be invoked in query context. */
    1041     1256050 :         MemoryContextSwitchTo(oldcontext);
    1042             : 
    1043     1256050 :         if (cstate->whereClause)
    1044             :         {
    1045          66 :             econtext->ecxt_scantuple = myslot;
    1046             :             /* Skip items that don't match COPY's WHERE clause */
    1047          66 :             if (!ExecQual(cstate->qualexpr, econtext))
    1048             :             {
    1049             :                 /*
    1050             :                  * Report that this tuple was filtered out by the WHERE
    1051             :                  * clause.
    1052             :                  */
    1053          36 :                 pgstat_progress_update_param(PROGRESS_COPY_TUPLES_EXCLUDED,
    1054             :                                              ++excluded);
    1055          36 :                 continue;
    1056             :             }
    1057             :         }
    1058             : 
    1059             :         /* Determine the partition to insert the tuple into */
    1060     1256014 :         if (proute)
    1061             :         {
    1062             :             TupleConversionMap *map;
    1063             : 
    1064             :             /*
    1065             :              * Attempt to find a partition suitable for this tuple.
    1066             :              * ExecFindPartition() will raise an error if none can be found or
    1067             :              * if the found partition is not suitable for INSERTs.
    1068             :              */
    1069      274390 :             resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo,
    1070             :                                               proute, myslot, estate);
    1071             : 
    1072      274388 :             if (prevResultRelInfo != resultRelInfo)
    1073             :             {
    1074             :                 /* Determine which triggers exist on this partition */
    1075      161566 :                 has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
    1076          54 :                                               resultRelInfo->ri_TrigDesc->trig_insert_before_row);
    1077             : 
    1078      161566 :                 has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
    1079          54 :                                                resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
    1080             : 
    1081             :                 /*
    1082             :                  * Disable multi-inserts when the partition has BEFORE/INSTEAD
    1083             :                  * OF triggers, or if the partition is a foreign table that
    1084             :                  * can't use batching.
    1085             :                  */
    1086      262970 :                 leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
    1087      101458 :                     !has_before_insert_row_trig &&
    1088      364404 :                     !has_instead_insert_row_trig &&
    1089      101434 :                     (resultRelInfo->ri_FdwRoutine == NULL ||
    1090          12 :                      resultRelInfo->ri_BatchSize > 1);
    1091             : 
    1092             :                 /* Set the multi-insert buffer to use for this partition. */
    1093      161512 :                 if (leafpart_use_multi_insert)
    1094             :                 {
    1095      101430 :                     if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
    1096          86 :                         CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
    1097             :                                                        resultRelInfo);
    1098             :                 }
    1099       60082 :                 else if (insertMethod == CIM_MULTI_CONDITIONAL &&
    1100          28 :                          !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
    1101             :                 {
    1102             :                     /*
    1103             :                      * Flush pending inserts if this partition can't use
    1104             :                      * batching, so rows are visible to triggers etc.
    1105             :                      */
    1106           0 :                     CopyMultiInsertInfoFlush(&multiInsertInfo,
    1107             :                                              resultRelInfo,
    1108             :                                              &processed);
    1109             :                 }
    1110             : 
    1111      161512 :                 if (bistate != NULL)
    1112      161512 :                     ReleaseBulkInsertStatePin(bistate);
    1113      161512 :                 prevResultRelInfo = resultRelInfo;
    1114             :             }
    1115             : 
    1116             :             /*
    1117             :              * If we're capturing transition tuples, we might need to convert
    1118             :              * from the partition rowtype to root rowtype. But if there are no
    1119             :              * BEFORE triggers on the partition that could change the tuple,
    1120             :              * we can just remember the original unconverted tuple to avoid a
    1121             :              * needless round trip conversion.
    1122             :              */
    1123      274388 :             if (cstate->transition_capture != NULL)
    1124          54 :                 cstate->transition_capture->tcs_original_insert_tuple =
    1125          54 :                     !has_before_insert_row_trig ? myslot : NULL;
    1126             : 
    1127             :             /*
    1128             :              * We might need to convert from the root rowtype to the partition
    1129             :              * rowtype.
    1130             :              */
    1131      274388 :             map = ExecGetRootToChildMap(resultRelInfo, estate);
    1132      274388 :             if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert)
    1133             :             {
    1134             :                 /* non batch insert */
    1135       60136 :                 if (map != NULL)
    1136             :                 {
    1137             :                     TupleTableSlot *new_slot;
    1138             : 
    1139         110 :                     new_slot = resultRelInfo->ri_PartitionTupleSlot;
    1140         110 :                     myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot);
    1141             :                 }
    1142             :             }
    1143             :             else
    1144             :             {
    1145             :                 /*
    1146             :                  * Prepare to queue up tuple for later batch insert into
    1147             :                  * current partition.
    1148             :                  */
    1149             :                 TupleTableSlot *batchslot;
    1150             : 
    1151             :                 /* no other path available for partitioned table */
    1152             :                 Assert(insertMethod == CIM_MULTI_CONDITIONAL);
    1153             : 
    1154      214252 :                 batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
    1155             :                                                             resultRelInfo);
    1156             : 
    1157      214252 :                 if (map != NULL)
    1158       12200 :                     myslot = execute_attr_map_slot(map->attrMap, myslot,
    1159             :                                                    batchslot);
    1160             :                 else
    1161             :                 {
    1162             :                     /*
    1163             :                      * This looks more expensive than it is (Believe me, I
    1164             :                      * optimized it away. Twice.). The input is in virtual
    1165             :                      * form, and we'll materialize the slot below - for most
    1166             :                      * slot types the copy performs the work materialization
    1167             :                      * would later require anyway.
    1168             :                      */
    1169      202052 :                     ExecCopySlot(batchslot, myslot);
    1170      202052 :                     myslot = batchslot;
    1171             :                 }
    1172             :             }
    1173             : 
    1174             :             /* ensure that triggers etc see the right relation  */
    1175      274388 :             myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
    1176             :         }
    1177             : 
    1178     1256012 :         skip_tuple = false;
    1179             : 
    1180             :         /* BEFORE ROW INSERT Triggers */
    1181     1256012 :         if (has_before_insert_row_trig)
    1182             :         {
    1183         274 :             if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot))
    1184          16 :                 skip_tuple = true;  /* "do nothing" */
    1185             :         }
    1186             : 
    1187     1256012 :         if (!skip_tuple)
    1188             :         {
    1189             :             /*
    1190             :              * If there is an INSTEAD OF INSERT ROW trigger, let it handle the
    1191             :              * tuple.  Otherwise, proceed with inserting the tuple into the
    1192             :              * table or foreign table.
    1193             :              */
    1194     1255996 :             if (has_instead_insert_row_trig)
    1195             :             {
    1196          12 :                 ExecIRInsertTriggers(estate, resultRelInfo, myslot);
    1197             :             }
    1198             :             else
    1199             :             {
    1200             :                 /* Compute stored generated columns */
    1201     1255984 :                 if (resultRelInfo->ri_RelationDesc->rd_att->constr &&
    1202      461866 :                     resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored)
    1203          34 :                     ExecComputeStoredGenerated(resultRelInfo, estate, myslot,
    1204             :                                                CMD_INSERT);
    1205             : 
    1206             :                 /*
    1207             :                  * If the target is a plain table, check the constraints of
    1208             :                  * the tuple.
    1209             :                  */
    1210     1255984 :                 if (resultRelInfo->ri_FdwRoutine == NULL &&
    1211     1255896 :                     resultRelInfo->ri_RelationDesc->rd_att->constr)
    1212      461830 :                     ExecConstraints(resultRelInfo, myslot, estate);
    1213             : 
    1214             :                 /*
    1215             :                  * Also check the tuple against the partition constraint, if
    1216             :                  * there is one; except that if we got here via tuple-routing,
    1217             :                  * we don't need to if there's no BR trigger defined on the
    1218             :                  * partition.
    1219             :                  */
    1220     1255954 :                 if (resultRelInfo->ri_RelationDesc->rd_rel->relispartition &&
    1221      274376 :                     (proute == NULL || has_before_insert_row_trig))
    1222        2308 :                     ExecPartitionCheck(resultRelInfo, myslot, estate, true);
    1223             : 
    1224             :                 /* Store the slot in the multi-insert buffer, when enabled. */
    1225     1255954 :                 if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
    1226             :                 {
    1227             :                     /*
    1228             :                      * The slot previously might point into the per-tuple
    1229             :                      * context. For batching it needs to be longer lived.
    1230             :                      */
    1231     1195610 :                     ExecMaterializeSlot(myslot);
    1232             : 
    1233             :                     /* Add this tuple to the tuple buffer */
    1234     1195610 :                     CopyMultiInsertInfoStore(&multiInsertInfo,
    1235             :                                              resultRelInfo, myslot,
    1236             :                                              cstate->line_buf.len,
    1237             :                                              cstate->cur_lineno);
    1238             : 
    1239             :                     /*
    1240             :                      * If enough inserts have queued up, then flush all
    1241             :                      * buffers out to their tables.
    1242             :                      */
    1243     1195610 :                     if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
    1244        1100 :                         CopyMultiInsertInfoFlush(&multiInsertInfo,
    1245             :                                                  resultRelInfo,
    1246             :                                                  &processed);
    1247             : 
    1248             :                     /*
    1249             :                      * We delay updating the row counter and progress of the
    1250             :                      * COPY command until after writing the tuples stored in
    1251             :                      * the buffer out to the table, as in single insert mode.
    1252             :                      * See CopyMultiInsertBufferFlush().
    1253             :                      */
    1254     1195610 :                     continue;   /* next tuple please */
    1255             :                 }
    1256             :                 else
    1257             :                 {
    1258       60344 :                     List       *recheckIndexes = NIL;
    1259             : 
    1260             :                     /* OK, store the tuple */
    1261       60344 :                     if (resultRelInfo->ri_FdwRoutine != NULL)
    1262             :                     {
    1263          50 :                         myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
    1264             :                                                                                  resultRelInfo,
    1265             :                                                                                  myslot,
    1266             :                                                                                  NULL);
    1267             : 
    1268          48 :                         if (myslot == NULL) /* "do nothing" */
    1269           4 :                             continue;   /* next tuple please */
    1270             : 
    1271             :                         /*
    1272             :                          * AFTER ROW Triggers might reference the tableoid
    1273             :                          * column, so (re-)initialize tts_tableOid before
    1274             :                          * evaluating them.
    1275             :                          */
    1276          44 :                         myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
    1277             :                     }
    1278             :                     else
    1279             :                     {
    1280             :                         /* OK, store the tuple and create index entries for it */
    1281       60294 :                         table_tuple_insert(resultRelInfo->ri_RelationDesc,
    1282             :                                            myslot, mycid, ti_options, bistate);
    1283             : 
    1284       60294 :                         if (resultRelInfo->ri_NumIndices > 0)
    1285           0 :                             recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
    1286             :                                                                    myslot,
    1287             :                                                                    estate,
    1288             :                                                                    false,
    1289             :                                                                    false,
    1290             :                                                                    NULL,
    1291             :                                                                    NIL,
    1292             :                                                                    false);
    1293             :                     }
    1294             : 
    1295             :                     /* AFTER ROW INSERT Triggers */
    1296       60338 :                     ExecARInsertTriggers(estate, resultRelInfo, myslot,
    1297             :                                          recheckIndexes, cstate->transition_capture);
    1298             : 
    1299       60338 :                     list_free(recheckIndexes);
    1300             :                 }
    1301             :             }
    1302             : 
    1303             :             /*
    1304             :              * We count only tuples not suppressed by a BEFORE INSERT trigger
    1305             :              * or FDW; this is the same definition used by nodeModifyTable.c
    1306             :              * for counting tuples inserted by an INSERT command.  Update
    1307             :              * progress of the COPY command as well.
    1308             :              */
    1309       60350 :             pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
    1310             :                                          ++processed);
    1311             :         }
    1312             :     }
    1313             : 
    1314             :     /* Flush any remaining buffered tuples */
    1315        1426 :     if (insertMethod != CIM_SINGLE)
    1316             :     {
    1317        1306 :         if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
    1318        1074 :             CopyMultiInsertInfoFlush(&multiInsertInfo, NULL, &processed);
    1319             :     }
    1320             : 
    1321             :     /* Done, clean up */
    1322        1412 :     error_context_stack = errcallback.previous;
    1323             : 
    1324        1412 :     if (cstate->opts.on_error != COPY_ON_ERROR_STOP &&
    1325          24 :         cstate->num_errors > 0 &&
    1326          24 :         cstate->opts.log_verbosity >= COPY_LOG_VERBOSITY_DEFAULT)
    1327          18 :         ereport(NOTICE,
    1328             :                 errmsg_plural("%llu row was skipped due to data type incompatibility",
    1329             :                               "%llu rows were skipped due to data type incompatibility",
    1330             :                               (unsigned long long) cstate->num_errors,
    1331             :                               (unsigned long long) cstate->num_errors));
    1332             : 
    1333        1412 :     if (bistate != NULL)
    1334         192 :         FreeBulkInsertState(bistate);
    1335             : 
    1336        1412 :     MemoryContextSwitchTo(oldcontext);
    1337             : 
    1338             :     /* Execute AFTER STATEMENT insertion triggers */
    1339        1412 :     ExecASInsertTriggers(estate, target_resultRelInfo, cstate->transition_capture);
    1340             : 
    1341             :     /* Handle queued AFTER triggers */
    1342        1412 :     AfterTriggerEndQuery(estate);
    1343             : 
    1344        1412 :     ExecResetTupleTable(estate->es_tupleTable, false);
    1345             : 
    1346             :     /* Allow the FDW to shut down */
    1347        1412 :     if (target_resultRelInfo->ri_FdwRoutine != NULL &&
    1348          32 :         target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
    1349          32 :         target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
    1350             :                                                               target_resultRelInfo);
    1351             : 
    1352             :     /* Tear down the multi-insert buffer data */
    1353        1412 :     if (insertMethod != CIM_SINGLE)
    1354        1292 :         CopyMultiInsertInfoCleanup(&multiInsertInfo);
    1355             : 
    1356             :     /* Close all the partitioned tables, leaf partitions, and their indices */
    1357        1412 :     if (proute)
    1358          96 :         ExecCleanupTupleRouting(mtstate, proute);
    1359             : 
    1360             :     /* Close the result relations, including any trigger target relations */
    1361        1412 :     ExecCloseResultRelations(estate);
    1362        1412 :     ExecCloseRangeTableRelations(estate);
    1363             : 
    1364        1412 :     FreeExecutorState(estate);
    1365             : 
    1366        1412 :     return processed;
    1367             : }
    1368             : 
    1369             : /*
    1370             :  * Setup to read tuples from a file for COPY FROM.
    1371             :  *
    1372             :  * 'rel': Used as a template for the tuples
    1373             :  * 'whereClause': WHERE clause from the COPY FROM command
    1374             :  * 'filename': Name of server-local file to read, NULL for STDIN
    1375             :  * 'is_program': true if 'filename' is program to execute
    1376             :  * 'data_source_cb': callback that provides the input data
    1377             :  * 'attnamelist': List of char *, columns to include. NIL selects all cols.
    1378             :  * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
    1379             :  *
    1380             :  * Returns a CopyFromState, to be passed to NextCopyFrom and related functions.
    1381             :  */
    1382             : CopyFromState
    1383        1976 : BeginCopyFrom(ParseState *pstate,
    1384             :               Relation rel,
    1385             :               Node *whereClause,
    1386             :               const char *filename,
    1387             :               bool is_program,
    1388             :               copy_data_source_cb data_source_cb,
    1389             :               List *attnamelist,
    1390             :               List *options)
    1391             : {
    1392             :     CopyFromState cstate;
    1393        1976 :     bool        pipe = (filename == NULL);
    1394             :     TupleDesc   tupDesc;
    1395             :     AttrNumber  num_phys_attrs,
    1396             :                 num_defaults;
    1397             :     FmgrInfo   *in_functions;
    1398             :     Oid        *typioparams;
    1399             :     Oid         in_func_oid;
    1400             :     int        *defmap;
    1401             :     ExprState **defexprs;
    1402             :     MemoryContext oldcontext;
    1403             :     bool        volatile_defexprs;
    1404        1976 :     const int   progress_cols[] = {
    1405             :         PROGRESS_COPY_COMMAND,
    1406             :         PROGRESS_COPY_TYPE,
    1407             :         PROGRESS_COPY_BYTES_TOTAL
    1408             :     };
    1409        1976 :     int64       progress_vals[] = {
    1410             :         PROGRESS_COPY_COMMAND_FROM,
    1411             :         0,
    1412             :         0
    1413             :     };
    1414             : 
    1415             :     /* Allocate workspace and zero all fields */
    1416        1976 :     cstate = (CopyFromStateData *) palloc0(sizeof(CopyFromStateData));
    1417             : 
    1418             :     /*
    1419             :      * We allocate everything used by a cstate in a new memory context. This
    1420             :      * avoids memory leaks during repeated use of COPY in a query.
    1421             :      */
    1422        1976 :     cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
    1423             :                                                 "COPY",
    1424             :                                                 ALLOCSET_DEFAULT_SIZES);
    1425             : 
    1426        1976 :     oldcontext = MemoryContextSwitchTo(cstate->copycontext);
    1427             : 
    1428             :     /* Extract options from the statement node tree */
    1429        1976 :     ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options);
    1430             : 
    1431             :     /* Process the target relation */
    1432        1746 :     cstate->rel = rel;
    1433             : 
    1434        1746 :     tupDesc = RelationGetDescr(cstate->rel);
    1435             : 
    1436             :     /* process common options or initialization */
    1437             : 
    1438             :     /* Generate or convert list of attributes to process */
    1439        1746 :     cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
    1440             : 
    1441        1746 :     num_phys_attrs = tupDesc->natts;
    1442             : 
    1443             :     /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
    1444        1746 :     cstate->opts.force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
    1445        1746 :     if (cstate->opts.force_notnull_all)
    1446          12 :         MemSet(cstate->opts.force_notnull_flags, true, num_phys_attrs * sizeof(bool));
    1447        1734 :     else if (cstate->opts.force_notnull)
    1448             :     {
    1449             :         List       *attnums;
    1450             :         ListCell   *cur;
    1451             : 
    1452          26 :         attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_notnull);
    1453             : 
    1454          52 :         foreach(cur, attnums)
    1455             :         {
    1456          32 :             int         attnum = lfirst_int(cur);
    1457          32 :             Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
    1458             : 
    1459          32 :             if (!list_member_int(cstate->attnumlist, attnum))
    1460           6 :                 ereport(ERROR,
    1461             :                         (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1462             :                 /*- translator: first %s is the name of a COPY option, e.g. FORCE_NOT_NULL */
    1463             :                          errmsg("%s column \"%s\" not referenced by COPY",
    1464             :                                 "FORCE_NOT_NULL", NameStr(attr->attname))));
    1465          26 :             cstate->opts.force_notnull_flags[attnum - 1] = true;
    1466             :         }
    1467             :     }
    1468             : 
    1469             :     /* Set up soft error handler for ON_ERROR */
    1470        1740 :     if (cstate->opts.on_error != COPY_ON_ERROR_STOP)
    1471             :     {
    1472          58 :         cstate->escontext = makeNode(ErrorSaveContext);
    1473          58 :         cstate->escontext->type = T_ErrorSaveContext;
    1474          58 :         cstate->escontext->error_occurred = false;
    1475             : 
    1476             :         /*
    1477             :          * Currently we only support COPY_ON_ERROR_IGNORE. We'll add other
    1478             :          * options later
    1479             :          */
    1480          58 :         if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE)
    1481          58 :             cstate->escontext->details_wanted = false;
    1482             :     }
    1483             :     else
    1484        1682 :         cstate->escontext = NULL;
    1485             : 
    1486             :     /* Convert FORCE_NULL name list to per-column flags, check validity */
    1487        1740 :     cstate->opts.force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
    1488        1740 :     if (cstate->opts.force_null_all)
    1489          12 :         MemSet(cstate->opts.force_null_flags, true, num_phys_attrs * sizeof(bool));
    1490        1728 :     else if (cstate->opts.force_null)
    1491             :     {
    1492             :         List       *attnums;
    1493             :         ListCell   *cur;
    1494             : 
    1495          26 :         attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_null);
    1496             : 
    1497          52 :         foreach(cur, attnums)
    1498             :         {
    1499          32 :             int         attnum = lfirst_int(cur);
    1500          32 :             Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
    1501             : 
    1502          32 :             if (!list_member_int(cstate->attnumlist, attnum))
    1503           6 :                 ereport(ERROR,
    1504             :                         (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1505             :                 /*- translator: first %s is the name of a COPY option, e.g. FORCE_NOT_NULL */
    1506             :                          errmsg("%s column \"%s\" not referenced by COPY",
    1507             :                                 "FORCE_NULL", NameStr(attr->attname))));
    1508          26 :             cstate->opts.force_null_flags[attnum - 1] = true;
    1509             :         }
    1510             :     }
    1511             : 
    1512             :     /* Convert convert_selectively name list to per-column flags */
    1513        1734 :     if (cstate->opts.convert_selectively)
    1514             :     {
    1515             :         List       *attnums;
    1516             :         ListCell   *cur;
    1517             : 
    1518           4 :         cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
    1519             : 
    1520           4 :         attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.convert_select);
    1521             : 
    1522           8 :         foreach(cur, attnums)
    1523             :         {
    1524           4 :             int         attnum = lfirst_int(cur);
    1525           4 :             Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
    1526             : 
    1527           4 :             if (!list_member_int(cstate->attnumlist, attnum))
    1528           0 :                 ereport(ERROR,
    1529             :                         (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1530             :                          errmsg_internal("selected column \"%s\" not referenced by COPY",
    1531             :                                          NameStr(attr->attname))));
    1532           4 :             cstate->convert_select_flags[attnum - 1] = true;
    1533             :         }
    1534             :     }
    1535             : 
    1536             :     /* Use client encoding when ENCODING option is not specified. */
    1537        1734 :     if (cstate->opts.file_encoding < 0)
    1538        1716 :         cstate->file_encoding = pg_get_client_encoding();
    1539             :     else
    1540          18 :         cstate->file_encoding = cstate->opts.file_encoding;
    1541             : 
    1542             :     /*
    1543             :      * Look up encoding conversion function.
    1544             :      */
    1545        1734 :     if (cstate->file_encoding == GetDatabaseEncoding() ||
    1546          54 :         cstate->file_encoding == PG_SQL_ASCII ||
    1547          24 :         GetDatabaseEncoding() == PG_SQL_ASCII)
    1548             :     {
    1549        1710 :         cstate->need_transcoding = false;
    1550             :     }
    1551             :     else
    1552             :     {
    1553          24 :         cstate->need_transcoding = true;
    1554          24 :         cstate->conversion_proc = FindDefaultConversionProc(cstate->file_encoding,
    1555             :                                                             GetDatabaseEncoding());
    1556          24 :         if (!OidIsValid(cstate->conversion_proc))
    1557           0 :             ereport(ERROR,
    1558             :                     (errcode(ERRCODE_UNDEFINED_FUNCTION),
    1559             :                      errmsg("default conversion function for encoding \"%s\" to \"%s\" does not exist",
    1560             :                             pg_encoding_to_char(cstate->file_encoding),
    1561             :                             pg_encoding_to_char(GetDatabaseEncoding()))));
    1562             :     }
    1563             : 
    1564        1734 :     cstate->copy_src = COPY_FILE;    /* default */
    1565             : 
    1566        1734 :     cstate->whereClause = whereClause;
    1567             : 
    1568             :     /* Initialize state variables */
    1569        1734 :     cstate->eol_type = EOL_UNKNOWN;
    1570        1734 :     cstate->cur_relname = RelationGetRelationName(cstate->rel);
    1571        1734 :     cstate->cur_lineno = 0;
    1572        1734 :     cstate->cur_attname = NULL;
    1573        1734 :     cstate->cur_attval = NULL;
    1574        1734 :     cstate->relname_only = false;
    1575             : 
    1576             :     /*
    1577             :      * Allocate buffers for the input pipeline.
    1578             :      *
    1579             :      * attribute_buf and raw_buf are used in both text and binary modes, but
    1580             :      * input_buf and line_buf only in text mode.
    1581             :      */
    1582        1734 :     cstate->raw_buf = palloc(RAW_BUF_SIZE + 1);
    1583        1734 :     cstate->raw_buf_index = cstate->raw_buf_len = 0;
    1584        1734 :     cstate->raw_reached_eof = false;
    1585             : 
    1586        1734 :     if (!cstate->opts.binary)
    1587             :     {
    1588             :         /*
    1589             :          * If encoding conversion is needed, we need another buffer to hold
    1590             :          * the converted input data.  Otherwise, we can just point input_buf
    1591             :          * to the same buffer as raw_buf.
    1592             :          */
    1593        1718 :         if (cstate->need_transcoding)
    1594             :         {
    1595          24 :             cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
    1596          24 :             cstate->input_buf_index = cstate->input_buf_len = 0;
    1597             :         }
    1598             :         else
    1599        1694 :             cstate->input_buf = cstate->raw_buf;
    1600        1718 :         cstate->input_reached_eof = false;
    1601             : 
    1602        1718 :         initStringInfo(&cstate->line_buf);
    1603             :     }
    1604             : 
    1605        1734 :     initStringInfo(&cstate->attribute_buf);
    1606             : 
    1607             :     /* Assign range table and rteperminfos, we'll need them in CopyFrom. */
    1608        1734 :     if (pstate)
    1609             :     {
    1610        1664 :         cstate->range_table = pstate->p_rtable;
    1611        1664 :         cstate->rteperminfos = pstate->p_rteperminfos;
    1612             :     }
    1613             : 
    1614        1734 :     num_defaults = 0;
    1615        1734 :     volatile_defexprs = false;
    1616             : 
    1617             :     /*
    1618             :      * Pick up the required catalog information for each attribute in the
    1619             :      * relation, including the input function, the element type (to pass to
    1620             :      * the input function), and info about defaults and constraints. (Which
    1621             :      * input function we use depends on text/binary format choice.)
    1622             :      */
    1623        1734 :     in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
    1624        1734 :     typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
    1625        1734 :     defmap = (int *) palloc(num_phys_attrs * sizeof(int));
    1626        1734 :     defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
    1627             : 
    1628        6736 :     for (int attnum = 1; attnum <= num_phys_attrs; attnum++)
    1629             :     {
    1630        5016 :         Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
    1631             : 
    1632             :         /* We don't need info for dropped attributes */
    1633        5016 :         if (att->attisdropped)
    1634         124 :             continue;
    1635             : 
    1636             :         /* Fetch the input function and typioparam info */
    1637        4892 :         if (cstate->opts.binary)
    1638          62 :             getTypeBinaryInputInfo(att->atttypid,
    1639          62 :                                    &in_func_oid, &typioparams[attnum - 1]);
    1640             :         else
    1641        4830 :             getTypeInputInfo(att->atttypid,
    1642        4830 :                              &in_func_oid, &typioparams[attnum - 1]);
    1643        4890 :         fmgr_info(in_func_oid, &in_functions[attnum - 1]);
    1644             : 
    1645             :         /* Get default info if available */
    1646        4890 :         defexprs[attnum - 1] = NULL;
    1647             : 
    1648             :         /*
    1649             :          * We only need the default values for columns that do not appear in
    1650             :          * the column list, unless the DEFAULT option was given. We never need
    1651             :          * default values for generated columns.
    1652             :          */
    1653        4890 :         if ((cstate->opts.default_print != NULL ||
    1654        4764 :              !list_member_int(cstate->attnumlist, attnum)) &&
    1655         652 :             !att->attgenerated)
    1656             :         {
    1657         634 :             Expr       *defexpr = (Expr *) build_column_default(cstate->rel,
    1658             :                                                                 attnum);
    1659             : 
    1660         634 :             if (defexpr != NULL)
    1661             :             {
    1662             :                 /* Run the expression through planner */
    1663         264 :                 defexpr = expression_planner(defexpr);
    1664             : 
    1665             :                 /* Initialize executable expression in copycontext */
    1666         252 :                 defexprs[attnum - 1] = ExecInitExpr(defexpr, NULL);
    1667             : 
    1668             :                 /* if NOT copied from input */
    1669             :                 /* use default value if one exists */
    1670         252 :                 if (!list_member_int(cstate->attnumlist, attnum))
    1671             :                 {
    1672         172 :                     defmap[num_defaults] = attnum - 1;
    1673         172 :                     num_defaults++;
    1674             :                 }
    1675             : 
    1676             :                 /*
    1677             :                  * If a default expression looks at the table being loaded,
    1678             :                  * then it could give the wrong answer when using
    1679             :                  * multi-insert. Since database access can be dynamic this is
    1680             :                  * hard to test for exactly, so we use the much wider test of
    1681             :                  * whether the default expression is volatile. We allow for
    1682             :                  * the special case of when the default expression is the
    1683             :                  * nextval() of a sequence which in this specific case is
    1684             :                  * known to be safe for use with the multi-insert
    1685             :                  * optimization. Hence we use this special case function
    1686             :                  * checker rather than the standard check for
    1687             :                  * contain_volatile_functions().  Note also that we already
    1688             :                  * ran the expression through expression_planner().
    1689             :                  */
    1690         252 :                 if (!volatile_defexprs)
    1691         252 :                     volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
    1692             :             }
    1693             :         }
    1694             :     }
    1695             : 
    1696        1720 :     cstate->defaults = (bool *) palloc0(tupDesc->natts * sizeof(bool));
    1697             : 
    1698             :     /* initialize progress */
    1699        1720 :     pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
    1700        1720 :                                   cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
    1701        1720 :     cstate->bytes_processed = 0;
    1702             : 
    1703             :     /* We keep those variables in cstate. */
    1704        1720 :     cstate->in_functions = in_functions;
    1705        1720 :     cstate->typioparams = typioparams;
    1706        1720 :     cstate->defmap = defmap;
    1707        1720 :     cstate->defexprs = defexprs;
    1708        1720 :     cstate->volatile_defexprs = volatile_defexprs;
    1709        1720 :     cstate->num_defaults = num_defaults;
    1710        1720 :     cstate->is_program = is_program;
    1711             : 
    1712        1720 :     if (data_source_cb)
    1713             :     {
    1714         362 :         progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
    1715         362 :         cstate->copy_src = COPY_CALLBACK;
    1716         362 :         cstate->data_source_cb = data_source_cb;
    1717             :     }
    1718        1358 :     else if (pipe)
    1719             :     {
    1720         920 :         progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
    1721             :         Assert(!is_program);    /* the grammar does not allow this */
    1722         920 :         if (whereToSendOutput == DestRemote)
    1723         920 :             ReceiveCopyBegin(cstate);
    1724             :         else
    1725           0 :             cstate->copy_file = stdin;
    1726             :     }
    1727             :     else
    1728             :     {
    1729         438 :         cstate->filename = pstrdup(filename);
    1730             : 
    1731         438 :         if (cstate->is_program)
    1732             :         {
    1733           0 :             progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
    1734           0 :             cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
    1735           0 :             if (cstate->copy_file == NULL)
    1736           0 :                 ereport(ERROR,
    1737             :                         (errcode_for_file_access(),
    1738             :                          errmsg("could not execute command \"%s\": %m",
    1739             :                                 cstate->filename)));
    1740             :         }
    1741             :         else
    1742             :         {
    1743             :             struct stat st;
    1744             : 
    1745         438 :             progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
    1746         438 :             cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
    1747         438 :             if (cstate->copy_file == NULL)
    1748             :             {
    1749             :                 /* copy errno because ereport subfunctions might change it */
    1750           0 :                 int         save_errno = errno;
    1751             : 
    1752           0 :                 ereport(ERROR,
    1753             :                         (errcode_for_file_access(),
    1754             :                          errmsg("could not open file \"%s\" for reading: %m",
    1755             :                                 cstate->filename),
    1756             :                          (save_errno == ENOENT || save_errno == EACCES) ?
    1757             :                          errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
    1758             :                                  "You may want a client-side facility such as psql's \\copy.") : 0));
    1759             :             }
    1760             : 
    1761         438 :             if (fstat(fileno(cstate->copy_file), &st))
    1762           0 :                 ereport(ERROR,
    1763             :                         (errcode_for_file_access(),
    1764             :                          errmsg("could not stat file \"%s\": %m",
    1765             :                                 cstate->filename)));
    1766             : 
    1767         438 :             if (S_ISDIR(st.st_mode))
    1768           0 :                 ereport(ERROR,
    1769             :                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1770             :                          errmsg("\"%s\" is a directory", cstate->filename)));
    1771             : 
    1772         438 :             progress_vals[2] = st.st_size;
    1773             :         }
    1774             :     }
    1775             : 
    1776        1720 :     pgstat_progress_update_multi_param(3, progress_cols, progress_vals);
    1777             : 
    1778        1720 :     if (cstate->opts.binary)
    1779             :     {
    1780             :         /* Read and verify binary header */
    1781          14 :         ReceiveCopyBinaryHeader(cstate);
    1782             :     }
    1783             : 
    1784             :     /* create workspace for CopyReadAttributes results */
    1785        1720 :     if (!cstate->opts.binary)
    1786             :     {
    1787        1706 :         AttrNumber  attr_count = list_length(cstate->attnumlist);
    1788             : 
    1789        1706 :         cstate->max_fields = attr_count;
    1790        1706 :         cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
    1791             :     }
    1792             : 
    1793        1720 :     MemoryContextSwitchTo(oldcontext);
    1794             : 
    1795        1720 :     return cstate;
    1796             : }
    1797             : 
    1798             : /*
    1799             :  * Clean up storage and release resources for COPY FROM.
    1800             :  */
    1801             : void
    1802        1128 : EndCopyFrom(CopyFromState cstate)
    1803             : {
    1804             :     /* No COPY FROM related resources except memory. */
    1805        1128 :     if (cstate->is_program)
    1806             :     {
    1807           0 :         ClosePipeFromProgram(cstate);
    1808             :     }
    1809             :     else
    1810             :     {
    1811        1128 :         if (cstate->filename != NULL && FreeFile(cstate->copy_file))
    1812           0 :             ereport(ERROR,
    1813             :                     (errcode_for_file_access(),
    1814             :                      errmsg("could not close file \"%s\": %m",
    1815             :                             cstate->filename)));
    1816             :     }
    1817             : 
    1818        1128 :     pgstat_progress_end_command();
    1819             : 
    1820        1128 :     MemoryContextDelete(cstate->copycontext);
    1821        1128 :     pfree(cstate);
    1822        1128 : }
    1823             : 
    1824             : /*
    1825             :  * Closes the pipe from an external program, checking the pclose() return code.
    1826             :  */
    1827             : static void
    1828           0 : ClosePipeFromProgram(CopyFromState cstate)
    1829             : {
    1830             :     int         pclose_rc;
    1831             : 
    1832             :     Assert(cstate->is_program);
    1833             : 
    1834           0 :     pclose_rc = ClosePipeStream(cstate->copy_file);
    1835           0 :     if (pclose_rc == -1)
    1836           0 :         ereport(ERROR,
    1837             :                 (errcode_for_file_access(),
    1838             :                  errmsg("could not close pipe to external command: %m")));
    1839           0 :     else if (pclose_rc != 0)
    1840             :     {
    1841             :         /*
    1842             :          * If we ended a COPY FROM PROGRAM before reaching EOF, then it's
    1843             :          * expectable for the called program to fail with SIGPIPE, and we
    1844             :          * should not report that as an error.  Otherwise, SIGPIPE indicates a
    1845             :          * problem.
    1846             :          */
    1847           0 :         if (!cstate->raw_reached_eof &&
    1848           0 :             wait_result_is_signal(pclose_rc, SIGPIPE))
    1849           0 :             return;
    1850             : 
    1851           0 :         ereport(ERROR,
    1852             :                 (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
    1853             :                  errmsg("program \"%s\" failed",
    1854             :                         cstate->filename),
    1855             :                  errdetail_internal("%s", wait_result_to_str(pclose_rc))));
    1856             :     }
    1857             : }

Generated by: LCOV version 1.14