Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * copyfrom.c
4 : * COPY <table> FROM file/program/client
5 : *
6 : * This file contains routines needed to efficiently load tuples into a
7 : * table. That includes looking up the correct partition, firing triggers,
8 : * calling the table AM function to insert the data, and updating indexes.
9 : * Reading data from the input file or client and parsing it into Datums
10 : * is handled in copyfromparse.c.
11 : *
12 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
13 : * Portions Copyright (c) 1994, Regents of the University of California
14 : *
15 : *
16 : * IDENTIFICATION
17 : * src/backend/commands/copyfrom.c
18 : *
19 : *-------------------------------------------------------------------------
20 : */
21 : #include "postgres.h"
22 :
23 : #include <ctype.h>
24 : #include <unistd.h>
25 : #include <sys/stat.h>
26 :
27 : #include "access/heapam.h"
28 : #include "access/tableam.h"
29 : #include "access/xact.h"
30 : #include "catalog/namespace.h"
31 : #include "commands/copyapi.h"
32 : #include "commands/copyfrom_internal.h"
33 : #include "commands/progress.h"
34 : #include "commands/trigger.h"
35 : #include "executor/execPartition.h"
36 : #include "executor/executor.h"
37 : #include "executor/nodeModifyTable.h"
38 : #include "executor/tuptable.h"
39 : #include "foreign/fdwapi.h"
40 : #include "mb/pg_wchar.h"
41 : #include "miscadmin.h"
42 : #include "nodes/miscnodes.h"
43 : #include "optimizer/optimizer.h"
44 : #include "pgstat.h"
45 : #include "rewrite/rewriteHandler.h"
46 : #include "storage/fd.h"
47 : #include "tcop/tcopprot.h"
48 : #include "utils/lsyscache.h"
49 : #include "utils/memutils.h"
50 : #include "utils/portal.h"
51 : #include "utils/rel.h"
52 : #include "utils/snapmgr.h"
53 :
54 : /*
55 : * No more than this many tuples per CopyMultiInsertBuffer (the length of its
56 : * slots[] and linenos[] arrays); CopyMultiInsertInfoIsFull() also compares
57 : * the tuple count summed over all buffers against it. Caution: Don't make
58 : * this too big, as we could end up with this many CopyMultiInsertBuffer
59 : * items stored in CopyMultiInsertInfo's multiInsertBuffers list. Increasing
60 : * this can cause quadratic growth in memory requirements during copies into
61 : * partitioned tables with a large number of partitions.
62 : */
63 : #define MAX_BUFFERED_TUPLES 1000
64 :
65 : /*
66 : * Flush buffers once bufferedBytes (the input-size byte count of the stored
67 : * tuples) is >= this value; tested in CopyMultiInsertInfoIsFull().
68 : */
69 : #define MAX_BUFFERED_BYTES 65535
70 :
71 : /*
72 : * Trim the list of buffers back down to this number after flushing. This
73 : * must be >= 2 (enforced by a StaticAssertDecl in CopyMultiInsertInfoFlush).
74 : */
75 : #define MAX_PARTITION_BUFFERS 32
76 :
77 : /* Multi-insert state for a single target relation in CopyFrom. */
78 : typedef struct CopyMultiInsertBuffer
79 : {
80 : TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Tuple slots, created on demand */
81 : ResultRelInfo *resultRelInfo; /* ResultRelInfo this buffer inserts into */
82 : BulkInsertState bistate; /* BulkInsertState for this rel if plain
83 : * table; NULL if foreign table */
84 : int nused; /* number of 'slots' containing tuples */
85 : uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # in copy stream of the
86 : * tuple in the matching slot */
87 : } CopyMultiInsertBuffer;
88 :
89 : /*
90 : * Tracks one or more CopyMultiInsertBuffers plus the running totals of tuples
91 : * and bytes buffered in them (reset by CopyMultiInsertInfoFlush). Multiple
92 : * buffers can exist at once when COPYing into a partitioned table.
93 : */
94 : typedef struct CopyMultiInsertInfo
95 : {
96 : List *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */
97 : int bufferedTuples; /* number of tuples buffered over all buffers */
98 : int bufferedBytes; /* sum of 'tuplen' over all buffered tuples */
99 : CopyFromState cstate; /* Copy state for this CopyMultiInsertInfo */
100 : EState *estate; /* Executor state used for COPY */
101 : CommandId mycid; /* Command Id passed to table_multi_insert */
102 : int ti_options; /* table insert options for table_multi_insert */
103 : } CopyMultiInsertInfo;
104 :
105 :
106 : /* Prototypes for functions private to this file */
107 : static void ClosePipeFromProgram(CopyFromState cstate);
108 :
109 : /*
110 : * Built-in format-specific routines (InFunc/Start/End callbacks for the
111 : * text-like and binary formats). One-row callbacks live in copyfromparse.c.
112 : */
113 : static void CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo,
114 : Oid *typioparam);
115 : static void CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc);
116 : static void CopyFromTextLikeEnd(CopyFromState cstate);
117 : static void CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid,
118 : FmgrInfo *finfo, Oid *typioparam);
119 : static void CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc);
120 : static void CopyFromBinaryEnd(CopyFromState cstate);
121 :
122 :
123 : /*
124 : * COPY FROM routines for built-in formats.
125 : *
126 : * CSV and text formats share the same TextLike routines except for the
127 : * one-row callback.
128 : */
129 :
130 : /* text format: the default table returned by CopyFromGetRoutine() */
131 : static const CopyFromRoutine CopyFromRoutineText = {
132 : .CopyFromInFunc = CopyFromTextLikeInFunc,
133 : .CopyFromStart = CopyFromTextLikeStart,
134 : .CopyFromOneRow = CopyFromTextOneRow,
135 : .CopyFromEnd = CopyFromTextLikeEnd,
136 : };
137 :
138 : /* CSV format: same callbacks as text except for CopyFromOneRow */
139 : static const CopyFromRoutine CopyFromRoutineCSV = {
140 : .CopyFromInFunc = CopyFromTextLikeInFunc,
141 : .CopyFromStart = CopyFromTextLikeStart,
142 : .CopyFromOneRow = CopyFromCSVOneRow,
143 : .CopyFromEnd = CopyFromTextLikeEnd,
144 : };
145 :
146 : /* binary format: chosen when opts->binary is set and csv_mode is not */
147 : static const CopyFromRoutine CopyFromRoutineBinary = {
148 : .CopyFromInFunc = CopyFromBinaryInFunc,
149 : .CopyFromStart = CopyFromBinaryStart,
150 : .CopyFromOneRow = CopyFromBinaryOneRow,
151 : .CopyFromEnd = CopyFromBinaryEnd,
152 : };
153 :
154 : /* Return a COPY FROM routine for the given options */
155 : static const CopyFromRoutine *
156 1820 : CopyFromGetRoutine(const CopyFormatOptions *opts)
157 : {
158 1820 : if (opts->csv_mode)
159 258 : return &CopyFromRoutineCSV;
160 1562 : else if (opts->binary)
161 16 : return &CopyFromRoutineBinary;
162 :
163 : /* default is text */
164 1546 : return &CopyFromRoutineText;
165 : }
166 :
167 : /* Implementation of the start callback for text and CSV formats */
168 : static void
169 1780 : CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc)
170 : {
171 : AttrNumber attr_count;
172 :
173 : /*
174 : * If encoding conversion is needed, we need another buffer to hold the
175 : * converted input data. Otherwise, we can just point input_buf to the
176 : * same buffer as raw_buf.
177 : */
178 1780 : if (cstate->need_transcoding)
179 : {
180 24 : cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
181 24 : cstate->input_buf_index = cstate->input_buf_len = 0;
182 : }
183 : else
184 1756 : cstate->input_buf = cstate->raw_buf;
185 1780 : cstate->input_reached_eof = false;
186 :
187 1780 : initStringInfo(&cstate->line_buf);
188 :
189 : /*
190 : * Create workspace for CopyReadAttributes results; used by CSV and text
191 : * format.
192 : */
193 1780 : attr_count = list_length(cstate->attnumlist);
194 1780 : cstate->max_fields = attr_count;
195 1780 : cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
196 1780 : }
197 :
198 : /*
199 : * Implementation of the infunc callback for text and CSV formats. Assign
200 : * the input function data to the given *finfo.
201 : */
202 : static void
203 4992 : CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo,
204 : Oid *typioparam)
205 : {
206 : Oid func_oid;
207 :
208 4992 : getTypeInputInfo(atttypid, &func_oid, typioparam);
209 4992 : fmgr_info(func_oid, finfo);
210 4992 : }
211 :
212 : /* Implementation of the end callback for text and CSV formats */
213 : static void
214 1176 : CopyFromTextLikeEnd(CopyFromState cstate)
215 : {
216 : /* nothing to do */
217 1176 : }
218 :
219 : /* Implementation of the start callback for binary format */
220 : static void
221 14 : CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc)
222 : {
223 : /* Read and verify binary header */
224 14 : ReceiveCopyBinaryHeader(cstate);
225 14 : }
226 :
227 : /*
228 : * Implementation of the infunc callback for binary format. Assign
229 : * the binary input function to the given *finfo.
230 : */
231 : static void
232 62 : CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid,
233 : FmgrInfo *finfo, Oid *typioparam)
234 : {
235 : Oid func_oid;
236 :
237 62 : getTypeBinaryInputInfo(atttypid, &func_oid, typioparam);
238 60 : fmgr_info(func_oid, finfo);
239 60 : }
240 :
241 : /* Implementation of the end callback for binary format */
242 : static void
243 6 : CopyFromBinaryEnd(CopyFromState cstate)
244 : {
245 : /* nothing to do */
246 6 : }
247 :
248 : /*
249 : * error context callback for COPY FROM
250 : *
251 : * The argument for the error context must be CopyFromState.
252 : */
253 : void
254 314 : CopyFromErrorCallback(void *arg)
255 : {
256 314 : CopyFromState cstate = (CopyFromState) arg;
257 :
258 314 : if (cstate->relname_only)
259 : {
260 44 : errcontext("COPY %s",
261 : cstate->cur_relname);
262 44 : return;
263 : }
264 270 : if (cstate->opts.binary)
265 : {
266 : /* can't usefully display the data */
267 2 : if (cstate->cur_attname)
268 2 : errcontext("COPY %s, line %" PRIu64 ", column %s",
269 : cstate->cur_relname,
270 : cstate->cur_lineno,
271 : cstate->cur_attname);
272 : else
273 0 : errcontext("COPY %s, line %" PRIu64,
274 : cstate->cur_relname,
275 : cstate->cur_lineno);
276 : }
277 : else
278 : {
279 268 : if (cstate->cur_attname && cstate->cur_attval)
280 40 : {
281 : /* error is relevant to a particular column */
282 : char *attval;
283 :
284 40 : attval = CopyLimitPrintoutLength(cstate->cur_attval);
285 40 : errcontext("COPY %s, line %" PRIu64 ", column %s: \"%s\"",
286 : cstate->cur_relname,
287 : cstate->cur_lineno,
288 : cstate->cur_attname,
289 : attval);
290 40 : pfree(attval);
291 : }
292 228 : else if (cstate->cur_attname)
293 : {
294 : /* error is relevant to a particular column, value is NULL */
295 6 : errcontext("COPY %s, line %" PRIu64 ", column %s: null input",
296 : cstate->cur_relname,
297 : cstate->cur_lineno,
298 : cstate->cur_attname);
299 : }
300 : else
301 : {
302 : /*
303 : * Error is relevant to a particular line.
304 : *
305 : * If line_buf still contains the correct line, print it.
306 : */
307 222 : if (cstate->line_buf_valid)
308 : {
309 : char *lineval;
310 :
311 184 : lineval = CopyLimitPrintoutLength(cstate->line_buf.data);
312 184 : errcontext("COPY %s, line %" PRIu64 ": \"%s\"",
313 : cstate->cur_relname,
314 : cstate->cur_lineno, lineval);
315 184 : pfree(lineval);
316 : }
317 : else
318 : {
319 38 : errcontext("COPY %s, line %" PRIu64,
320 : cstate->cur_relname,
321 : cstate->cur_lineno);
322 : }
323 : }
324 : }
325 : }
326 :
/*
 * Keep the amount of COPY data printed in a message reasonable.
 *
 * Inputs of at most MAX_COPY_DATA_DISPLAY bytes come back as a pstrdup'd
 * copy.  Longer inputs are clipped with pg_mbcliplen() and returned in a
 * freshly palloc'd string with "..." appended.
 */
char *
CopyLimitPrintoutLength(const char *str)
{
#define MAX_COPY_DATA_DISPLAY 100

	int			full_len = strlen(str);
	int			clip_len;
	char	   *clipped;

	if (full_len > MAX_COPY_DATA_DISPLAY)
	{
		/* Apply encoding-dependent truncation */
		clip_len = pg_mbcliplen(str, full_len, MAX_COPY_DATA_DISPLAY);

		/* room for the clipped text, the three dots and the terminator */
		clipped = (char *) palloc(clip_len + 4);
		memcpy(clipped, str, clip_len);
		memcpy(clipped + clip_len, "...", 4);

		return clipped;
	}

	/* Short enough already: hand back a plain copy */
	return pstrdup(str);
}
357 :
358 : /*
359 : * Allocate memory and initialize a new CopyMultiInsertBuffer for this
360 : * ResultRelInfo.
361 : */
362 : static CopyMultiInsertBuffer *
363 1550 : CopyMultiInsertBufferInit(ResultRelInfo *rri)
364 : {
365 : CopyMultiInsertBuffer *buffer;
366 :
367 1550 : buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
368 1550 : memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
369 1550 : buffer->resultRelInfo = rri;
370 1550 : buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL;
371 1550 : buffer->nused = 0;
372 :
373 1550 : return buffer;
374 : }
375 :
376 : /*
377 : * Make a new buffer for this ResultRelInfo.
378 : */
379 : static inline void
380 1550 : CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
381 : ResultRelInfo *rri)
382 : {
383 : CopyMultiInsertBuffer *buffer;
384 :
385 1550 : buffer = CopyMultiInsertBufferInit(rri);
386 :
387 : /* Setup back-link so we can easily find this buffer again */
388 1550 : rri->ri_CopyMultiInsertBuffer = buffer;
389 : /* Record that we're tracking this buffer */
390 1550 : miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
391 1550 : }
392 :
393 : /*
394 : * Initialize an already allocated CopyMultiInsertInfo.
395 : *
396 : * If rri is a non-partitioned table then a CopyMultiInsertBuffer is set up
397 : * for that table.
398 : */
399 : static void
400 1540 : CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
401 : CopyFromState cstate, EState *estate, CommandId mycid,
402 : int ti_options)
403 : {
404 1540 : miinfo->multiInsertBuffers = NIL;
405 1540 : miinfo->bufferedTuples = 0;
406 1540 : miinfo->bufferedBytes = 0;
407 1540 : miinfo->cstate = cstate;
408 1540 : miinfo->estate = estate;
409 1540 : miinfo->mycid = mycid;
410 1540 : miinfo->ti_options = ti_options;
411 :
412 : /*
413 : * Only setup the buffer when not dealing with a partitioned table.
414 : * Buffers for partitioned tables will just be setup when we need to send
415 : * tuples their way for the first time.
416 : */
417 1540 : if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
418 1464 : CopyMultiInsertInfoSetupBuffer(miinfo, rri);
419 1540 : }
420 :
421 : /*
422 : * Returns true if the buffers are full
423 : */
424 : static inline bool
425 1195720 : CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
426 : {
427 1195720 : if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
428 1194726 : miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
429 1100 : return true;
430 1194620 : return false;
431 : }
432 :
433 : /*
434 : * Returns true if we have no buffered tuples
435 : */
436 : static inline bool
437 1402 : CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
438 : {
439 1402 : return miinfo->bufferedTuples == 0;
440 : }
441 :
442 : /*
443 : * Write the tuples stored in 'buffer' out to the table.
444 : */
445 : static inline void
446 2460 : CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
447 : CopyMultiInsertBuffer *buffer,
448 : int64 *processed)
449 : {
450 2460 : CopyFromState cstate = miinfo->cstate;
451 2460 : EState *estate = miinfo->estate;
452 2460 : int nused = buffer->nused;
453 2460 : ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
454 2460 : TupleTableSlot **slots = buffer->slots;
455 : int i;
456 :
457 2460 : if (resultRelInfo->ri_FdwRoutine)
458 : {
459 14 : int batch_size = resultRelInfo->ri_BatchSize;
460 14 : int sent = 0;
461 :
462 : Assert(buffer->bistate == NULL);
463 :
464 : /* Ensure that the FDW supports batching and it's enabled */
465 : Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert);
466 : Assert(batch_size > 1);
467 :
468 : /*
469 : * We suppress error context information other than the relation name,
470 : * if one of the operations below fails.
471 : */
472 : Assert(!cstate->relname_only);
473 14 : cstate->relname_only = true;
474 :
475 38 : while (sent < nused)
476 : {
477 26 : int size = (batch_size < nused - sent) ? batch_size : (nused - sent);
478 26 : int inserted = size;
479 : TupleTableSlot **rslots;
480 :
481 : /* insert into foreign table: let the FDW do it */
482 : rslots =
483 26 : resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert(estate,
484 : resultRelInfo,
485 26 : &slots[sent],
486 : NULL,
487 : &inserted);
488 :
489 24 : sent += size;
490 :
491 : /* No need to do anything if there are no inserted rows */
492 24 : if (inserted <= 0)
493 4 : continue;
494 :
495 : /* Triggers on foreign tables should not have transition tables */
496 : Assert(resultRelInfo->ri_TrigDesc == NULL ||
497 : resultRelInfo->ri_TrigDesc->trig_insert_new_table == false);
498 :
499 : /* Run AFTER ROW INSERT triggers */
500 20 : if (resultRelInfo->ri_TrigDesc != NULL &&
501 0 : resultRelInfo->ri_TrigDesc->trig_insert_after_row)
502 : {
503 0 : Oid relid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
504 :
505 0 : for (i = 0; i < inserted; i++)
506 : {
507 0 : TupleTableSlot *slot = rslots[i];
508 :
509 : /*
510 : * AFTER ROW Triggers might reference the tableoid column,
511 : * so (re-)initialize tts_tableOid before evaluating them.
512 : */
513 0 : slot->tts_tableOid = relid;
514 :
515 0 : ExecARInsertTriggers(estate, resultRelInfo,
516 : slot, NIL,
517 : cstate->transition_capture);
518 : }
519 : }
520 :
521 : /* Update the row counter and progress of the COPY command */
522 20 : *processed += inserted;
523 20 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
524 : *processed);
525 : }
526 :
527 48 : for (i = 0; i < nused; i++)
528 36 : ExecClearTuple(slots[i]);
529 :
530 : /* reset relname_only */
531 12 : cstate->relname_only = false;
532 : }
533 : else
534 : {
535 2446 : CommandId mycid = miinfo->mycid;
536 2446 : int ti_options = miinfo->ti_options;
537 2446 : bool line_buf_valid = cstate->line_buf_valid;
538 2446 : uint64 save_cur_lineno = cstate->cur_lineno;
539 : MemoryContext oldcontext;
540 :
541 : Assert(buffer->bistate != NULL);
542 :
543 : /*
544 : * Print error context information correctly, if one of the operations
545 : * below fails.
546 : */
547 2446 : cstate->line_buf_valid = false;
548 :
549 : /*
550 : * table_multi_insert may leak memory, so switch to short-lived memory
551 : * context before calling it.
552 : */
553 2446 : oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
554 2446 : table_multi_insert(resultRelInfo->ri_RelationDesc,
555 : slots,
556 : nused,
557 : mycid,
558 : ti_options,
559 : buffer->bistate);
560 2446 : MemoryContextSwitchTo(oldcontext);
561 :
562 1197944 : for (i = 0; i < nused; i++)
563 : {
564 : /*
565 : * If there are any indexes, update them for all the inserted
566 : * tuples, and run AFTER ROW INSERT triggers.
567 : */
568 1195512 : if (resultRelInfo->ri_NumIndices > 0)
569 : {
570 : List *recheckIndexes;
571 :
572 202500 : cstate->cur_lineno = buffer->linenos[i];
573 : recheckIndexes =
574 202500 : ExecInsertIndexTuples(resultRelInfo,
575 : buffer->slots[i], estate, false,
576 : false, NULL, NIL, false);
577 202486 : ExecARInsertTriggers(estate, resultRelInfo,
578 202486 : slots[i], recheckIndexes,
579 : cstate->transition_capture);
580 202486 : list_free(recheckIndexes);
581 : }
582 :
583 : /*
584 : * There's no indexes, but see if we need to run AFTER ROW INSERT
585 : * triggers anyway.
586 : */
587 993012 : else if (resultRelInfo->ri_TrigDesc != NULL &&
588 84 : (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
589 66 : resultRelInfo->ri_TrigDesc->trig_insert_new_table))
590 : {
591 36 : cstate->cur_lineno = buffer->linenos[i];
592 36 : ExecARInsertTriggers(estate, resultRelInfo,
593 36 : slots[i], NIL,
594 : cstate->transition_capture);
595 : }
596 :
597 1195498 : ExecClearTuple(slots[i]);
598 : }
599 :
600 : /* Update the row counter and progress of the COPY command */
601 2432 : *processed += nused;
602 2432 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
603 : *processed);
604 :
605 : /* reset cur_lineno and line_buf_valid to what they were */
606 2432 : cstate->line_buf_valid = line_buf_valid;
607 2432 : cstate->cur_lineno = save_cur_lineno;
608 : }
609 :
610 : /* Mark that all slots are free */
611 2444 : buffer->nused = 0;
612 2444 : }
613 :
614 : /*
615 : * Drop used slots and free member for this buffer.
616 : *
617 : * The buffer must be flushed before cleanup.
618 : */
619 : static inline void
620 1368 : CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
621 : CopyMultiInsertBuffer *buffer)
622 : {
623 1368 : ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
624 : int i;
625 :
626 : /* Ensure buffer was flushed */
627 : Assert(buffer->nused == 0);
628 :
629 : /* Remove back-link to ourself */
630 1368 : resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
631 :
632 1368 : if (resultRelInfo->ri_FdwRoutine == NULL)
633 : {
634 : Assert(buffer->bistate != NULL);
635 1356 : FreeBulkInsertState(buffer->bistate);
636 : }
637 : else
638 : Assert(buffer->bistate == NULL);
639 :
640 : /* Since we only create slots on demand, just drop the non-null ones. */
641 235902 : for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
642 234534 : ExecDropSingleTupleTableSlot(buffer->slots[i]);
643 :
644 1368 : if (resultRelInfo->ri_FdwRoutine == NULL)
645 1356 : table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
646 : miinfo->ti_options);
647 :
648 1368 : pfree(buffer);
649 1368 : }
650 :
651 : /*
652 : * Write out all stored tuples in all buffers out to the tables.
653 : *
654 : * Once flushed we also trim the tracked buffers list down to size by removing
655 : * the buffers created earliest first.
656 : *
657 : * Callers should pass 'curr_rri' as the ResultRelInfo that's currently being
658 : * used. When cleaning up old buffers we'll never remove the one for
659 : * 'curr_rri'.
660 : */
661 : static inline void
662 2234 : CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri,
663 : int64 *processed)
664 : {
665 : ListCell *lc;
666 :
667 4678 : foreach(lc, miinfo->multiInsertBuffers)
668 : {
669 2460 : CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc);
670 :
671 2460 : CopyMultiInsertBufferFlush(miinfo, buffer, processed);
672 : }
673 :
674 2218 : miinfo->bufferedTuples = 0;
675 2218 : miinfo->bufferedBytes = 0;
676 :
677 : /*
678 : * Trim the list of tracked buffers down if it exceeds the limit. Here we
679 : * remove buffers starting with the ones we created first. It seems less
680 : * likely that these older ones will be needed than the ones that were
681 : * just created.
682 : */
683 2218 : while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS)
684 : {
685 : CopyMultiInsertBuffer *buffer;
686 :
687 0 : buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
688 :
689 : /*
690 : * We never want to remove the buffer that's currently being used, so
691 : * if we happen to find that then move it to the end of the list.
692 : */
693 0 : if (buffer->resultRelInfo == curr_rri)
694 : {
695 : /*
696 : * The code below would misbehave if we were trying to reduce the
697 : * list to less than two items.
698 : */
699 : StaticAssertDecl(MAX_PARTITION_BUFFERS >= 2,
700 : "MAX_PARTITION_BUFFERS must be >= 2");
701 :
702 0 : miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
703 0 : miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
704 0 : buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
705 : }
706 :
707 0 : CopyMultiInsertBufferCleanup(miinfo, buffer);
708 0 : miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
709 : }
710 2218 : }
711 :
712 : /*
713 : * Cleanup allocated buffers and free memory
714 : */
715 : static inline void
716 1358 : CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo)
717 : {
718 : ListCell *lc;
719 :
720 2726 : foreach(lc, miinfo->multiInsertBuffers)
721 1368 : CopyMultiInsertBufferCleanup(miinfo, lfirst(lc));
722 :
723 1358 : list_free(miinfo->multiInsertBuffers);
724 1358 : }
725 :
726 : /*
727 : * Get the next TupleTableSlot that the next tuple should be stored in.
728 : *
729 : * Callers must ensure that the buffer is not full.
730 : *
731 : * Note: 'miinfo' is unused but has been included for consistency with the
732 : * other functions in this area.
733 : */
734 : static inline TupleTableSlot *
735 1197292 : CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
736 : ResultRelInfo *rri)
737 : {
738 1197292 : CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
739 : int nused;
740 :
741 : Assert(buffer != NULL);
742 : Assert(buffer->nused < MAX_BUFFERED_TUPLES);
743 :
744 1197292 : nused = buffer->nused;
745 :
746 1197292 : if (buffer->slots[nused] == NULL)
747 234900 : buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
748 1197292 : return buffer->slots[nused];
749 : }
750 :
751 : /*
752 : * Record the previously reserved TupleTableSlot that was reserved by
753 : * CopyMultiInsertInfoNextFreeSlot as being consumed.
754 : */
755 : static inline void
756 1195720 : CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
757 : TupleTableSlot *slot, int tuplen, uint64 lineno)
758 : {
759 1195720 : CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
760 :
761 : Assert(buffer != NULL);
762 : Assert(slot == buffer->slots[buffer->nused]);
763 :
764 : /* Store the line number so we can properly report any errors later */
765 1195720 : buffer->linenos[buffer->nused] = lineno;
766 :
767 : /* Record this slot as being used */
768 1195720 : buffer->nused++;
769 :
770 : /* Update how many tuples are stored and their size */
771 1195720 : miinfo->bufferedTuples++;
772 1195720 : miinfo->bufferedBytes += tuplen;
773 1195720 : }
774 :
775 : /*
776 : * Copy FROM file to relation.
777 : */
778 : uint64
779 1724 : CopyFrom(CopyFromState cstate)
780 : {
781 : ResultRelInfo *resultRelInfo;
782 : ResultRelInfo *target_resultRelInfo;
783 1724 : ResultRelInfo *prevResultRelInfo = NULL;
784 1724 : EState *estate = CreateExecutorState(); /* for ExecConstraints() */
785 : ModifyTableState *mtstate;
786 : ExprContext *econtext;
787 1724 : TupleTableSlot *singleslot = NULL;
788 1724 : MemoryContext oldcontext = CurrentMemoryContext;
789 :
790 1724 : PartitionTupleRouting *proute = NULL;
791 : ErrorContextCallback errcallback;
792 1724 : CommandId mycid = GetCurrentCommandId(true);
793 1724 : int ti_options = 0; /* start with default options for insert */
794 1724 : BulkInsertState bistate = NULL;
795 : CopyInsertMethod insertMethod;
796 1724 : CopyMultiInsertInfo multiInsertInfo = {0}; /* pacify compiler */
797 1724 : int64 processed = 0;
798 1724 : int64 excluded = 0;
799 : bool has_before_insert_row_trig;
800 : bool has_instead_insert_row_trig;
801 1724 : bool leafpart_use_multi_insert = false;
802 :
803 : Assert(cstate->rel);
804 : Assert(list_length(cstate->range_table) == 1);
805 :
806 1724 : if (cstate->opts.on_error != COPY_ON_ERROR_STOP)
807 : Assert(cstate->escontext);
808 :
809 : /*
810 : * The target must be a plain, foreign, or partitioned relation, or have
811 : * an INSTEAD OF INSERT row trigger. (Currently, such triggers are only
812 : * allowed on views, so we only hint about them in the view case.)
813 : */
814 1724 : if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
815 168 : cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
816 124 : cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
817 18 : !(cstate->rel->trigdesc &&
818 12 : cstate->rel->trigdesc->trig_insert_instead_row))
819 : {
820 6 : if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
821 6 : ereport(ERROR,
822 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
823 : errmsg("cannot copy to view \"%s\"",
824 : RelationGetRelationName(cstate->rel)),
825 : errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
826 0 : else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
827 0 : ereport(ERROR,
828 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
829 : errmsg("cannot copy to materialized view \"%s\"",
830 : RelationGetRelationName(cstate->rel))));
831 0 : else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
832 0 : ereport(ERROR,
833 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
834 : errmsg("cannot copy to sequence \"%s\"",
835 : RelationGetRelationName(cstate->rel))));
836 : else
837 0 : ereport(ERROR,
838 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
839 : errmsg("cannot copy to non-table relation \"%s\"",
840 : RelationGetRelationName(cstate->rel))));
841 : }
842 :
843 : /*
844 : * If the target file is new-in-transaction, we assume that checking FSM
845 : * for free space is a waste of time. This could possibly be wrong, but
846 : * it's unlikely.
847 : */
848 1718 : if (RELKIND_HAS_STORAGE(cstate->rel->rd_rel->relkind) &&
849 1556 : (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
850 1544 : cstate->rel->rd_firstRelfilelocatorSubid != InvalidSubTransactionId))
851 88 : ti_options |= TABLE_INSERT_SKIP_FSM;
852 :
853 : /*
854 : * Optimize if new relation storage was created in this subxact or one of
855 : * its committed children and we won't see those rows later as part of an
856 : * earlier scan or command. The subxact test ensures that if this subxact
857 : * aborts then the frozen rows won't be visible after xact cleanup. Note
858 : * that the stronger test of exactly which subtransaction created it is
859 : * crucial for correctness of this optimization. The test for an earlier
860 : * scan or command tolerates false negatives. FREEZE causes other sessions
861 : * to see rows they would not see under MVCC, and a false negative merely
862 : * spreads that anomaly to the current session.
863 : */
864 1718 : if (cstate->opts.freeze)
865 : {
866 : /*
867 : * We currently disallow COPY FREEZE on partitioned tables. The
868 : * reason for this is that we've simply not yet opened the partitions
869 : * to determine if the optimization can be applied to them. We could
870 : * go and open them all here, but doing so may be quite a costly
871 : * overhead for small copies. In any case, we may just end up routing
872 : * tuples to a small number of partitions. It seems better just to
873 : * raise an ERROR for partitioned tables.
874 : */
875 72 : if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
876 : {
877 6 : ereport(ERROR,
878 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
879 : errmsg("cannot perform COPY FREEZE on a partitioned table")));
880 : }
881 :
882 : /* There's currently no support for COPY FREEZE on foreign tables. */
883 66 : if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
884 6 : ereport(ERROR,
885 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
886 : errmsg("cannot perform COPY FREEZE on a foreign table")));
887 :
888 : /*
889 : * Tolerate one registration for the benefit of FirstXactSnapshot.
890 : * Scan-bearing queries generally create at least two registrations,
891 : * though relying on that is fragile, as is ignoring ActiveSnapshot.
892 : * Clear CatalogSnapshot to avoid counting its registration. We'll
893 : * still detect ongoing catalog scans, each of which separately
894 : * registers the snapshot it uses.
895 : */
896 60 : InvalidateCatalogSnapshot();
897 60 : if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
898 0 : ereport(ERROR,
899 : (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
900 : errmsg("cannot perform COPY FREEZE because of prior transaction activity")));
901 :
902 120 : if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
903 60 : cstate->rel->rd_newRelfilelocatorSubid != GetCurrentSubTransactionId())
904 18 : ereport(ERROR,
905 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
906 : errmsg("cannot perform COPY FREEZE because the table was not created or truncated in the current subtransaction")));
907 :
908 42 : ti_options |= TABLE_INSERT_FROZEN;
909 : }
910 :
911 : /*
912 : * We need a ResultRelInfo so we can use the regular executor's
913 : * index-entry-making machinery. (There used to be a huge amount of code
914 : * here that basically duplicated execUtils.c ...)
915 : */
916 1688 : ExecInitRangeTable(estate, cstate->range_table, cstate->rteperminfos,
917 : bms_make_singleton(1));
918 1688 : resultRelInfo = target_resultRelInfo = makeNode(ResultRelInfo);
919 1688 : ExecInitResultRelation(estate, resultRelInfo, 1);
920 :
921 : /* Verify the named relation is a valid target for INSERT */
922 1688 : CheckValidResultRel(resultRelInfo, CMD_INSERT, NIL);
923 :
924 1686 : ExecOpenIndices(resultRelInfo, false);
925 :
926 : /*
927 : * Set up a ModifyTableState so we can let FDW(s) init themselves for
928 : * foreign-table result relation(s).
929 : */
930 1686 : mtstate = makeNode(ModifyTableState);
931 1686 : mtstate->ps.plan = NULL;
932 1686 : mtstate->ps.state = estate;
933 1686 : mtstate->operation = CMD_INSERT;
934 1686 : mtstate->mt_nrels = 1;
935 1686 : mtstate->resultRelInfo = resultRelInfo;
936 1686 : mtstate->rootResultRelInfo = resultRelInfo;
937 :
938 1686 : if (resultRelInfo->ri_FdwRoutine != NULL &&
939 36 : resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
940 36 : resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
941 : resultRelInfo);
942 :
943 : /*
944 : * Also, if the named relation is a foreign table, determine if the FDW
945 : * supports batch insert and determine the batch size (a FDW may support
946 : * batching, but it may be disabled for the server/table).
947 : *
948 : * If the FDW does not support batching, we set the batch size to 1.
949 : */
950 1686 : if (resultRelInfo->ri_FdwRoutine != NULL &&
951 36 : resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize &&
952 36 : resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert)
953 36 : resultRelInfo->ri_BatchSize =
954 36 : resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize(resultRelInfo);
955 : else
956 1650 : resultRelInfo->ri_BatchSize = 1;
957 :
958 : Assert(resultRelInfo->ri_BatchSize >= 1);
959 :
960 : /* Prepare to catch AFTER triggers. */
961 1686 : AfterTriggerBeginQuery();
962 :
963 : /*
964 : * If there are any triggers with transition tables on the named relation,
965 : * we need to be prepared to capture transition tuples.
966 : *
967 : * Because partition tuple routing would like to know about whether
968 : * transition capture is active, we also set it in mtstate, which is
969 : * passed to ExecFindPartition() below.
970 : */
971 1686 : cstate->transition_capture = mtstate->mt_transition_capture =
972 1686 : MakeTransitionCaptureState(cstate->rel->trigdesc,
973 1686 : RelationGetRelid(cstate->rel),
974 : CMD_INSERT);
975 :
976 : /*
977 : * If the named relation is a partitioned table, initialize state for
978 : * CopyFrom tuple routing.
979 : */
980 1686 : if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
981 100 : proute = ExecSetupPartitionTupleRouting(estate, cstate->rel);
982 :
983 1686 : if (cstate->whereClause)
984 18 : cstate->qualexpr = ExecInitQual(castNode(List, cstate->whereClause),
985 : &mtstate->ps);
986 :
987 : /*
988 : * It's generally more efficient to prepare a bunch of tuples for
989 : * insertion, and insert them in one
990 : * table_multi_insert()/ExecForeignBatchInsert() call, than call
991 : * table_tuple_insert()/ExecForeignInsert() separately for every tuple.
992 : * However, there are a number of reasons why we might not be able to do
993 : * this. These are explained below.
994 : */
995 1686 : if (resultRelInfo->ri_TrigDesc != NULL &&
996 182 : (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
997 90 : resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
998 : {
999 : /*
1000 : * Can't support multi-inserts when there are any BEFORE/INSTEAD OF
1001 : * triggers on the table. Such triggers might query the table we're
1002 : * inserting into and act differently if the tuples that have already
1003 : * been processed and prepared for insertion are not there.
1004 : */
1005 104 : insertMethod = CIM_SINGLE;
1006 : }
1007 1582 : else if (resultRelInfo->ri_FdwRoutine != NULL &&
1008 28 : resultRelInfo->ri_BatchSize == 1)
1009 : {
1010 : /*
1011 : * Can't support multi-inserts to a foreign table if the FDW does not
1012 : * support batching, or it's disabled for the server or foreign table.
1013 : */
1014 18 : insertMethod = CIM_SINGLE;
1015 : }
1016 1564 : else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL &&
1017 26 : resultRelInfo->ri_TrigDesc->trig_insert_new_table)
1018 : {
1019 : /*
1020 : * For partitioned tables we can't support multi-inserts when there
1021 : * are any statement level insert triggers. It might be possible to
1022 : * allow partitioned tables with such triggers in the future, but for
1023 : * now, CopyMultiInsertInfoFlush expects that any after row insert and
1024 : * statement level insert triggers are on the same relation.
1025 : */
1026 18 : insertMethod = CIM_SINGLE;
1027 : }
1028 1546 : else if (cstate->volatile_defexprs)
1029 : {
1030 : /*
1031 : * Can't support multi-inserts if there are any volatile default
1032 : * expressions in the table. Similarly to the trigger case above,
1033 : * such expressions may query the table we're inserting into.
1034 : *
1035 : * Note: It does not matter if any partitions have any volatile
1036 : * default expressions as we use the defaults from the target of the
1037 : * COPY command.
1038 : */
1039 6 : insertMethod = CIM_SINGLE;
1040 : }
1041 1540 : else if (contain_volatile_functions(cstate->whereClause))
1042 : {
1043 : /*
1044 : * Can't support multi-inserts if there are any volatile function
1045 : * expressions in WHERE clause. Similarly to the trigger case above,
1046 : * such expressions may query the table we're inserting into.
1047 : *
1048 : * Note: the whereClause was already preprocessed in DoCopy(), so it's
1049 : * okay to use contain_volatile_functions() directly.
1050 : */
1051 0 : insertMethod = CIM_SINGLE;
1052 : }
1053 : else
1054 : {
1055 : /*
1056 : * For partitioned tables, we may still be able to perform bulk
1057 : * inserts. However, the possibility of this depends on which types
1058 : * of triggers exist on the partition. We must disable bulk inserts
1059 : * if the partition is a foreign table that can't use batching or it
1060 : * has any before row insert or insert instead triggers (same as we
1061 : * checked above for the parent table). Since the partition's
1062 : * resultRelInfos are initialized only when we actually need to insert
1063 : * the first tuple into them, we must have the intermediate insert
1064 : * method of CIM_MULTI_CONDITIONAL to flag that we must later
1065 : * determine if we can use bulk-inserts for the partition being
1066 : * inserted into.
1067 : */
1068 1540 : if (proute)
1069 76 : insertMethod = CIM_MULTI_CONDITIONAL;
1070 : else
1071 1464 : insertMethod = CIM_MULTI;
1072 :
1073 1540 : CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
1074 : estate, mycid, ti_options);
1075 : }
1076 :
1077 : /*
1078 : * If not using batch mode (which allocates slots as needed) set up a
1079 : * tuple slot too. When inserting into a partitioned table, we also need
1080 : * one, even if we might batch insert, to read the tuple in the root
1081 : * partition's form.
1082 : */
1083 1686 : if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
1084 : {
1085 222 : singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
1086 : &estate->es_tupleTable);
1087 222 : bistate = GetBulkInsertState();
1088 : }
1089 :
1090 1868 : has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
1091 182 : resultRelInfo->ri_TrigDesc->trig_insert_before_row);
1092 :
1093 1868 : has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
1094 182 : resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
1095 :
1096 : /*
1097 : * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
1098 : * should do this for COPY, since it's not really an "INSERT" statement as
1099 : * such. However, executing these triggers maintains consistency with the
1100 : * EACH ROW triggers that we already fire on COPY.
1101 : */
1102 1686 : ExecBSInsertTriggers(estate, resultRelInfo);
1103 :
1104 1686 : econtext = GetPerTupleExprContext(estate);
1105 :
1106 : /* Set up callback to identify error line number */
1107 1686 : errcallback.callback = CopyFromErrorCallback;
1108 1686 : errcallback.arg = cstate;
1109 1686 : errcallback.previous = error_context_stack;
1110 1686 : error_context_stack = &errcallback;
1111 :
1112 : for (;;)
1113 1256228 : {
1114 : TupleTableSlot *myslot;
1115 : bool skip_tuple;
1116 :
1117 1257914 : CHECK_FOR_INTERRUPTS();
1118 :
1119 : /*
1120 : * Reset the per-tuple exprcontext. We do this after every tuple, to
1121 : * clean-up after expression evaluations etc.
1122 : */
1123 1257914 : ResetPerTupleExprContext(estate);
1124 :
1125 : /* select slot to (initially) load row into */
1126 1257914 : if (insertMethod == CIM_SINGLE || proute)
1127 : {
1128 274874 : myslot = singleslot;
1129 274874 : Assert(myslot != NULL);
1130 : }
1131 : else
1132 : {
1133 : Assert(resultRelInfo == target_resultRelInfo);
1134 : Assert(insertMethod == CIM_MULTI);
1135 :
1136 983040 : myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
1137 : resultRelInfo);
1138 : }
1139 :
1140 : /*
1141 : * Switch to per-tuple context before calling NextCopyFrom, which does
1142 : * evaluate default expressions etc. and requires per-tuple context.
1143 : */
1144 1257914 : MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
1145 :
1146 1257914 : ExecClearTuple(myslot);
1147 :
1148 : /* Directly store the values/nulls array in the slot */
1149 1257914 : if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
1150 1494 : break;
1151 :
1152 1256268 : if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE &&
1153 162 : cstate->escontext->error_occurred)
1154 : {
1155 : /*
1156 : * Soft error occurred, skip this tuple and just make
1157 : * ErrorSaveContext ready for the next NextCopyFrom. Since we
1158 : * don't set details_wanted and error_data is not to be filled,
1159 : * just resetting error_occurred is enough.
1160 : */
1161 108 : cstate->escontext->error_occurred = false;
1162 :
1163 : /* Report that this tuple was skipped by the ON_ERROR clause */
1164 108 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_SKIPPED,
1165 108 : cstate->num_errors);
1166 :
1167 108 : if (cstate->opts.reject_limit > 0 &&
1168 48 : cstate->num_errors > cstate->opts.reject_limit)
1169 6 : ereport(ERROR,
1170 : (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1171 : errmsg("skipped more than REJECT_LIMIT (%" PRId64 ") rows due to data type incompatibility",
1172 : cstate->opts.reject_limit)));
1173 :
1174 : /* Repeat NextCopyFrom() until no soft error occurs */
1175 102 : continue;
1176 : }
1177 :
1178 1256160 : ExecStoreVirtualTuple(myslot);
1179 :
1180 : /*
1181 : * Constraints and where clause might reference the tableoid column,
1182 : * so (re-)initialize tts_tableOid before evaluating them.
1183 : */
1184 1256160 : myslot->tts_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc);
1185 :
1186 : /* Triggers and stuff need to be invoked in query context. */
1187 1256160 : MemoryContextSwitchTo(oldcontext);
1188 :
1189 1256160 : if (cstate->whereClause)
1190 : {
1191 66 : econtext->ecxt_scantuple = myslot;
1192 : /* Skip items that don't match COPY's WHERE clause */
1193 66 : if (!ExecQual(cstate->qualexpr, econtext))
1194 : {
1195 : /*
1196 : * Report that this tuple was filtered out by the WHERE
1197 : * clause.
1198 : */
1199 36 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_EXCLUDED,
1200 : ++excluded);
1201 36 : continue;
1202 : }
1203 : }
1204 :
1205 : /* Determine the partition to insert the tuple into */
1206 1256124 : if (proute)
1207 : {
1208 : TupleConversionMap *map;
1209 :
1210 : /*
1211 : * Attempt to find a partition suitable for this tuple.
1212 : * ExecFindPartition() will raise an error if none can be found or
1213 : * if the found partition is not suitable for INSERTs.
1214 : */
1215 274390 : resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo,
1216 : proute, myslot, estate);
1217 :
1218 274388 : if (prevResultRelInfo != resultRelInfo)
1219 : {
1220 : /* Determine which triggers exist on this partition */
1221 161566 : has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
1222 54 : resultRelInfo->ri_TrigDesc->trig_insert_before_row);
1223 :
1224 161566 : has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
1225 54 : resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
1226 :
1227 : /*
1228 : * Disable multi-inserts when the partition has BEFORE/INSTEAD
1229 : * OF triggers, or if the partition is a foreign table that
1230 : * can't use batching.
1231 : */
1232 262970 : leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
1233 101458 : !has_before_insert_row_trig &&
1234 364404 : !has_instead_insert_row_trig &&
1235 101434 : (resultRelInfo->ri_FdwRoutine == NULL ||
1236 12 : resultRelInfo->ri_BatchSize > 1);
1237 :
1238 : /* Set the multi-insert buffer to use for this partition. */
1239 161512 : if (leafpart_use_multi_insert)
1240 : {
1241 101430 : if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
1242 86 : CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
1243 : resultRelInfo);
1244 : }
1245 60082 : else if (insertMethod == CIM_MULTI_CONDITIONAL &&
1246 28 : !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
1247 : {
1248 : /*
1249 : * Flush pending inserts if this partition can't use
1250 : * batching, so rows are visible to triggers etc.
1251 : */
1252 0 : CopyMultiInsertInfoFlush(&multiInsertInfo,
1253 : resultRelInfo,
1254 : &processed);
1255 : }
1256 :
1257 161512 : if (bistate != NULL)
1258 161512 : ReleaseBulkInsertStatePin(bistate);
1259 161512 : prevResultRelInfo = resultRelInfo;
1260 : }
1261 :
1262 : /*
1263 : * If we're capturing transition tuples, we might need to convert
1264 : * from the partition rowtype to root rowtype. But if there are no
1265 : * BEFORE triggers on the partition that could change the tuple,
1266 : * we can just remember the original unconverted tuple to avoid a
1267 : * needless round trip conversion.
1268 : */
1269 274388 : if (cstate->transition_capture != NULL)
1270 54 : cstate->transition_capture->tcs_original_insert_tuple =
1271 54 : !has_before_insert_row_trig ? myslot : NULL;
1272 :
1273 : /*
1274 : * We might need to convert from the root rowtype to the partition
1275 : * rowtype.
1276 : */
1277 274388 : map = ExecGetRootToChildMap(resultRelInfo, estate);
1278 274388 : if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert)
1279 : {
1280 : /* non batch insert */
1281 60136 : if (map != NULL)
1282 : {
1283 : TupleTableSlot *new_slot;
1284 :
1285 110 : new_slot = resultRelInfo->ri_PartitionTupleSlot;
1286 110 : myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot);
1287 : }
1288 : }
1289 : else
1290 : {
1291 : /*
1292 : * Prepare to queue up tuple for later batch insert into
1293 : * current partition.
1294 : */
1295 : TupleTableSlot *batchslot;
1296 :
1297 : /* no other path available for partitioned table */
1298 : Assert(insertMethod == CIM_MULTI_CONDITIONAL);
1299 :
1300 214252 : batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
1301 : resultRelInfo);
1302 :
1303 214252 : if (map != NULL)
1304 12200 : myslot = execute_attr_map_slot(map->attrMap, myslot,
1305 : batchslot);
1306 : else
1307 : {
1308 : /*
1309 : * This looks more expensive than it is (Believe me, I
1310 : * optimized it away. Twice.). The input is in virtual
1311 : * form, and we'll materialize the slot below - for most
1312 : * slot types the copy performs the work materialization
1313 : * would later require anyway.
1314 : */
1315 202052 : ExecCopySlot(batchslot, myslot);
1316 202052 : myslot = batchslot;
1317 : }
1318 : }
1319 :
1320 : /* ensure that triggers etc see the right relation */
1321 274388 : myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
1322 : }
1323 :
1324 1256122 : skip_tuple = false;
1325 :
1326 : /* BEFORE ROW INSERT Triggers */
1327 1256122 : if (has_before_insert_row_trig)
1328 : {
1329 274 : if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot))
1330 16 : skip_tuple = true; /* "do nothing" */
1331 : }
1332 :
1333 1256122 : if (!skip_tuple)
1334 : {
1335 : /*
1336 : * If there is an INSTEAD OF INSERT ROW trigger, let it handle the
1337 : * tuple. Otherwise, proceed with inserting the tuple into the
1338 : * table or foreign table.
1339 : */
1340 1256106 : if (has_instead_insert_row_trig)
1341 : {
1342 12 : ExecIRInsertTriggers(estate, resultRelInfo, myslot);
1343 : }
1344 : else
1345 : {
1346 : /* Compute stored generated columns */
1347 1256094 : if (resultRelInfo->ri_RelationDesc->rd_att->constr &&
1348 461968 : resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored)
1349 34 : ExecComputeStoredGenerated(resultRelInfo, estate, myslot,
1350 : CMD_INSERT);
1351 :
1352 : /*
1353 : * If the target is a plain table, check the constraints of
1354 : * the tuple.
1355 : */
1356 1256094 : if (resultRelInfo->ri_FdwRoutine == NULL &&
1357 1256006 : resultRelInfo->ri_RelationDesc->rd_att->constr)
1358 461932 : ExecConstraints(resultRelInfo, myslot, estate);
1359 :
1360 : /*
1361 : * Also check the tuple against the partition constraint, if
1362 : * there is one; except that if we got here via tuple-routing,
1363 : * we don't need to if there's no BR trigger defined on the
1364 : * partition.
1365 : */
1366 1256064 : if (resultRelInfo->ri_RelationDesc->rd_rel->relispartition &&
1367 274376 : (proute == NULL || has_before_insert_row_trig))
1368 2308 : ExecPartitionCheck(resultRelInfo, myslot, estate, true);
1369 :
1370 : /* Store the slot in the multi-insert buffer, when enabled. */
1371 1256064 : if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
1372 : {
1373 : /*
1374 : * The slot previously might point into the per-tuple
1375 : * context. For batching it needs to be longer lived.
1376 : */
1377 1195720 : ExecMaterializeSlot(myslot);
1378 :
1379 : /* Add this tuple to the tuple buffer */
1380 1195720 : CopyMultiInsertInfoStore(&multiInsertInfo,
1381 : resultRelInfo, myslot,
1382 : cstate->line_buf.len,
1383 : cstate->cur_lineno);
1384 :
1385 : /*
1386 : * If enough inserts have queued up, then flush all
1387 : * buffers out to their tables.
1388 : */
1389 1195720 : if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
1390 1100 : CopyMultiInsertInfoFlush(&multiInsertInfo,
1391 : resultRelInfo,
1392 : &processed);
1393 :
1394 : /*
1395 : * We delay updating the row counter and progress of the
1396 : * COPY command until after writing the tuples stored in
1397 : * the buffer out to the table, as in single insert mode.
1398 : * See CopyMultiInsertBufferFlush().
1399 : */
1400 1195720 : continue; /* next tuple please */
1401 : }
1402 : else
1403 : {
1404 60344 : List *recheckIndexes = NIL;
1405 :
1406 : /* OK, store the tuple */
1407 60344 : if (resultRelInfo->ri_FdwRoutine != NULL)
1408 : {
1409 50 : myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
1410 : resultRelInfo,
1411 : myslot,
1412 : NULL);
1413 :
1414 48 : if (myslot == NULL) /* "do nothing" */
1415 4 : continue; /* next tuple please */
1416 :
1417 : /*
1418 : * AFTER ROW Triggers might reference the tableoid
1419 : * column, so (re-)initialize tts_tableOid before
1420 : * evaluating them.
1421 : */
1422 44 : myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
1423 : }
1424 : else
1425 : {
1426 : /* OK, store the tuple and create index entries for it */
1427 60294 : table_tuple_insert(resultRelInfo->ri_RelationDesc,
1428 : myslot, mycid, ti_options, bistate);
1429 :
1430 60294 : if (resultRelInfo->ri_NumIndices > 0)
1431 0 : recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
1432 : myslot,
1433 : estate,
1434 : false,
1435 : false,
1436 : NULL,
1437 : NIL,
1438 : false);
1439 : }
1440 :
1441 : /* AFTER ROW INSERT Triggers */
1442 60338 : ExecARInsertTriggers(estate, resultRelInfo, myslot,
1443 : recheckIndexes, cstate->transition_capture);
1444 :
1445 60338 : list_free(recheckIndexes);
1446 : }
1447 : }
1448 :
1449 : /*
1450 : * We count only tuples not suppressed by a BEFORE INSERT trigger
1451 : * or FDW; this is the same definition used by nodeModifyTable.c
1452 : * for counting tuples inserted by an INSERT command. Update
1453 : * progress of the COPY command as well.
1454 : */
1455 60350 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
1456 : ++processed);
1457 : }
1458 : }
1459 :
1460 : /* Flush any remaining buffered tuples */
1461 1494 : if (insertMethod != CIM_SINGLE)
1462 : {
1463 1374 : if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
1464 1134 : CopyMultiInsertInfoFlush(&multiInsertInfo, NULL, &processed);
1465 : }
1466 :
1467 : /* Done, clean up */
1468 1478 : error_context_stack = errcallback.previous;
1469 :
1470 1478 : if (cstate->opts.on_error != COPY_ON_ERROR_STOP &&
1471 30 : cstate->num_errors > 0 &&
1472 30 : cstate->opts.log_verbosity >= COPY_LOG_VERBOSITY_DEFAULT)
1473 24 : ereport(NOTICE,
1474 : errmsg_plural("%" PRIu64 " row was skipped due to data type incompatibility",
1475 : "%" PRIu64 " rows were skipped due to data type incompatibility",
1476 : cstate->num_errors,
1477 : cstate->num_errors));
1478 :
1479 1478 : if (bistate != NULL)
1480 194 : FreeBulkInsertState(bistate);
1481 :
1482 1478 : MemoryContextSwitchTo(oldcontext);
1483 :
1484 : /* Execute AFTER STATEMENT insertion triggers */
1485 1478 : ExecASInsertTriggers(estate, target_resultRelInfo, cstate->transition_capture);
1486 :
1487 : /* Handle queued AFTER triggers */
1488 1478 : AfterTriggerEndQuery(estate);
1489 :
1490 1478 : ExecResetTupleTable(estate->es_tupleTable, false);
1491 :
1492 : /* Allow the FDW to shut down */
1493 1478 : if (target_resultRelInfo->ri_FdwRoutine != NULL &&
1494 32 : target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
1495 32 : target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
1496 : target_resultRelInfo);
1497 :
1498 : /* Tear down the multi-insert buffer data */
1499 1478 : if (insertMethod != CIM_SINGLE)
1500 1358 : CopyMultiInsertInfoCleanup(&multiInsertInfo);
1501 :
1502 : /* Close all the partitioned tables, leaf partitions, and their indices */
1503 1478 : if (proute)
1504 98 : ExecCleanupTupleRouting(mtstate, proute);
1505 :
1506 : /* Close the result relations, including any trigger target relations */
1507 1478 : ExecCloseResultRelations(estate);
1508 1478 : ExecCloseRangeTableRelations(estate);
1509 :
1510 1478 : FreeExecutorState(estate);
1511 :
1512 1478 : return processed;
1513 : }
1514 :
1515 : /*
1516 : * Set up to read tuples from a file for COPY FROM.
1517 : *
1518 : * 'rel': Used as a template for the tuples
1519 : * 'whereClause': WHERE clause from the COPY FROM command
1520 : * 'filename': Name of server-local file to read, NULL for STDIN
1521 : * 'is_program': true if 'filename' is a program to execute
1522 : * 'data_source_cb': callback that provides the input data
1523 : * 'attnamelist': List of char *, columns to include. NIL selects all cols.
1524 : * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
1525 : *
1526 : * Returns a CopyFromState, to be passed to NextCopyFrom and related functions.
1527 : */
1528 : CopyFromState
1529 2050 : BeginCopyFrom(ParseState *pstate,
1530 : Relation rel,
1531 : Node *whereClause,
1532 : const char *filename,
1533 : bool is_program,
1534 : copy_data_source_cb data_source_cb,
1535 : List *attnamelist,
1536 : List *options)
1537 : {
1538 : CopyFromState cstate;
1539 2050 : bool pipe = (filename == NULL);
1540 : TupleDesc tupDesc;
1541 : AttrNumber num_phys_attrs,
1542 : num_defaults;
1543 : FmgrInfo *in_functions;
1544 : Oid *typioparams;
1545 : int *defmap;
1546 : ExprState **defexprs;
1547 : MemoryContext oldcontext;
1548 : bool volatile_defexprs;
1549 2050 : const int progress_cols[] = {
1550 : PROGRESS_COPY_COMMAND,
1551 : PROGRESS_COPY_TYPE,
1552 : PROGRESS_COPY_BYTES_TOTAL
1553 : };
1554 2050 : int64 progress_vals[] = {
1555 : PROGRESS_COPY_COMMAND_FROM,
1556 : 0,
1557 : 0
1558 : };
1559 :
1560 : /* Allocate workspace and zero all fields */
1561 2050 : cstate = (CopyFromStateData *) palloc0(sizeof(CopyFromStateData));
1562 :
1563 : /*
1564 : * We allocate everything used by a cstate in a new memory context. This
1565 : * avoids memory leaks during repeated use of COPY in a query.
1566 : */
1567 2050 : cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
1568 : "COPY",
1569 : ALLOCSET_DEFAULT_SIZES);
1570 :
1571 2050 : oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1572 :
1573 : /* Extract options from the statement node tree */
1574 2050 : ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options);
1575 :
1576 : /* Set the format routine */
1577 1820 : cstate->routine = CopyFromGetRoutine(&cstate->opts);
1578 :
1579 : /* Process the target relation */
1580 1820 : cstate->rel = rel;
1581 :
1582 1820 : tupDesc = RelationGetDescr(cstate->rel);
1583 :
1584 : /* process common options or initialization */
1585 :
1586 : /* Generate or convert list of attributes to process */
1587 1820 : cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
1588 :
1589 1820 : num_phys_attrs = tupDesc->natts;
1590 :
1591 : /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
1592 1820 : cstate->opts.force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1593 1820 : if (cstate->opts.force_notnull_all)
1594 12 : MemSet(cstate->opts.force_notnull_flags, true, num_phys_attrs * sizeof(bool));
1595 1808 : else if (cstate->opts.force_notnull)
1596 : {
1597 : List *attnums;
1598 : ListCell *cur;
1599 :
1600 26 : attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_notnull);
1601 :
1602 52 : foreach(cur, attnums)
1603 : {
1604 32 : int attnum = lfirst_int(cur);
1605 32 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1606 :
1607 32 : if (!list_member_int(cstate->attnumlist, attnum))
1608 6 : ereport(ERROR,
1609 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1610 : /*- translator: first %s is the name of a COPY option, e.g. FORCE_NOT_NULL */
1611 : errmsg("%s column \"%s\" not referenced by COPY",
1612 : "FORCE_NOT_NULL", NameStr(attr->attname))));
1613 26 : cstate->opts.force_notnull_flags[attnum - 1] = true;
1614 : }
1615 : }
1616 :
1617 : /* Set up soft error handler for ON_ERROR */
1618 1814 : if (cstate->opts.on_error != COPY_ON_ERROR_STOP)
1619 : {
1620 64 : cstate->escontext = makeNode(ErrorSaveContext);
1621 64 : cstate->escontext->type = T_ErrorSaveContext;
1622 64 : cstate->escontext->error_occurred = false;
1623 :
1624 : /*
1625 : * Currently we only support COPY_ON_ERROR_IGNORE. We'll add other
1626 : * options later
1627 : */
1628 64 : if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE)
1629 64 : cstate->escontext->details_wanted = false;
1630 : }
1631 : else
1632 1750 : cstate->escontext = NULL;
1633 :
1634 : /* Convert FORCE_NULL name list to per-column flags, check validity */
1635 1814 : cstate->opts.force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1636 1814 : if (cstate->opts.force_null_all)
1637 12 : MemSet(cstate->opts.force_null_flags, true, num_phys_attrs * sizeof(bool));
1638 1802 : else if (cstate->opts.force_null)
1639 : {
1640 : List *attnums;
1641 : ListCell *cur;
1642 :
1643 26 : attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_null);
1644 :
1645 52 : foreach(cur, attnums)
1646 : {
1647 32 : int attnum = lfirst_int(cur);
1648 32 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1649 :
1650 32 : if (!list_member_int(cstate->attnumlist, attnum))
1651 6 : ereport(ERROR,
1652 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1653 : /*- translator: first %s is the name of a COPY option, e.g. FORCE_NOT_NULL */
1654 : errmsg("%s column \"%s\" not referenced by COPY",
1655 : "FORCE_NULL", NameStr(attr->attname))));
1656 26 : cstate->opts.force_null_flags[attnum - 1] = true;
1657 : }
1658 : }
1659 :
1660 : /* Convert convert_selectively name list to per-column flags */
1661 1808 : if (cstate->opts.convert_selectively)
1662 : {
1663 : List *attnums;
1664 : ListCell *cur;
1665 :
1666 4 : cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1667 :
1668 4 : attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.convert_select);
1669 :
1670 8 : foreach(cur, attnums)
1671 : {
1672 4 : int attnum = lfirst_int(cur);
1673 4 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1674 :
1675 4 : if (!list_member_int(cstate->attnumlist, attnum))
1676 0 : ereport(ERROR,
1677 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1678 : errmsg_internal("selected column \"%s\" not referenced by COPY",
1679 : NameStr(attr->attname))));
1680 4 : cstate->convert_select_flags[attnum - 1] = true;
1681 : }
1682 : }
1683 :
1684 : /* Use client encoding when ENCODING option is not specified. */
1685 1808 : if (cstate->opts.file_encoding < 0)
1686 1790 : cstate->file_encoding = pg_get_client_encoding();
1687 : else
1688 18 : cstate->file_encoding = cstate->opts.file_encoding;
1689 :
1690 : /*
1691 : * Look up encoding conversion function.
1692 : */
1693 1808 : if (cstate->file_encoding == GetDatabaseEncoding() ||
1694 54 : cstate->file_encoding == PG_SQL_ASCII ||
1695 24 : GetDatabaseEncoding() == PG_SQL_ASCII)
1696 : {
1697 1784 : cstate->need_transcoding = false;
1698 : }
1699 : else
1700 : {
1701 24 : cstate->need_transcoding = true;
1702 24 : cstate->conversion_proc = FindDefaultConversionProc(cstate->file_encoding,
1703 : GetDatabaseEncoding());
1704 24 : if (!OidIsValid(cstate->conversion_proc))
1705 0 : ereport(ERROR,
1706 : (errcode(ERRCODE_UNDEFINED_FUNCTION),
1707 : errmsg("default conversion function for encoding \"%s\" to \"%s\" does not exist",
1708 : pg_encoding_to_char(cstate->file_encoding),
1709 : pg_encoding_to_char(GetDatabaseEncoding()))));
1710 : }
1711 :
1712 1808 : cstate->copy_src = COPY_FILE; /* default */
1713 :
1714 1808 : cstate->whereClause = whereClause;
1715 :
1716 : /* Initialize state variables */
1717 1808 : cstate->eol_type = EOL_UNKNOWN;
1718 1808 : cstate->cur_relname = RelationGetRelationName(cstate->rel);
1719 1808 : cstate->cur_lineno = 0;
1720 1808 : cstate->cur_attname = NULL;
1721 1808 : cstate->cur_attval = NULL;
1722 1808 : cstate->relname_only = false;
1723 :
1724 : /*
1725 : * Allocate buffers for the input pipeline.
1726 : *
1727 : * attribute_buf and raw_buf are used in both text and binary modes, but
1728 : * input_buf and line_buf only in text mode.
1729 : */
1730 1808 : cstate->raw_buf = palloc(RAW_BUF_SIZE + 1);
1731 1808 : cstate->raw_buf_index = cstate->raw_buf_len = 0;
1732 1808 : cstate->raw_reached_eof = false;
1733 :
1734 1808 : initStringInfo(&cstate->attribute_buf);
1735 :
1736 : /* Assign range table and rteperminfos, we'll need them in CopyFrom. */
1737 1808 : if (pstate)
1738 : {
1739 1738 : cstate->range_table = pstate->p_rtable;
1740 1738 : cstate->rteperminfos = pstate->p_rteperminfos;
1741 : }
1742 :
1743 1808 : num_defaults = 0;
1744 1808 : volatile_defexprs = false;
1745 :
1746 : /*
1747 : * Pick up the required catalog information for each attribute in the
1748 : * relation, including the input function, the element type (to pass to
1749 : * the input function), and info about defaults and constraints. (Which
1750 : * input function we use depends on text/binary format choice.)
1751 : */
1752 1808 : in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
1753 1808 : typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
1754 1808 : defmap = (int *) palloc(num_phys_attrs * sizeof(int));
1755 1808 : defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
1756 :
1757 6972 : for (int attnum = 1; attnum <= num_phys_attrs; attnum++)
1758 : {
1759 5178 : Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
1760 :
1761 : /* We don't need info for dropped attributes */
1762 5178 : if (att->attisdropped)
1763 124 : continue;
1764 :
1765 : /* Fetch the input function and typioparam info */
1766 5054 : cstate->routine->CopyFromInFunc(cstate, att->atttypid,
1767 5054 : &in_functions[attnum - 1],
1768 5054 : &typioparams[attnum - 1]);
1769 :
1770 : /* Get default info if available */
1771 5052 : defexprs[attnum - 1] = NULL;
1772 :
1773 : /*
1774 : * We only need the default values for columns that do not appear in
1775 : * the column list, unless the DEFAULT option was given. We never need
1776 : * default values for generated columns.
1777 : */
1778 5052 : if ((cstate->opts.default_print != NULL ||
1779 4926 : !list_member_int(cstate->attnumlist, attnum)) &&
1780 670 : !att->attgenerated)
1781 : {
1782 634 : Expr *defexpr = (Expr *) build_column_default(cstate->rel,
1783 : attnum);
1784 :
1785 634 : if (defexpr != NULL)
1786 : {
1787 : /* Run the expression through planner */
1788 264 : defexpr = expression_planner(defexpr);
1789 :
1790 : /* Initialize executable expression in copycontext */
1791 252 : defexprs[attnum - 1] = ExecInitExpr(defexpr, NULL);
1792 :
1793 : /* if NOT copied from input */
1794 : /* use default value if one exists */
1795 252 : if (!list_member_int(cstate->attnumlist, attnum))
1796 : {
1797 172 : defmap[num_defaults] = attnum - 1;
1798 172 : num_defaults++;
1799 : }
1800 :
1801 : /*
1802 : * If a default expression looks at the table being loaded,
1803 : * then it could give the wrong answer when using
1804 : * multi-insert. Since database access can be dynamic this is
1805 : * hard to test for exactly, so we use the much wider test of
1806 : * whether the default expression is volatile. We allow for
1807 : * the special case of when the default expression is the
1808 : * nextval() of a sequence which in this specific case is
1809 : * known to be safe for use with the multi-insert
1810 : * optimization. Hence we use this special case function
1811 : * checker rather than the standard check for
1812 : * contain_volatile_functions(). Note also that we already
1813 : * ran the expression through expression_planner().
1814 : */
1815 252 : if (!volatile_defexprs)
1816 252 : volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
1817 : }
1818 : }
1819 : }
1820 :
1821 1794 : cstate->defaults = (bool *) palloc0(tupDesc->natts * sizeof(bool));
1822 :
1823 : /* initialize progress */
1824 1794 : pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
1825 1794 : cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
1826 1794 : cstate->bytes_processed = 0;
1827 :
1828 : /* We keep those variables in cstate. */
1829 1794 : cstate->in_functions = in_functions;
1830 1794 : cstate->typioparams = typioparams;
1831 1794 : cstate->defmap = defmap;
1832 1794 : cstate->defexprs = defexprs;
1833 1794 : cstate->volatile_defexprs = volatile_defexprs;
1834 1794 : cstate->num_defaults = num_defaults;
1835 1794 : cstate->is_program = is_program;
1836 :
1837 1794 : if (data_source_cb)
1838 : {
1839 376 : progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
1840 376 : cstate->copy_src = COPY_CALLBACK;
1841 376 : cstate->data_source_cb = data_source_cb;
1842 : }
1843 1418 : else if (pipe)
1844 : {
1845 980 : progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
1846 : Assert(!is_program); /* the grammar does not allow this */
1847 980 : if (whereToSendOutput == DestRemote)
1848 980 : ReceiveCopyBegin(cstate);
1849 : else
1850 0 : cstate->copy_file = stdin;
1851 : }
1852 : else
1853 : {
1854 438 : cstate->filename = pstrdup(filename);
1855 :
1856 438 : if (cstate->is_program)
1857 : {
1858 0 : progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
1859 0 : cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
1860 0 : if (cstate->copy_file == NULL)
1861 0 : ereport(ERROR,
1862 : (errcode_for_file_access(),
1863 : errmsg("could not execute command \"%s\": %m",
1864 : cstate->filename)));
1865 : }
1866 : else
1867 : {
1868 : struct stat st;
1869 :
1870 438 : progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
1871 438 : cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
1872 438 : if (cstate->copy_file == NULL)
1873 : {
1874 : /* copy errno because ereport subfunctions might change it */
1875 0 : int save_errno = errno;
1876 :
1877 0 : ereport(ERROR,
1878 : (errcode_for_file_access(),
1879 : errmsg("could not open file \"%s\" for reading: %m",
1880 : cstate->filename),
1881 : (save_errno == ENOENT || save_errno == EACCES) ?
1882 : errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
1883 : "You may want a client-side facility such as psql's \\copy.") : 0));
1884 : }
1885 :
1886 438 : if (fstat(fileno(cstate->copy_file), &st))
1887 0 : ereport(ERROR,
1888 : (errcode_for_file_access(),
1889 : errmsg("could not stat file \"%s\": %m",
1890 : cstate->filename)));
1891 :
1892 438 : if (S_ISDIR(st.st_mode))
1893 0 : ereport(ERROR,
1894 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1895 : errmsg("\"%s\" is a directory", cstate->filename)));
1896 :
1897 438 : progress_vals[2] = st.st_size;
1898 : }
1899 : }
1900 :
1901 1794 : pgstat_progress_update_multi_param(3, progress_cols, progress_vals);
1902 :
1903 1794 : cstate->routine->CopyFromStart(cstate, tupDesc);
1904 :
1905 1794 : MemoryContextSwitchTo(oldcontext);
1906 :
1907 1794 : return cstate;
1908 : }
1909 :
1910 : /*
1911 : * Clean up storage and release resources for COPY FROM: run the format routine's end callback, close the program pipe or the input file, end progress reporting, and free the copy state and its memory context.
1912 : */
1913 : void
1914 1182 : EndCopyFrom(CopyFromState cstate)
1915 : {
1916 : /* Invoke the end callback of the format-specific routine */
1917 1182 : cstate->routine->CopyFromEnd(cstate);
1918 :
1919 : /* Close the input source: the program's pipe, or the input file when a filename was set. */
1920 1182 : if (cstate->is_program)
1921 : {
1922 0 : ClosePipeFromProgram(cstate);
1923 : }
1924 : else
1925 : {
1926 1182 : if (cstate->filename != NULL && FreeFile(cstate->copy_file)) /* a nonzero FreeFile() result is reported as a close failure */
1927 0 : ereport(ERROR,
1928 : (errcode_for_file_access(),
1929 : errmsg("could not close file \"%s\": %m",
1930 : cstate->filename)));
1931 : }
1932 :
1933 1182 : pgstat_progress_end_command(); /* ends the progress reporting started when the COPY state was set up */
1934 :
1935 1182 : MemoryContextDelete(cstate->copycontext);
1936 1182 : pfree(cstate); /* NOTE(review): freed separately, so cstate presumably lives outside copycontext -- confirm against the allocation site */
1937 1182 : }
1938 :
1939 : /*
1940 : * Closes the pipe from an external program (COPY FROM PROGRAM), checking the pclose() return code; raises ERROR unless the program exited with status 0 or was killed by SIGPIPE after we stopped reading early.
1941 : */
1942 : static void
1943 0 : ClosePipeFromProgram(CopyFromState cstate)
1944 : {
1945 : int pclose_rc;
1946 :
1947 : Assert(cstate->is_program); /* caller must only use this for program input */
1948 :
1949 0 : pclose_rc = ClosePipeStream(cstate->copy_file);
1950 0 : if (pclose_rc == -1) /* the close itself failed; %m reports errno */
1951 0 : ereport(ERROR,
1952 : (errcode_for_file_access(),
1953 : errmsg("could not close pipe to external command: %m")));
1954 0 : else if (pclose_rc != 0) /* nonzero wait status from the program */
1955 : {
1956 : /*
1957 : * If we ended a COPY FROM PROGRAM before reaching EOF, then it's
1958 : * expected for the called program to fail with SIGPIPE, and we
1959 : * should not report that as an error. Otherwise, SIGPIPE indicates a
1960 : * problem.
1961 : */
1962 0 : if (!cstate->raw_reached_eof &&
1963 0 : wait_result_is_signal(pclose_rc, SIGPIPE))
1964 0 : return;
1965 :
1966 0 : ereport(ERROR, /* any other failure: name the command and attach the decoded wait status as detail */
1967 : (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
1968 : errmsg("program \"%s\" failed",
1969 : cstate->filename),
1970 : errdetail_internal("%s", wait_result_to_str(pclose_rc))));
1971 : }
1972 : }
|