Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * copyfrom.c
4 : * COPY <table> FROM file/program/client
5 : *
6 : * This file contains routines needed to efficiently load tuples into a
7 : * table. That includes looking up the correct partition, firing triggers,
8 : * calling the table AM function to insert the data, and updating indexes.
9 : * Reading data from the input file or client and parsing it into Datums
10 : * is handled in copyfromparse.c.
11 : *
12 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
13 : * Portions Copyright (c) 1994, Regents of the University of California
14 : *
15 : *
16 : * IDENTIFICATION
17 : * src/backend/commands/copyfrom.c
18 : *
19 : *-------------------------------------------------------------------------
20 : */
21 : #include "postgres.h"
22 :
23 : #include <ctype.h>
24 : #include <unistd.h>
25 : #include <sys/stat.h>
26 :
27 : #include "access/heapam.h"
28 : #include "access/tableam.h"
29 : #include "access/tupconvert.h"
30 : #include "access/xact.h"
31 : #include "catalog/namespace.h"
32 : #include "commands/copyapi.h"
33 : #include "commands/copyfrom_internal.h"
34 : #include "commands/progress.h"
35 : #include "commands/trigger.h"
36 : #include "executor/execPartition.h"
37 : #include "executor/executor.h"
38 : #include "executor/nodeModifyTable.h"
39 : #include "executor/tuptable.h"
40 : #include "foreign/fdwapi.h"
41 : #include "mb/pg_wchar.h"
42 : #include "miscadmin.h"
43 : #include "nodes/miscnodes.h"
44 : #include "optimizer/optimizer.h"
45 : #include "pgstat.h"
46 : #include "rewrite/rewriteHandler.h"
47 : #include "storage/fd.h"
48 : #include "tcop/tcopprot.h"
49 : #include "utils/lsyscache.h"
50 : #include "utils/memutils.h"
51 : #include "utils/portal.h"
52 : #include "utils/rel.h"
53 : #include "utils/snapmgr.h"
54 : #include "utils/typcache.h"
55 :
56 : /*
57 : * No more than this many tuples per CopyMultiInsertBuffer. This is also the
58 : * length of each buffer's 'slots' and 'linenos' arrays, and the total
59 : * buffered-tuple count at which CopyMultiInsertInfoIsFull() reports true.
60 : * Caution: Don't make this too big, as we could end up with this many
61 : * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's
62 : * multiInsertBuffers list. Increasing this can cause quadratic growth in
63 : * memory requirements during copies into partitioned tables with a large
64 : * number of partitions.
64 : */
65 : #define MAX_BUFFERED_TUPLES 1000
66 :
67 : /*
68 : * Flush buffers once the stored tuples account for >= this many bytes. The
69 : * count is the sum of the 'tuplen' values passed to CopyMultiInsertInfoStore().
70 : */
71 : #define MAX_BUFFERED_BYTES 65535
72 :
73 : /*
74 : * Trim the list of buffers back down to this number after flushing. This
75 : * must be >= 2 (checked by a StaticAssertDecl in CopyMultiInsertInfoFlush), because the trimming loop there never removes the buffer currently in use.
76 : */
77 : #define MAX_PARTITION_BUFFERS 32
78 :
79 : /* Stores multi-insert data related to a single relation in CopyFrom. */
80 : typedef struct CopyMultiInsertBuffer
81 : {
82 : TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
83 : ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */
84 : BulkInsertState bistate; /* BulkInsertState for this rel if plain
85 : * table; NULL if foreign table */
86 : int nused; /* number of 'slots' containing tuples */
87 : uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy
88 : * stream */
89 : } CopyMultiInsertBuffer;
90 :
91 : /*
92 : * Stores one or many CopyMultiInsertBuffers and details about the size and
93 : * number of tuples which are stored in them. This allows multiple buffers to
94 : * exist at once when COPYing into a partitioned table.
95 : */
96 : typedef struct CopyMultiInsertInfo
97 : {
98 : List *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */
99 : int bufferedTuples; /* number of tuples buffered over all buffers */
100 : int bufferedBytes; /* number of bytes from all buffered tuples */
101 : CopyFromState cstate; /* Copy state for this CopyMultiInsertInfo */
102 : EState *estate; /* Executor state used for COPY */
103 : CommandId mycid; /* Command Id used for COPY */
104 : uint32 ti_options; /* table insert options */
105 : } CopyMultiInsertInfo;
106 :
107 :
108 : /* non-export function prototypes */
109 : static void ClosePipeFromProgram(CopyFromState cstate);
110 :
111 : /*
112 : * Built-in format-specific routines. One-row callbacks are defined in
113 : * copyfromparse.c.
114 : */
115 : static void CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo,
116 : Oid *typioparam);
117 : static void CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc);
118 : static void CopyFromTextLikeEnd(CopyFromState cstate);
119 : static void CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid,
120 : FmgrInfo *finfo, Oid *typioparam);
121 : static void CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc);
122 : static void CopyFromBinaryEnd(CopyFromState cstate);
123 :
124 :
125 : /*
126 : * COPY FROM routines for built-in formats.
127 : *
128 : * CSV and text formats share the same TextLike routines except for the
129 : * one-row callback.
130 : */
131 :
132 : /* text format */
133 : static const CopyFromRoutine CopyFromRoutineText = {
134 : .CopyFromInFunc = CopyFromTextLikeInFunc,
135 : .CopyFromStart = CopyFromTextLikeStart,
136 : .CopyFromOneRow = CopyFromTextOneRow,
137 : .CopyFromEnd = CopyFromTextLikeEnd,
138 : };
139 :
140 : /* CSV format */
141 : static const CopyFromRoutine CopyFromRoutineCSV = {
142 : .CopyFromInFunc = CopyFromTextLikeInFunc,
143 : .CopyFromStart = CopyFromTextLikeStart,
144 : .CopyFromOneRow = CopyFromCSVOneRow,
145 : .CopyFromEnd = CopyFromTextLikeEnd,
146 : };
147 :
148 : /* binary format */
149 : static const CopyFromRoutine CopyFromRoutineBinary = {
150 : .CopyFromInFunc = CopyFromBinaryInFunc,
151 : .CopyFromStart = CopyFromBinaryStart,
152 : .CopyFromOneRow = CopyFromBinaryOneRow,
153 : .CopyFromEnd = CopyFromBinaryEnd,
154 : };
155 :
156 : /* Return a COPY FROM routine for the given options */
157 : static const CopyFromRoutine *
158 1195 : CopyFromGetRoutine(const CopyFormatOptions *opts)
159 : {
160 1195 : if (opts->format == COPY_FORMAT_CSV)
161 189 : return &CopyFromRoutineCSV;
162 1006 : else if (opts->format == COPY_FORMAT_BINARY)
163 9 : return &CopyFromRoutineBinary;
164 :
165 : /* default is text */
166 997 : return &CopyFromRoutineText;
167 : }
168 :
169 : /* Implementation of the start callback for text and CSV formats */
170 : static void
171 1170 : CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc)
172 : {
173 : AttrNumber attr_count;
174 :
175 : /*
176 : * If encoding conversion is needed, we need another buffer to hold the
177 : * converted input data. Otherwise, we can just point input_buf to the
178 : * same buffer as raw_buf.
179 : */
180 1170 : if (cstate->need_transcoding)
181 : {
182 24 : cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
183 24 : cstate->input_buf_index = cstate->input_buf_len = 0;
184 : }
185 : else
186 1146 : cstate->input_buf = cstate->raw_buf;
187 1170 : cstate->input_reached_eof = false;
188 :
189 1170 : initStringInfo(&cstate->line_buf);
190 :
191 : /*
192 : * Create workspace for CopyReadAttributes results; used by CSV and text
193 : * format.
194 : */
195 1170 : attr_count = list_length(cstate->attnumlist);
196 1170 : cstate->max_fields = attr_count;
197 1170 : cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
198 1170 : }
199 :
200 : /*
201 : * Implementation of the infunc callback for text and CSV formats. Assign
202 : * the input function data to the given *finfo.
203 : */
204 : static void
205 3433 : CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo,
206 : Oid *typioparam)
207 : {
208 : Oid func_oid;
209 :
210 3433 : getTypeInputInfo(atttypid, &func_oid, typioparam);
211 3433 : fmgr_info(func_oid, finfo);
212 3433 : }
213 :
214 : /* Implementation of the end callback for text and CSV formats */
215 : static void
216 786 : CopyFromTextLikeEnd(CopyFromState cstate)
217 : {
218 : /* nothing to do */
219 786 : }
220 :
221 : /* Implementation of the start callback for binary format */
222 : static void
223 8 : CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc)
224 : {
225 : /* Read and verify binary header */
226 8 : ReceiveCopyBinaryHeader(cstate);
227 8 : }
228 :
229 : /*
230 : * Implementation of the infunc callback for binary format. Assign
231 : * the binary input function to the given *finfo.
232 : */
233 : static void
234 38 : CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid,
235 : FmgrInfo *finfo, Oid *typioparam)
236 : {
237 : Oid func_oid;
238 :
239 38 : getTypeBinaryInputInfo(atttypid, &func_oid, typioparam);
240 37 : fmgr_info(func_oid, finfo);
241 37 : }
242 :
243 : /* Implementation of the end callback for binary format */
244 : static void
245 4 : CopyFromBinaryEnd(CopyFromState cstate)
246 : {
247 : /* nothing to do */
248 4 : }
249 :
250 : /*
251 : * error context callback for COPY FROM
252 : *
253 : * The argument for the error context must be CopyFromState.
254 : */
255 : void
256 244 : CopyFromErrorCallback(void *arg)
257 : {
258 244 : CopyFromState cstate = (CopyFromState) arg;
259 :
260 244 : if (cstate->relname_only)
261 : {
262 45 : errcontext("COPY %s",
263 : cstate->cur_relname);
264 45 : return;
265 : }
266 199 : if (cstate->opts.format == COPY_FORMAT_BINARY)
267 : {
268 : /* can't usefully display the data */
269 1 : if (cstate->cur_attname)
270 1 : errcontext("COPY %s, line %" PRIu64 ", column %s",
271 : cstate->cur_relname,
272 : cstate->cur_lineno,
273 : cstate->cur_attname);
274 : else
275 0 : errcontext("COPY %s, line %" PRIu64,
276 : cstate->cur_relname,
277 : cstate->cur_lineno);
278 : }
279 : else
280 : {
281 198 : if (cstate->cur_attname && cstate->cur_attval)
282 34 : {
283 : /* error is relevant to a particular column */
284 : char *attval;
285 :
286 34 : attval = CopyLimitPrintoutLength(cstate->cur_attval);
287 34 : errcontext("COPY %s, line %" PRIu64 ", column %s: \"%s\"",
288 : cstate->cur_relname,
289 : cstate->cur_lineno,
290 : cstate->cur_attname,
291 : attval);
292 34 : pfree(attval);
293 : }
294 164 : else if (cstate->cur_attname)
295 : {
296 : /* error is relevant to a particular column, value is NULL */
297 8 : errcontext("COPY %s, line %" PRIu64 ", column %s: null input",
298 : cstate->cur_relname,
299 : cstate->cur_lineno,
300 : cstate->cur_attname);
301 : }
302 : else
303 : {
304 : /*
305 : * Error is relevant to a particular line.
306 : *
307 : * If line_buf still contains the correct line, print it.
308 : */
309 156 : if (cstate->line_buf_valid)
310 : {
311 : char *lineval;
312 :
313 129 : lineval = CopyLimitPrintoutLength(cstate->line_buf.data);
314 129 : errcontext("COPY %s, line %" PRIu64 ": \"%s\"",
315 : cstate->cur_relname,
316 : cstate->cur_lineno, lineval);
317 129 : pfree(lineval);
318 : }
319 : else
320 : {
321 27 : errcontext("COPY %s, line %" PRIu64,
322 : cstate->cur_relname,
323 : cstate->cur_lineno);
324 : }
325 : }
326 : }
327 : }
328 :
/*
 * Produce a copy of 'str' that is short enough to put in a message.
 *
 * A string of at most MAX_COPY_DATA_DISPLAY bytes is duplicated as-is with
 * pstrdup().  A longer one is cut at the length chosen by pg_mbcliplen()
 * and gets "..." appended to show that it was truncated.  Either way the
 * result is a freshly allocated string.
 */
char *
CopyLimitPrintoutLength(const char *str)
{
#define MAX_COPY_DATA_DISPLAY 100

	int			total = strlen(str);
	int			keep;
	char	   *clipped;

	if (total > MAX_COPY_DATA_DISPLAY)
	{
		/* Let the encoding decide where the cut may fall. */
		keep = pg_mbcliplen(str, total, MAX_COPY_DATA_DISPLAY);

		/* Room for the kept bytes, the three dots and the terminator. */
		clipped = (char *) palloc(keep + 4);
		memcpy(clipped, str, keep);
		memcpy(clipped + keep, "...", 4);

		return clipped;
	}

	/* Short enough already. */
	return pstrdup(str);
}
359 :
360 : /*
361 : * Allocate memory and initialize a new CopyMultiInsertBuffer for this
362 : * ResultRelInfo.
363 : */
364 : static CopyMultiInsertBuffer *
365 1024 : CopyMultiInsertBufferInit(ResultRelInfo *rri)
366 : {
367 : CopyMultiInsertBuffer *buffer;
368 :
369 1024 : buffer = palloc_object(CopyMultiInsertBuffer);
370 1024 : memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
371 1024 : buffer->resultRelInfo = rri;
372 1024 : buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL;
373 1024 : buffer->nused = 0;
374 :
375 1024 : return buffer;
376 : }
377 :
378 : /*
379 : * Make a new buffer for this ResultRelInfo.
380 : */
381 : static inline void
382 1024 : CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
383 : ResultRelInfo *rri)
384 : {
385 : CopyMultiInsertBuffer *buffer;
386 :
387 1024 : buffer = CopyMultiInsertBufferInit(rri);
388 :
389 : /* Setup back-link so we can easily find this buffer again */
390 1024 : rri->ri_CopyMultiInsertBuffer = buffer;
391 : /* Record that we're tracking this buffer */
392 1024 : miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
393 1024 : }
394 :
395 : /*
396 : * Initialize an already allocated CopyMultiInsertInfo.
397 : *
398 : * If rri is a non-partitioned table then a CopyMultiInsertBuffer is set up
399 : * for that table.
400 : */
401 : static void
402 1020 : CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
403 : CopyFromState cstate, EState *estate, CommandId mycid,
404 : uint32 ti_options)
405 : {
406 1020 : miinfo->multiInsertBuffers = NIL;
407 1020 : miinfo->bufferedTuples = 0;
408 1020 : miinfo->bufferedBytes = 0;
409 1020 : miinfo->cstate = cstate;
410 1020 : miinfo->estate = estate;
411 1020 : miinfo->mycid = mycid;
412 1020 : miinfo->ti_options = ti_options;
413 :
414 : /*
415 : * Only setup the buffer when not dealing with a partitioned table.
416 : * Buffers for partitioned tables will just be setup when we need to send
417 : * tuples their way for the first time.
418 : */
419 1020 : if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
420 975 : CopyMultiInsertInfoSetupBuffer(miinfo, rri);
421 1020 : }
422 :
423 : /*
424 : * Returns true if the buffers are full
425 : */
426 : static inline bool
427 715150 : CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
428 : {
429 715150 : if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
430 714567 : miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
431 653 : return true;
432 714497 : return false;
433 : }
434 :
435 : /*
436 : * Returns true if we have no buffered tuples
437 : */
438 : static inline bool
439 906 : CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
440 : {
441 906 : return miinfo->bufferedTuples == 0;
442 : }
443 :
444 : /*
445 : * Write the tuples stored in 'buffer' out to the table.
446 : */
447 : static inline void
448 1474 : CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
449 : CopyMultiInsertBuffer *buffer,
450 : int64 *processed)
451 : {
452 1474 : CopyFromState cstate = miinfo->cstate;
453 1474 : EState *estate = miinfo->estate;
454 1474 : int nused = buffer->nused;
455 1474 : ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
456 1474 : TupleTableSlot **slots = buffer->slots;
457 : int i;
458 :
459 1474 : if (resultRelInfo->ri_FdwRoutine)
460 : {
461 7 : int batch_size = resultRelInfo->ri_BatchSize;
462 7 : int sent = 0;
463 :
464 : Assert(buffer->bistate == NULL);
465 :
466 : /* Ensure that the FDW supports batching and it's enabled */
467 : Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert);
468 : Assert(batch_size > 1);
469 :
470 : /*
471 : * We suppress error context information other than the relation name,
472 : * if one of the operations below fails.
473 : */
474 : Assert(!cstate->relname_only);
475 7 : cstate->relname_only = true;
476 :
477 19 : while (sent < nused)
478 : {
479 13 : int size = (batch_size < nused - sent) ? batch_size : (nused - sent);
480 13 : int inserted = size;
481 : TupleTableSlot **rslots;
482 :
483 : /* insert into foreign table: let the FDW do it */
484 : rslots =
485 13 : resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert(estate,
486 : resultRelInfo,
487 13 : &slots[sent],
488 : NULL,
489 : &inserted);
490 :
491 12 : sent += size;
492 :
493 : /* No need to do anything if there are no inserted rows */
494 12 : if (inserted <= 0)
495 2 : continue;
496 :
497 : /* Triggers on foreign tables should not have transition tables */
498 : Assert(resultRelInfo->ri_TrigDesc == NULL ||
499 : resultRelInfo->ri_TrigDesc->trig_insert_new_table == false);
500 :
501 : /* Run AFTER ROW INSERT triggers */
502 10 : if (resultRelInfo->ri_TrigDesc != NULL &&
503 0 : resultRelInfo->ri_TrigDesc->trig_insert_after_row)
504 : {
505 0 : Oid relid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
506 :
507 0 : for (i = 0; i < inserted; i++)
508 : {
509 0 : TupleTableSlot *slot = rslots[i];
510 :
511 : /*
512 : * AFTER ROW Triggers might reference the tableoid column,
513 : * so (re-)initialize tts_tableOid before evaluating them.
514 : */
515 0 : slot->tts_tableOid = relid;
516 :
517 0 : ExecARInsertTriggers(estate, resultRelInfo,
518 : slot, NIL,
519 : cstate->transition_capture);
520 : }
521 : }
522 :
523 : /* Update the row counter and progress of the COPY command */
524 10 : *processed += inserted;
525 10 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
526 : *processed);
527 : }
528 :
529 24 : for (i = 0; i < nused; i++)
530 18 : ExecClearTuple(slots[i]);
531 :
532 : /* reset relname_only */
533 6 : cstate->relname_only = false;
534 : }
535 : else
536 : {
537 1467 : CommandId mycid = miinfo->mycid;
538 1467 : uint32 ti_options = miinfo->ti_options;
539 1467 : bool line_buf_valid = cstate->line_buf_valid;
540 1467 : uint64 save_cur_lineno = cstate->cur_lineno;
541 : MemoryContext oldcontext;
542 :
543 : Assert(buffer->bistate != NULL);
544 :
545 : /*
546 : * Print error context information correctly, if one of the operations
547 : * below fails.
548 : */
549 1467 : cstate->line_buf_valid = false;
550 :
551 : /*
552 : * table_multi_insert may leak memory, so switch to short-lived memory
553 : * context before calling it.
554 : */
555 1467 : oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
556 1467 : table_multi_insert(resultRelInfo->ri_RelationDesc,
557 : slots,
558 : nused,
559 : mycid,
560 : ti_options,
561 1467 : buffer->bistate);
562 1467 : MemoryContextSwitchTo(oldcontext);
563 :
564 716514 : for (i = 0; i < nused; i++)
565 : {
566 : /*
567 : * If there are any indexes, update them for all the inserted
568 : * tuples, and run AFTER ROW INSERT triggers.
569 : */
570 715056 : if (resultRelInfo->ri_NumIndices > 0)
571 : {
572 : List *recheckIndexes;
573 :
574 100993 : cstate->cur_lineno = buffer->linenos[i];
575 : recheckIndexes =
576 100993 : ExecInsertIndexTuples(resultRelInfo,
577 : estate, 0, buffer->slots[i],
578 : NIL, NULL);
579 100984 : ExecARInsertTriggers(estate, resultRelInfo,
580 100984 : slots[i], recheckIndexes,
581 : cstate->transition_capture);
582 100984 : list_free(recheckIndexes);
583 : }
584 :
585 : /*
586 : * There's no indexes, but see if we need to run AFTER ROW INSERT
587 : * triggers anyway.
588 : */
589 614063 : else if (resultRelInfo->ri_TrigDesc != NULL &&
590 56 : (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
591 44 : resultRelInfo->ri_TrigDesc->trig_insert_new_table))
592 : {
593 24 : cstate->cur_lineno = buffer->linenos[i];
594 24 : ExecARInsertTriggers(estate, resultRelInfo,
595 24 : slots[i], NIL,
596 : cstate->transition_capture);
597 : }
598 :
599 715047 : ExecClearTuple(slots[i]);
600 : }
601 :
602 : /* Update the row counter and progress of the COPY command */
603 1458 : *processed += nused;
604 1458 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
605 : *processed);
606 :
607 : /* reset cur_lineno and line_buf_valid to what they were */
608 1458 : cstate->line_buf_valid = line_buf_valid;
609 1458 : cstate->cur_lineno = save_cur_lineno;
610 : }
611 :
612 : /* Mark that all slots are free */
613 1464 : buffer->nused = 0;
614 1464 : }
615 :
616 : /*
617 : * Drop used slots and free member for this buffer.
618 : *
619 : * The buffer must be flushed before cleanup.
620 : */
621 : static inline void
622 882 : CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
623 : CopyMultiInsertBuffer *buffer)
624 : {
625 882 : ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
626 : int i;
627 :
628 : /* Ensure buffer was flushed */
629 : Assert(buffer->nused == 0);
630 :
631 : /* Remove back-link to ourself */
632 882 : resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
633 :
634 882 : if (resultRelInfo->ri_FdwRoutine == NULL)
635 : {
636 : Assert(buffer->bistate != NULL);
637 876 : FreeBulkInsertState(buffer->bistate);
638 : }
639 : else
640 : Assert(buffer->bistate == NULL);
641 :
642 : /* Since we only create slots on demand, just drop the non-null ones. */
643 150807 : for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
644 149925 : ExecDropSingleTupleTableSlot(buffer->slots[i]);
645 :
646 882 : if (resultRelInfo->ri_FdwRoutine == NULL)
647 876 : table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
648 : miinfo->ti_options);
649 :
650 882 : pfree(buffer);
651 882 : }
652 :
653 : /*
654 : * Write out all stored tuples in all buffers out to the tables.
655 : *
656 : * Once flushed we also trim the tracked buffers list down to size by removing
657 : * the buffers created earliest first.
658 : *
659 : * Callers should pass 'curr_rri' as the ResultRelInfo that's currently being
660 : * used. When cleaning up old buffers we'll never remove the one for
661 : * 'curr_rri'.
662 : */
663 : static inline void
664 1359 : CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri,
665 : int64 *processed)
666 : {
667 : ListCell *lc;
668 :
669 2823 : foreach(lc, miinfo->multiInsertBuffers)
670 : {
671 1474 : CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc);
672 :
673 1474 : CopyMultiInsertBufferFlush(miinfo, buffer, processed);
674 : }
675 :
676 1349 : miinfo->bufferedTuples = 0;
677 1349 : miinfo->bufferedBytes = 0;
678 :
679 : /*
680 : * Trim the list of tracked buffers down if it exceeds the limit. Here we
681 : * remove buffers starting with the ones we created first. It seems less
682 : * likely that these older ones will be needed than the ones that were
683 : * just created.
684 : */
685 1349 : while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS)
686 : {
687 : CopyMultiInsertBuffer *buffer;
688 :
689 0 : buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
690 :
691 : /*
692 : * We never want to remove the buffer that's currently being used, so
693 : * if we happen to find that then move it to the end of the list.
694 : */
695 0 : if (buffer->resultRelInfo == curr_rri)
696 : {
697 : /*
698 : * The code below would misbehave if we were trying to reduce the
699 : * list to less than two items.
700 : */
701 : StaticAssertDecl(MAX_PARTITION_BUFFERS >= 2,
702 : "MAX_PARTITION_BUFFERS must be >= 2");
703 :
704 0 : miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
705 0 : miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
706 0 : buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
707 : }
708 :
709 0 : CopyMultiInsertBufferCleanup(miinfo, buffer);
710 0 : miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
711 : }
712 1349 : }
713 :
714 : /*
715 : * Cleanup allocated buffers and free memory
716 : */
717 : static inline void
718 878 : CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo)
719 : {
720 : ListCell *lc;
721 :
722 1760 : foreach(lc, miinfo->multiInsertBuffers)
723 882 : CopyMultiInsertBufferCleanup(miinfo, lfirst(lc));
724 :
725 878 : list_free(miinfo->multiInsertBuffers);
726 878 : }
727 :
728 : /*
729 : * Get the next TupleTableSlot that the next tuple should be stored in.
730 : *
731 : * Callers must ensure that the buffer is not full.
732 : *
733 : * Note: 'miinfo' is unused but has been included for consistency with the
734 : * other functions in this area.
735 : */
736 : static inline TupleTableSlot *
737 716197 : CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
738 : ResultRelInfo *rri)
739 : {
740 716197 : CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
741 : int nused;
742 :
743 : Assert(buffer != NULL);
744 : Assert(buffer->nused < MAX_BUFFERED_TUPLES);
745 :
746 716197 : nused = buffer->nused;
747 :
748 716197 : if (buffer->slots[nused] == NULL)
749 150151 : buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
750 716197 : return buffer->slots[nused];
751 : }
752 :
753 : /*
754 : * Record the previously reserved TupleTableSlot that was reserved by
755 : * CopyMultiInsertInfoNextFreeSlot as being consumed.
756 : */
757 : static inline void
758 715150 : CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
759 : TupleTableSlot *slot, int tuplen, uint64 lineno)
760 : {
761 715150 : CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
762 :
763 : Assert(buffer != NULL);
764 : Assert(slot == buffer->slots[buffer->nused]);
765 :
766 : /* Store the line number so we can properly report any errors later */
767 715150 : buffer->linenos[buffer->nused] = lineno;
768 :
769 : /* Record this slot as being used */
770 715150 : buffer->nused++;
771 :
772 : /* Update how many tuples are stored and their size */
773 715150 : miinfo->bufferedTuples++;
774 715150 : miinfo->bufferedBytes += tuplen;
775 715150 : }
776 :
777 : /*
778 : * Copy FROM file to relation.
779 : */
780 : uint64
781 1140 : CopyFrom(CopyFromState cstate)
782 : {
783 : ResultRelInfo *resultRelInfo;
784 : ResultRelInfo *target_resultRelInfo;
785 1140 : ResultRelInfo *prevResultRelInfo = NULL;
786 1140 : EState *estate = CreateExecutorState(); /* for ExecConstraints() */
787 : ModifyTableState *mtstate;
788 : ExprContext *econtext;
789 1140 : TupleTableSlot *singleslot = NULL;
790 1140 : MemoryContext oldcontext = CurrentMemoryContext;
791 :
792 1140 : PartitionTupleRouting *proute = NULL;
793 : ErrorContextCallback errcallback;
794 1140 : CommandId mycid = GetCurrentCommandId(true);
795 1140 : uint32 ti_options = 0; /* start with default options for insert */
796 1140 : BulkInsertState bistate = NULL;
797 : CopyInsertMethod insertMethod;
798 1140 : CopyMultiInsertInfo multiInsertInfo = {0}; /* pacify compiler */
799 1140 : int64 processed = 0;
800 1140 : int64 excluded = 0;
801 : bool has_before_insert_row_trig;
802 : bool has_instead_insert_row_trig;
803 1140 : bool leafpart_use_multi_insert = false;
804 :
805 : Assert(cstate->rel);
806 : Assert(list_length(cstate->range_table) == 1);
807 :
808 1140 : if (cstate->opts.on_error != COPY_ON_ERROR_STOP)
809 : Assert(cstate->escontext);
810 :
811 : /*
812 : * The target must be a plain, foreign, or partitioned relation, or have
813 : * an INSTEAD OF INSERT row trigger. (Currently, such triggers are only
814 : * allowed on views, so we only hint about them in the view case.)
815 : */
816 1140 : if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
817 102 : cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
818 79 : cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
819 12 : !(cstate->rel->trigdesc &&
820 8 : cstate->rel->trigdesc->trig_insert_instead_row))
821 : {
822 4 : if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
823 4 : ereport(ERROR,
824 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
825 : errmsg("cannot copy to view \"%s\"",
826 : RelationGetRelationName(cstate->rel)),
827 : errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
828 0 : else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
829 0 : ereport(ERROR,
830 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
831 : errmsg("cannot copy to materialized view \"%s\"",
832 : RelationGetRelationName(cstate->rel))));
833 0 : else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
834 0 : ereport(ERROR,
835 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
836 : errmsg("cannot copy to sequence \"%s\"",
837 : RelationGetRelationName(cstate->rel))));
838 : else
839 0 : ereport(ERROR,
840 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
841 : errmsg("cannot copy to non-table relation \"%s\"",
842 : RelationGetRelationName(cstate->rel))));
843 : }
844 :
845 : /*
846 : * If the target file is new-in-transaction, we assume that checking FSM
847 : * for free space is a waste of time. This could possibly be wrong, but
848 : * it's unlikely.
849 : */
850 1136 : if (RELKIND_HAS_STORAGE(cstate->rel->rd_rel->relkind) &&
851 1038 : (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
852 1029 : cstate->rel->rd_firstRelfilelocatorSubid != InvalidSubTransactionId))
853 54 : ti_options |= TABLE_INSERT_SKIP_FSM;
854 :
855 : /*
856 : * Optimize if new relation storage was created in this subxact or one of
857 : * its committed children and we won't see those rows later as part of an
858 : * earlier scan or command. The subxact test ensures that if this subxact
859 : * aborts then the frozen rows won't be visible after xact cleanup. Note
860 : * that the stronger test of exactly which subtransaction created it is
861 : * crucial for correctness of this optimization. The test for an earlier
862 : * scan or command tolerates false negatives. FREEZE causes other sessions
863 : * to see rows they would not see under MVCC, and a false negative merely
864 : * spreads that anomaly to the current session.
865 : */
866 1136 : if (cstate->opts.freeze)
867 : {
868 : /*
869 : * We currently disallow COPY FREEZE on partitioned tables. The
870 : * reason for this is that we've simply not yet opened the partitions
871 : * to determine if the optimization can be applied to them. We could
872 : * go and open them all here, but doing so may be quite a costly
873 : * overhead for small copies. In any case, we may just end up routing
874 : * tuples to a small number of partitions. It seems better just to
875 : * raise an ERROR for partitioned tables.
876 : */
877 45 : if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
878 : {
879 4 : ereport(ERROR,
880 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
881 : errmsg("cannot perform COPY FREEZE on a partitioned table")));
882 : }
883 :
884 : /* There's currently no support for COPY FREEZE on foreign tables. */
885 41 : if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
886 4 : ereport(ERROR,
887 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
888 : errmsg("cannot perform COPY FREEZE on a foreign table")));
889 :
890 : /*
891 : * Tolerate one registration for the benefit of FirstXactSnapshot.
892 : * Scan-bearing queries generally create at least two registrations,
893 : * though relying on that is fragile, as is ignoring ActiveSnapshot.
894 : * Clear CatalogSnapshot to avoid counting its registration. We'll
895 : * still detect ongoing catalog scans, each of which separately
896 : * registers the snapshot it uses.
897 : */
898 37 : InvalidateCatalogSnapshot();
899 37 : if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
900 0 : ereport(ERROR,
901 : (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
902 : errmsg("cannot perform COPY FREEZE because of prior transaction activity")));
903 :
904 74 : if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
905 37 : cstate->rel->rd_newRelfilelocatorSubid != GetCurrentSubTransactionId())
906 12 : ereport(ERROR,
907 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
908 : errmsg("cannot perform COPY FREEZE because the table was not created or truncated in the current subtransaction")));
909 :
910 25 : ti_options |= TABLE_INSERT_FROZEN;
911 : }
912 :
913 : /*
914 : * We need a ResultRelInfo so we can use the regular executor's
915 : * index-entry-making machinery. (There used to be a huge amount of code
916 : * here that basically duplicated execUtils.c ...)
917 : */
918 1116 : ExecInitRangeTable(estate, cstate->range_table, cstate->rteperminfos,
919 : bms_make_singleton(1));
920 1116 : resultRelInfo = target_resultRelInfo = makeNode(ResultRelInfo);
921 1116 : ExecInitResultRelation(estate, resultRelInfo, 1);
922 :
923 : /* Verify the named relation is a valid target for INSERT */
924 1116 : CheckValidResultRel(resultRelInfo, CMD_INSERT, ONCONFLICT_NONE, NIL);
925 :
926 1115 : ExecOpenIndices(resultRelInfo, false);
927 :
928 : /*
929 : * Set up a ModifyTableState so we can let FDW(s) init themselves for
930 : * foreign-table result relation(s).
931 : */
932 1115 : mtstate = makeNode(ModifyTableState);
933 1115 : mtstate->ps.plan = NULL;
934 1115 : mtstate->ps.state = estate;
935 1115 : mtstate->operation = CMD_INSERT;
936 1115 : mtstate->mt_nrels = 1;
937 1115 : mtstate->resultRelInfo = resultRelInfo;
938 1115 : mtstate->rootResultRelInfo = resultRelInfo;
939 :
940 1115 : if (resultRelInfo->ri_FdwRoutine != NULL &&
941 18 : resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
942 18 : resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
943 : resultRelInfo);
944 :
945 : /*
946 : * Also, if the named relation is a foreign table, determine if the FDW
947 : * supports batch insert and determine the batch size (a FDW may support
948 : * batching, but it may be disabled for the server/table).
949 : *
950 : * If the FDW does not support batching, we set the batch size to 1.
951 : */
952 1115 : if (resultRelInfo->ri_FdwRoutine != NULL &&
953 18 : resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize &&
954 18 : resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert)
955 18 : resultRelInfo->ri_BatchSize =
956 18 : resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize(resultRelInfo);
957 : else
958 1097 : resultRelInfo->ri_BatchSize = 1;
959 :
960 : Assert(resultRelInfo->ri_BatchSize >= 1);
961 :
962 : /* Prepare to catch AFTER triggers. */
963 1115 : AfterTriggerBeginQuery();
964 :
965 : /*
966 : * If there are any triggers with transition tables on the named relation,
967 : * we need to be prepared to capture transition tuples.
968 : *
969 : * Because partition tuple routing would like to know about whether
970 : * transition capture is active, we also set it in mtstate, which is
971 : * passed to ExecFindPartition() below.
972 : */
973 1115 : cstate->transition_capture = mtstate->mt_transition_capture =
974 1115 : MakeTransitionCaptureState(cstate->rel->trigdesc,
975 1115 : RelationGetRelid(cstate->rel),
976 : CMD_INSERT);
977 :
978 : /*
979 : * If the named relation is a partitioned table, initialize state for
980 : * CopyFrom tuple routing.
981 : */
982 1115 : if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
983 63 : proute = ExecSetupPartitionTupleRouting(estate, cstate->rel);
984 :
985 1115 : if (cstate->whereClause)
986 12 : cstate->qualexpr = ExecInitQual(castNode(List, cstate->whereClause),
987 : &mtstate->ps);
988 :
989 : /*
990 : * It's generally more efficient to prepare a bunch of tuples for
991 : * insertion, and insert them in one
992 : * table_multi_insert()/ExecForeignBatchInsert() call, than call
993 : * table_tuple_insert()/ExecForeignInsert() separately for every tuple.
994 : * However, there are a number of reasons why we might not be able to do
995 : * this. These are explained below.
996 : */
997 1115 : if (resultRelInfo->ri_TrigDesc != NULL &&
998 118 : (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
999 58 : resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
1000 : {
1001 : /*
1002 : * Can't support multi-inserts when there are any BEFORE/INSTEAD OF
1003 : * triggers on the table. Such triggers might query the table we're
1004 : * inserting into and act differently if the tuples that have already
1005 : * been processed and prepared for insertion are not there.
1006 : */
1007 68 : insertMethod = CIM_SINGLE;
1008 : }
1009 1047 : else if (resultRelInfo->ri_FdwRoutine != NULL &&
1010 14 : resultRelInfo->ri_BatchSize == 1)
1011 : {
1012 : /*
1013 : * Can't support multi-inserts to a foreign table if the FDW does not
1014 : * support batching, or it's disabled for the server or foreign table.
1015 : */
1016 9 : insertMethod = CIM_SINGLE;
1017 : }
1018 1038 : else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL &&
1019 19 : resultRelInfo->ri_TrigDesc->trig_insert_new_table)
1020 : {
1021 : /*
1022 : * For partitioned tables we can't support multi-inserts when there
1023 : * are any statement level insert triggers. It might be possible to
1024 : * allow partitioned tables with such triggers in the future, but for
1025 : * now, CopyMultiInsertInfoFlush expects that any after row insert and
1026 : * statement level insert triggers are on the same relation.
1027 : */
1028 14 : insertMethod = CIM_SINGLE;
1029 : }
1030 1024 : else if (cstate->volatile_defexprs)
1031 : {
1032 : /*
1033 : * Can't support multi-inserts if there are any volatile default
1034 : * expressions in the table. Similarly to the trigger case above,
1035 : * such expressions may query the table we're inserting into.
1036 : *
1037 : * Note: It does not matter if any partitions have any volatile
1038 : * default expressions as we use the defaults from the target of the
1039 : * COPY command.
1040 : */
1041 4 : insertMethod = CIM_SINGLE;
1042 : }
1043 1020 : else if (contain_volatile_functions(cstate->whereClause))
1044 : {
1045 : /*
1046 : * Can't support multi-inserts if there are any volatile function
1047 : * expressions in WHERE clause. Similarly to the trigger case above,
1048 : * such expressions may query the table we're inserting into.
1049 : *
1050 : * Note: the whereClause was already preprocessed in DoCopy(), so it's
1051 : * okay to use contain_volatile_functions() directly.
1052 : */
1053 0 : insertMethod = CIM_SINGLE;
1054 : }
1055 : else
1056 : {
1057 : /*
1058 : * For partitioned tables, we may still be able to perform bulk
1059 : * inserts. However, the possibility of this depends on which types
1060 : * of triggers exist on the partition. We must disable bulk inserts
1061 : * if the partition is a foreign table that can't use batching or it
1062 : * has any before row insert or insert instead triggers (same as we
1063 : * checked above for the parent table). Since the partition's
1064 : * resultRelInfos are initialized only when we actually need to insert
1065 : * the first tuple into them, we must have the intermediate insert
1066 : * method of CIM_MULTI_CONDITIONAL to flag that we must later
1067 : * determine if we can use bulk-inserts for the partition being
1068 : * inserted into.
1069 : */
1070 1020 : if (proute)
1071 45 : insertMethod = CIM_MULTI_CONDITIONAL;
1072 : else
1073 975 : insertMethod = CIM_MULTI;
1074 :
1075 1020 : CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
1076 : estate, mycid, ti_options);
1077 : }
1078 :
1079 : /*
1080 : * If not using batch mode (which allocates slots as needed) set up a
1081 : * tuple slot too. When inserting into a partitioned table, we also need
1082 : * one, even if we might batch insert, to read the tuple in the root
1083 : * partition's form.
1084 : */
1085 1115 : if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
1086 : {
1087 140 : singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
1088 : &estate->es_tupleTable);
1089 140 : bistate = GetBulkInsertState();
1090 : }
1091 :
1092 1233 : has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
1093 118 : resultRelInfo->ri_TrigDesc->trig_insert_before_row);
1094 :
1095 1233 : has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
1096 118 : resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
1097 :
1098 : /*
1099 : * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
1100 : * should do this for COPY, since it's not really an "INSERT" statement as
1101 : * such. However, executing these triggers maintains consistency with the
1102 : * EACH ROW triggers that we already fire on COPY.
1103 : */
1104 1115 : ExecBSInsertTriggers(estate, resultRelInfo);
1105 :
1106 1115 : econtext = GetPerTupleExprContext(estate);
1107 :
1108 : /* Set up callback to identify error line number */
1109 1115 : errcallback.callback = CopyFromErrorCallback;
1110 1115 : errcallback.arg = cstate;
1111 1115 : errcallback.previous = error_context_stack;
1112 1115 : error_context_stack = &errcallback;
1113 :
1114 : for (;;)
1115 755480 : {
1116 : TupleTableSlot *myslot;
1117 : bool skip_tuple;
1118 :
1119 756595 : CHECK_FOR_INTERRUPTS();
1120 :
1121 : /*
1122 : * Reset the per-tuple exprcontext. We do this after every tuple, to
1123 : * clean-up after expression evaluations etc.
1124 : */
1125 756595 : ResetPerTupleExprContext(estate);
1126 :
1127 : /* select slot to (initially) load row into */
1128 756595 : if (insertMethod == CIM_SINGLE || proute)
1129 : {
1130 149557 : myslot = singleslot;
1131 149557 : Assert(myslot != NULL);
1132 : }
1133 : else
1134 : {
1135 : Assert(resultRelInfo == target_resultRelInfo);
1136 : Assert(insertMethod == CIM_MULTI);
1137 :
1138 607038 : myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
1139 : resultRelInfo);
1140 : }
1141 :
1142 : /*
1143 : * Switch to per-tuple context before calling NextCopyFrom, which does
1144 : * evaluate default expressions etc. and requires per-tuple context.
1145 : */
1146 756595 : MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
1147 :
1148 756595 : ExecClearTuple(myslot);
1149 :
1150 : /* Directly store the values/nulls array in the slot */
1151 756595 : if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
1152 964 : break;
1153 :
1154 755508 : if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE &&
1155 108 : cstate->escontext->error_occurred)
1156 : {
1157 : /*
1158 : * Soft error occurred, skip this tuple and just make
1159 : * ErrorSaveContext ready for the next NextCopyFrom. Since we
1160 : * don't set details_wanted and error_data is not to be filled,
1161 : * just resetting error_occurred is enough.
1162 : */
1163 72 : cstate->escontext->error_occurred = false;
1164 :
1165 : /* Report that this tuple was skipped by the ON_ERROR clause */
1166 72 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_SKIPPED,
1167 72 : cstate->num_errors);
1168 :
1169 72 : if (cstate->opts.reject_limit > 0 &&
1170 32 : cstate->num_errors > cstate->opts.reject_limit)
1171 4 : ereport(ERROR,
1172 : (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1173 : errmsg("skipped more than REJECT_LIMIT (%" PRId64 ") rows due to data type incompatibility",
1174 : cstate->opts.reject_limit)));
1175 :
1176 : /* Repeat NextCopyFrom() until no soft error occurs */
1177 68 : continue;
1178 : }
1179 :
1180 755436 : ExecStoreVirtualTuple(myslot);
1181 :
1182 : /*
1183 : * Constraints and where clause might reference the tableoid column,
1184 : * so (re-)initialize tts_tableOid before evaluating them.
1185 : */
1186 755436 : myslot->tts_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc);
1187 :
1188 : /* Triggers and stuff need to be invoked in query context. */
1189 755436 : MemoryContextSwitchTo(oldcontext);
1190 :
1191 755436 : if (cstate->whereClause)
1192 : {
1193 44 : econtext->ecxt_scantuple = myslot;
1194 : /* Skip items that don't match COPY's WHERE clause */
1195 44 : if (!ExecQual(cstate->qualexpr, econtext))
1196 : {
1197 : /*
1198 : * Report that this tuple was filtered out by the WHERE
1199 : * clause.
1200 : */
1201 24 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_EXCLUDED,
1202 : ++excluded);
1203 24 : continue;
1204 : }
1205 : }
1206 :
1207 : /* Determine the partition to insert the tuple into */
1208 755412 : if (proute)
1209 : {
1210 : TupleConversionMap *map;
1211 :
1212 : /*
1213 : * Attempt to find a partition suitable for this tuple.
1214 : * ExecFindPartition() will raise an error if none can be found or
1215 : * if the found partition is not suitable for INSERTs.
1216 : */
1217 149252 : resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo,
1218 : proute, myslot, estate);
1219 :
1220 149251 : if (prevResultRelInfo != resultRelInfo)
1221 : {
1222 : /* Determine which triggers exist on this partition */
1223 90815 : has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
1224 36 : resultRelInfo->ri_TrigDesc->trig_insert_before_row);
1225 :
1226 90815 : has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
1227 36 : resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
1228 :
1229 : /*
1230 : * Disable multi-inserts when the partition has BEFORE/INSTEAD
1231 : * OF triggers, or if the partition is a foreign table that
1232 : * can't use batching.
1233 : */
1234 141520 : leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
1235 50741 : !has_before_insert_row_trig &&
1236 192245 : !has_instead_insert_row_trig &&
1237 50725 : (resultRelInfo->ri_FdwRoutine == NULL ||
1238 6 : resultRelInfo->ri_BatchSize > 1);
1239 :
1240 : /* Set the multi-insert buffer to use for this partition. */
1241 90779 : if (leafpart_use_multi_insert)
1242 : {
1243 50723 : if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
1244 49 : CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
1245 : resultRelInfo);
1246 : }
1247 40056 : else if (insertMethod == CIM_MULTI_CONDITIONAL &&
1248 18 : !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
1249 : {
1250 : /*
1251 : * Flush pending inserts if this partition can't use
1252 : * batching, so rows are visible to triggers etc.
1253 : */
1254 0 : CopyMultiInsertInfoFlush(&multiInsertInfo,
1255 : resultRelInfo,
1256 : &processed);
1257 : }
1258 :
1259 90779 : if (bistate != NULL)
1260 90779 : ReleaseBulkInsertStatePin(bistate);
1261 90779 : prevResultRelInfo = resultRelInfo;
1262 : }
1263 :
1264 : /*
1265 : * If we're capturing transition tuples, we might need to convert
1266 : * from the partition rowtype to root rowtype. But if there are no
1267 : * BEFORE triggers on the partition that could change the tuple,
1268 : * we can just remember the original unconverted tuple to avoid a
1269 : * needless round trip conversion.
1270 : */
1271 149251 : if (cstate->transition_capture != NULL)
1272 38 : cstate->transition_capture->tcs_original_insert_tuple =
1273 38 : !has_before_insert_row_trig ? myslot : NULL;
1274 :
1275 : /*
1276 : * We might need to convert from the root rowtype to the partition
1277 : * rowtype.
1278 : */
1279 149251 : map = ExecGetRootToChildMap(resultRelInfo, estate);
1280 149251 : if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert)
1281 : {
1282 : /* non batch insert */
1283 40092 : if (map != NULL)
1284 : {
1285 : TupleTableSlot *new_slot;
1286 :
1287 73 : new_slot = resultRelInfo->ri_PartitionTupleSlot;
1288 73 : myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot);
1289 : }
1290 : }
1291 : else
1292 : {
1293 : /*
1294 : * Prepare to queue up tuple for later batch insert into
1295 : * current partition.
1296 : */
1297 : TupleTableSlot *batchslot;
1298 :
1299 : /* no other path available for partitioned table */
1300 : Assert(insertMethod == CIM_MULTI_CONDITIONAL);
1301 :
1302 109159 : batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
1303 : resultRelInfo);
1304 :
1305 109159 : if (map != NULL)
1306 8132 : myslot = execute_attr_map_slot(map->attrMap, myslot,
1307 : batchslot);
1308 : else
1309 : {
1310 : /*
1311 : * This looks more expensive than it is (Believe me, I
1312 : * optimized it away. Twice.). The input is in virtual
1313 : * form, and we'll materialize the slot below - for most
1314 : * slot types the copy performs the work materialization
1315 : * would later require anyway.
1316 : */
1317 101027 : ExecCopySlot(batchslot, myslot);
1318 101027 : myslot = batchslot;
1319 : }
1320 : }
1321 :
1322 : /* ensure that triggers etc see the right relation */
1323 149251 : myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
1324 : }
1325 :
1326 755411 : skip_tuple = false;
1327 :
1328 : /* BEFORE ROW INSERT Triggers */
1329 755411 : if (has_before_insert_row_trig)
1330 : {
1331 180 : if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot))
1332 10 : skip_tuple = true; /* "do nothing" */
1333 : }
1334 :
1335 755411 : if (!skip_tuple)
1336 : {
1337 : /*
1338 : * If there is an INSTEAD OF INSERT ROW trigger, let it handle the
1339 : * tuple. Otherwise, proceed with inserting the tuple into the
1340 : * table or foreign table.
1341 : */
1342 755401 : if (has_instead_insert_row_trig)
1343 : {
1344 8 : ExecIRInsertTriggers(estate, resultRelInfo, myslot);
1345 : }
1346 : else
1347 : {
1348 : /* Compute stored generated columns */
1349 755393 : if (resultRelInfo->ri_RelationDesc->rd_att->constr &&
1350 241114 : resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored)
1351 21 : ExecComputeStoredGenerated(resultRelInfo, estate, myslot,
1352 : CMD_INSERT);
1353 :
1354 : /*
1355 : * If the target is a plain table, check the constraints of
1356 : * the tuple.
1357 : */
1358 755393 : if (resultRelInfo->ri_FdwRoutine == NULL &&
1359 755347 : resultRelInfo->ri_RelationDesc->rd_att->constr)
1360 241096 : ExecConstraints(resultRelInfo, myslot, estate);
1361 :
1362 : /*
1363 : * Also check the tuple against the partition constraint, if
1364 : * there is one; except that if we got here via tuple-routing,
1365 : * we don't need to if there's no BR trigger defined on the
1366 : * partition.
1367 : */
1368 755373 : if (resultRelInfo->ri_RelationDesc->rd_rel->relispartition &&
1369 149243 : (proute == NULL || has_before_insert_row_trig))
1370 1167 : ExecPartitionCheck(resultRelInfo, myslot, estate, true);
1371 :
1372 : /* Store the slot in the multi-insert buffer, when enabled. */
1373 755373 : if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
1374 : {
1375 : /*
1376 : * The slot previously might point into the per-tuple
1377 : * context. For batching it needs to be longer lived.
1378 : */
1379 715150 : ExecMaterializeSlot(myslot);
1380 :
1381 : /* Add this tuple to the tuple buffer */
1382 715150 : CopyMultiInsertInfoStore(&multiInsertInfo,
1383 : resultRelInfo, myslot,
1384 : cstate->line_buf.len,
1385 : cstate->cur_lineno);
1386 :
1387 : /*
1388 : * If enough inserts have queued up, then flush all
1389 : * buffers out to their tables.
1390 : */
1391 715150 : if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
1392 653 : CopyMultiInsertInfoFlush(&multiInsertInfo,
1393 : resultRelInfo,
1394 : &processed);
1395 :
1396 : /*
1397 : * We delay updating the row counter and progress of the
1398 : * COPY command until after writing the tuples stored in
1399 : * the buffer out to the table, as in single insert mode.
1400 : * See CopyMultiInsertBufferFlush().
1401 : */
1402 715150 : continue; /* next tuple please */
1403 : }
1404 : else
1405 : {
1406 40223 : List *recheckIndexes = NIL;
1407 :
1408 : /* OK, store the tuple */
1409 40223 : if (resultRelInfo->ri_FdwRoutine != NULL)
1410 : {
1411 27 : myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
1412 : resultRelInfo,
1413 : myslot,
1414 : NULL);
1415 :
1416 26 : if (myslot == NULL) /* "do nothing" */
1417 2 : continue; /* next tuple please */
1418 :
1419 : /*
1420 : * AFTER ROW Triggers might reference the tableoid
1421 : * column, so (re-)initialize tts_tableOid before
1422 : * evaluating them.
1423 : */
1424 24 : myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
1425 : }
1426 : else
1427 : {
1428 : /* OK, store the tuple and create index entries for it */
1429 40196 : table_tuple_insert(resultRelInfo->ri_RelationDesc,
1430 : myslot, mycid, ti_options, bistate);
1431 :
1432 40196 : if (resultRelInfo->ri_NumIndices > 0)
1433 0 : recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
1434 : estate, 0,
1435 : myslot, NIL,
1436 : NULL);
1437 : }
1438 :
1439 : /* AFTER ROW INSERT Triggers */
1440 40220 : ExecARInsertTriggers(estate, resultRelInfo, myslot,
1441 : recheckIndexes, cstate->transition_capture);
1442 :
1443 40218 : list_free(recheckIndexes);
1444 : }
1445 : }
1446 :
1447 : /*
1448 : * We count only tuples not suppressed by a BEFORE INSERT trigger
1449 : * or FDW; this is the same definition used by nodeModifyTable.c
1450 : * for counting tuples inserted by an INSERT command. Update
1451 : * progress of the COPY command as well.
1452 : */
1453 40226 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
1454 : ++processed);
1455 : }
1456 : }
1457 :
1458 : /* Flush any remaining buffered tuples */
1459 964 : if (insertMethod != CIM_SINGLE)
1460 : {
1461 888 : if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
1462 706 : CopyMultiInsertInfoFlush(&multiInsertInfo, NULL, &processed);
1463 : }
1464 :
1465 : /* Done, clean up */
1466 954 : error_context_stack = errcallback.previous;
1467 :
1468 954 : if (cstate->num_errors > 0 &&
1469 24 : cstate->opts.log_verbosity >= COPY_LOG_VERBOSITY_DEFAULT)
1470 : {
1471 20 : if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE)
1472 16 : ereport(NOTICE,
1473 : errmsg_plural("%" PRIu64 " row was skipped due to data type incompatibility",
1474 : "%" PRIu64 " rows were skipped due to data type incompatibility",
1475 : cstate->num_errors,
1476 : cstate->num_errors));
1477 4 : else if (cstate->opts.on_error == COPY_ON_ERROR_SET_NULL)
1478 4 : ereport(NOTICE,
1479 : errmsg_plural("in %" PRIu64 " row, columns were set to null due to data type incompatibility",
1480 : "in %" PRIu64 " rows, columns were set to null due to data type incompatibility",
1481 : cstate->num_errors,
1482 : cstate->num_errors));
1483 : }
1484 :
1485 954 : if (bistate != NULL)
1486 120 : FreeBulkInsertState(bistate);
1487 :
1488 954 : MemoryContextSwitchTo(oldcontext);
1489 :
1490 : /* Execute AFTER STATEMENT insertion triggers */
1491 954 : ExecASInsertTriggers(estate, target_resultRelInfo, cstate->transition_capture);
1492 :
1493 : /* Handle queued AFTER triggers */
1494 954 : AfterTriggerEndQuery(estate);
1495 :
1496 954 : ExecResetTupleTable(estate->es_tupleTable, false);
1497 :
1498 : /* Allow the FDW to shut down */
1499 954 : if (target_resultRelInfo->ri_FdwRoutine != NULL &&
1500 16 : target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
1501 16 : target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
1502 : target_resultRelInfo);
1503 :
1504 : /* Tear down the multi-insert buffer data */
1505 954 : if (insertMethod != CIM_SINGLE)
1506 878 : CopyMultiInsertInfoCleanup(&multiInsertInfo);
1507 :
1508 : /* Close all the partitioned tables, leaf partitions, and their indices */
1509 954 : if (proute)
1510 60 : ExecCleanupTupleRouting(mtstate, proute);
1511 :
1512 : /* Close the result relations, including any trigger target relations */
1513 954 : ExecCloseResultRelations(estate);
1514 954 : ExecCloseRangeTableRelations(estate);
1515 :
1516 954 : FreeExecutorState(estate);
1517 :
1518 954 : return processed;
1519 : }
1520 :
1521 : /*
1522 : * Set up to read tuples from a file for COPY FROM.
1523 : *
1524 : * 'rel': Used as a template for the tuples
1525 : * 'whereClause': WHERE clause from the COPY FROM command
1526 : * 'filename': Name of server-local file to read, NULL for STDIN
1527 : * 'is_program': true if 'filename' is a program to execute
1528 : * 'data_source_cb': callback that provides the input data
1529 : * 'attnamelist': List of char *, columns to include. NIL selects all cols.
1530 : * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
1531 : *
1532 : * Returns a CopyFromState, to be passed to NextCopyFrom and related functions.
1533 : */
1534 : CopyFromState
1535 1376 : BeginCopyFrom(ParseState *pstate,
1536 : Relation rel,
1537 : Node *whereClause,
1538 : const char *filename,
1539 : bool is_program,
1540 : copy_data_source_cb data_source_cb,
1541 : List *attnamelist,
1542 : List *options)
1543 : {
1544 : CopyFromState cstate;
1545 1376 : bool pipe = (filename == NULL);
1546 : TupleDesc tupDesc;
1547 : AttrNumber num_phys_attrs,
1548 : num_defaults;
1549 : FmgrInfo *in_functions;
1550 : Oid *typioparams;
1551 : int *defmap;
1552 : ExprState **defexprs;
1553 : MemoryContext oldcontext;
1554 : bool volatile_defexprs;
1555 1376 : const int progress_cols[] = {
1556 : PROGRESS_COPY_COMMAND,
1557 : PROGRESS_COPY_TYPE,
1558 : PROGRESS_COPY_BYTES_TOTAL
1559 : };
1560 1376 : int64 progress_vals[] = {
1561 : PROGRESS_COPY_COMMAND_FROM,
1562 : 0,
1563 : 0
1564 : };
1565 :
1566 : /* Allocate workspace and zero all fields */
1567 1376 : cstate = palloc0_object(CopyFromStateData);
1568 :
1569 : /*
1570 : * We allocate everything used by a cstate in a new memory context. This
1571 : * avoids memory leaks during repeated use of COPY in a query.
1572 : */
1573 1376 : cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
1574 : "COPY",
1575 : ALLOCSET_DEFAULT_SIZES);
1576 :
1577 1376 : oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1578 :
1579 : /* Extract options from the statement node tree */
1580 1376 : ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options);
1581 :
1582 : /* Set the format routine */
1583 1195 : cstate->routine = CopyFromGetRoutine(&cstate->opts);
1584 :
1585 : /* Process the target relation */
1586 1195 : cstate->rel = rel;
1587 :
1588 1195 : tupDesc = RelationGetDescr(cstate->rel);
1589 :
1590 : /* process common options or initialization */
1591 :
1592 : /* Generate or convert list of attributes to process */
1593 1195 : cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
1594 :
1595 1195 : num_phys_attrs = tupDesc->natts;
1596 :
1597 : /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
1598 1195 : cstate->opts.force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1599 1195 : if (cstate->opts.force_notnull_all)
1600 8 : MemSet(cstate->opts.force_notnull_flags, true, num_phys_attrs * sizeof(bool));
1601 1187 : else if (cstate->opts.force_notnull)
1602 : {
1603 : List *attnums;
1604 : ListCell *cur;
1605 :
1606 17 : attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_notnull);
1607 :
1608 34 : foreach(cur, attnums)
1609 : {
1610 21 : int attnum = lfirst_int(cur);
1611 21 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1612 :
1613 21 : if (!list_member_int(cstate->attnumlist, attnum))
1614 4 : ereport(ERROR,
1615 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1616 : /*- translator: first %s is the name of a COPY option, e.g. FORCE_NOT_NULL */
1617 : errmsg("%s column \"%s\" not referenced by COPY",
1618 : "FORCE_NOT_NULL", NameStr(attr->attname))));
1619 17 : cstate->opts.force_notnull_flags[attnum - 1] = true;
1620 : }
1621 : }
1622 :
1623 : /* Set up soft error handler for ON_ERROR */
1624 1191 : if (cstate->opts.on_error != COPY_ON_ERROR_STOP)
1625 : {
1626 66 : cstate->escontext = makeNode(ErrorSaveContext);
1627 66 : cstate->escontext->type = T_ErrorSaveContext;
1628 66 : cstate->escontext->error_occurred = false;
1629 :
1630 66 : if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE ||
1631 25 : cstate->opts.on_error == COPY_ON_ERROR_SET_NULL)
1632 66 : cstate->escontext->details_wanted = false;
1633 : }
1634 : else
1635 1125 : cstate->escontext = NULL;
1636 :
1637 1191 : if (cstate->opts.on_error == COPY_ON_ERROR_SET_NULL)
1638 : {
1639 25 : int attr_count = list_length(cstate->attnumlist);
1640 :
1641 : /*
1642 : * When data type conversion fails and ON_ERROR is SET_NULL, we need to
1643 : * ensure that the input column allows null values. ExecConstraints()
1644 : * will cover most of the cases, but it does not verify domain
1645 : * constraints. Therefore, for constrained domains, the null value
1646 : * check must be performed during the initial string-to-datum
1647 : * conversion (see CopyFromTextLikeOneRow()).
1648 : */
1649 25 : cstate->domain_with_constraint = palloc0_array(bool, attr_count);
1650 :
1651 124 : foreach_int(attno, cstate->attnumlist)
1652 : {
1653 74 : int i = foreach_current_index(attno);
1654 :
1655 74 : Form_pg_attribute att = TupleDescAttr(tupDesc, attno - 1);
1656 :
1657 74 : cstate->domain_with_constraint[i] = DomainHasConstraints(att->atttypid, NULL);
1658 : }
1659 : }
1660 :
1661 : /* Convert FORCE_NULL name list to per-column flags, check validity */
1662 1191 : cstate->opts.force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1663 1191 : if (cstate->opts.force_null_all)
1664 8 : MemSet(cstate->opts.force_null_flags, true, num_phys_attrs * sizeof(bool));
1665 1183 : else if (cstate->opts.force_null)
1666 : {
1667 : List *attnums;
1668 : ListCell *cur;
1669 :
1670 17 : attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_null);
1671 :
1672 34 : foreach(cur, attnums)
1673 : {
1674 21 : int attnum = lfirst_int(cur);
1675 21 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1676 :
1677 21 : if (!list_member_int(cstate->attnumlist, attnum))
1678 4 : ereport(ERROR,
1679 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1680 : /*- translator: first %s is the name of a COPY option, e.g. FORCE_NOT_NULL */
1681 : errmsg("%s column \"%s\" not referenced by COPY",
1682 : "FORCE_NULL", NameStr(attr->attname))));
1683 17 : cstate->opts.force_null_flags[attnum - 1] = true;
1684 : }
1685 : }
1686 :
1687 : /* Convert convert_selectively name list to per-column flags */
1688 1187 : if (cstate->opts.convert_selectively)
1689 : {
1690 : List *attnums;
1691 : ListCell *cur;
1692 :
1693 3 : cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1694 :
1695 3 : attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.convert_select);
1696 :
1697 7 : foreach(cur, attnums)
1698 : {
1699 4 : int attnum = lfirst_int(cur);
1700 4 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1701 :
1702 4 : if (!list_member_int(cstate->attnumlist, attnum))
1703 0 : ereport(ERROR,
1704 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1705 : errmsg_internal("selected column \"%s\" not referenced by COPY",
1706 : NameStr(attr->attname))));
1707 4 : cstate->convert_select_flags[attnum - 1] = true;
1708 : }
1709 : }
1710 :
1711 : /* Use client encoding when ENCODING option is not specified. */
1712 1187 : if (cstate->opts.file_encoding < 0)
1713 1167 : cstate->file_encoding = pg_get_client_encoding();
1714 : else
1715 20 : cstate->file_encoding = cstate->opts.file_encoding;
1716 :
1717 : /*
1718 : * Look up encoding conversion function.
1719 : */
1720 1187 : if (cstate->file_encoding == GetDatabaseEncoding() ||
1721 52 : cstate->file_encoding == PG_SQL_ASCII ||
1722 24 : GetDatabaseEncoding() == PG_SQL_ASCII)
1723 : {
1724 1163 : cstate->need_transcoding = false;
1725 : }
1726 : else
1727 : {
1728 24 : cstate->need_transcoding = true;
1729 24 : cstate->conversion_proc = FindDefaultConversionProc(cstate->file_encoding,
1730 : GetDatabaseEncoding());
1731 24 : if (!OidIsValid(cstate->conversion_proc))
1732 0 : ereport(ERROR,
1733 : (errcode(ERRCODE_UNDEFINED_FUNCTION),
1734 : errmsg("default conversion function for encoding \"%s\" to \"%s\" does not exist",
1735 : pg_encoding_to_char(cstate->file_encoding),
1736 : pg_encoding_to_char(GetDatabaseEncoding()))));
1737 : }
1738 :
1739 1187 : cstate->copy_src = COPY_FILE; /* default */
1740 :
1741 1187 : cstate->whereClause = whereClause;
1742 :
1743 : /* Initialize state variables */
1744 1187 : cstate->eol_type = EOL_UNKNOWN;
1745 1187 : cstate->cur_relname = RelationGetRelationName(cstate->rel);
1746 1187 : cstate->cur_lineno = 0;
1747 1187 : cstate->cur_attname = NULL;
1748 1187 : cstate->cur_attval = NULL;
1749 1187 : cstate->relname_only = false;
1750 1187 : cstate->simd_enabled = true;
1751 :
1752 : /*
1753 : * Allocate buffers for the input pipeline.
1754 : *
1755 : * attribute_buf and raw_buf are used in both text and binary modes, but
1756 : * input_buf and line_buf only in text mode.
1757 : */
1758 1187 : cstate->raw_buf = palloc(RAW_BUF_SIZE + 1);
1759 1187 : cstate->raw_buf_index = cstate->raw_buf_len = 0;
1760 1187 : cstate->raw_reached_eof = false;
1761 :
1762 1187 : initStringInfo(&cstate->attribute_buf);
1763 :
1764 : /* Assign range table and rteperminfos, we'll need them in CopyFrom. */
1765 1187 : if (pstate)
1766 : {
1767 1149 : cstate->range_table = pstate->p_rtable;
1768 1149 : cstate->rteperminfos = pstate->p_rteperminfos;
1769 : }
1770 :
1771 1187 : num_defaults = 0;
1772 1187 : volatile_defexprs = false;
1773 :
1774 : /*
1775 : * Pick up the required catalog information for each attribute in the
1776 : * relation, including the input function, the element type (to pass to
1777 : * the input function), and info about defaults and constraints. (Which
1778 : * input function we use depends on text/binary format choice.)
1779 : */
1780 1187 : in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
1781 1187 : typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
1782 1187 : defmap = (int *) palloc(num_phys_attrs * sizeof(int));
1783 1187 : defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
1784 :
1785 4731 : for (int attnum = 1; attnum <= num_phys_attrs; attnum++)
1786 : {
1787 3553 : Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
1788 :
1789 : /* We don't need info for dropped attributes */
1790 3553 : if (att->attisdropped)
1791 82 : continue;
1792 :
1793 : /* Fetch the input function and typioparam info */
1794 3471 : cstate->routine->CopyFromInFunc(cstate, att->atttypid,
1795 3471 : &in_functions[attnum - 1],
1796 3471 : &typioparams[attnum - 1]);
1797 :
1798 : /* Get default info if available */
1799 3470 : defexprs[attnum - 1] = NULL;
1800 :
1801 : /*
1802 : * We only need the default values for columns that do not appear in
1803 : * the column list, unless the DEFAULT option was given. We never need
1804 : * default values for generated columns.
1805 : */
1806 3470 : if ((cstate->opts.default_print != NULL ||
1807 3388 : !list_member_int(cstate->attnumlist, attnum)) &&
1808 421 : !att->attgenerated)
1809 : {
1810 399 : Expr *defexpr = (Expr *) build_column_default(cstate->rel,
1811 : attnum);
1812 :
1813 399 : if (defexpr != NULL)
1814 : {
1815 : /* Run the expression through planner */
1816 167 : defexpr = expression_planner(defexpr);
1817 :
1818 : /* Initialize executable expression in copycontext */
1819 159 : defexprs[attnum - 1] = ExecInitExpr(defexpr, NULL);
1820 :
1821 : /* if NOT copied from input */
1822 : /* use default value if one exists */
1823 159 : if (!list_member_int(cstate->attnumlist, attnum))
1824 : {
1825 107 : defmap[num_defaults] = attnum - 1;
1826 107 : num_defaults++;
1827 : }
1828 :
1829 : /*
1830 : * If a default expression looks at the table being loaded,
1831 : * then it could give the wrong answer when using
1832 : * multi-insert. Since database access can be dynamic this is
1833 : * hard to test for exactly, so we use the much wider test of
1834 : * whether the default expression is volatile. We allow for
1835 : * the special case of when the default expression is the
1836 : * nextval() of a sequence which in this specific case is
1837 : * known to be safe for use with the multi-insert
1838 : * optimization. Hence we use this special case function
1839 : * checker rather than the standard check for
1840 : * contain_volatile_functions(). Note also that we already
1841 : * ran the expression through expression_planner().
1842 : */
1843 159 : if (!volatile_defexprs)
1844 159 : volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
1845 : }
1846 : }
1847 : }
1848 :
1849 1178 : cstate->defaults = (bool *) palloc0(tupDesc->natts * sizeof(bool));
1850 :
1851 : /* initialize progress */
1852 1178 : pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
1853 1178 : cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
1854 1178 : cstate->bytes_processed = 0;
1855 :
1856 : /* We keep those variables in cstate. */
1857 1178 : cstate->in_functions = in_functions;
1858 1178 : cstate->typioparams = typioparams;
1859 1178 : cstate->defmap = defmap;
1860 1178 : cstate->defexprs = defexprs;
1861 1178 : cstate->volatile_defexprs = volatile_defexprs;
1862 1178 : cstate->num_defaults = num_defaults;
1863 1178 : cstate->is_program = is_program;
1864 :
1865 1178 : if (data_source_cb)
1866 : {
1867 209 : progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
1868 209 : cstate->copy_src = COPY_CALLBACK;
1869 209 : cstate->data_source_cb = data_source_cb;
1870 : }
1871 969 : else if (pipe)
1872 : {
1873 681 : progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
1874 : Assert(!is_program); /* the grammar does not allow this */
1875 681 : if (whereToSendOutput == DestRemote)
1876 681 : ReceiveCopyBegin(cstate);
1877 : else
1878 0 : cstate->copy_file = stdin;
1879 : }
1880 : else
1881 : {
1882 288 : cstate->filename = pstrdup(filename);
1883 :
1884 288 : if (cstate->is_program)
1885 : {
1886 0 : progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
1887 0 : cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
1888 0 : if (cstate->copy_file == NULL)
1889 0 : ereport(ERROR,
1890 : (errcode_for_file_access(),
1891 : errmsg("could not execute command \"%s\": %m",
1892 : cstate->filename)));
1893 : }
1894 : else
1895 : {
1896 : struct stat st;
1897 :
1898 288 : progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
1899 288 : cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
1900 288 : if (cstate->copy_file == NULL)
1901 : {
1902 : /* copy errno because ereport subfunctions might change it */
1903 0 : int save_errno = errno;
1904 :
1905 0 : ereport(ERROR,
1906 : (errcode_for_file_access(),
1907 : errmsg("could not open file \"%s\" for reading: %m",
1908 : cstate->filename),
1909 : (save_errno == ENOENT || save_errno == EACCES) ?
1910 : errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
1911 : "You may want a client-side facility such as psql's \\copy.") : 0));
1912 : }
1913 :
1914 288 : if (fstat(fileno(cstate->copy_file), &st))
1915 0 : ereport(ERROR,
1916 : (errcode_for_file_access(),
1917 : errmsg("could not stat file \"%s\": %m",
1918 : cstate->filename)));
1919 :
1920 288 : if (S_ISDIR(st.st_mode))
1921 0 : ereport(ERROR,
1922 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1923 : errmsg("\"%s\" is a directory", cstate->filename)));
1924 :
1925 288 : progress_vals[2] = st.st_size;
1926 : }
1927 : }
1928 :
1929 1178 : pgstat_progress_update_multi_param(3, progress_cols, progress_vals);
1930 :
1931 1178 : cstate->routine->CopyFromStart(cstate, tupDesc);
1932 :
1933 1178 : MemoryContextSwitchTo(oldcontext);
1934 :
1935 1178 : return cstate;
1936 : }
1937 :
1938 : /*
1939 : * Clean up storage and release resources for COPY FROM.
1940 : */
1941 : void
1942 790 : EndCopyFrom(CopyFromState cstate)
1943 : {
1944 : /* Invoke the end callback */
1945 790 : cstate->routine->CopyFromEnd(cstate);
1946 :
1947 : /* No COPY FROM related resources except memory. */
1948 790 : if (cstate->is_program)
1949 : {
1950 0 : ClosePipeFromProgram(cstate);
1951 : }
1952 : else
1953 : {
1954 790 : if (cstate->filename != NULL && FreeFile(cstate->copy_file))
1955 0 : ereport(ERROR,
1956 : (errcode_for_file_access(),
1957 : errmsg("could not close file \"%s\": %m",
1958 : cstate->filename)));
1959 : }
1960 :
1961 790 : pgstat_progress_end_command();
1962 :
1963 790 : MemoryContextDelete(cstate->copycontext);
1964 790 : pfree(cstate);
1965 790 : }
1966 :
1967 : /*
1968 : * Closes the pipe from an external program, checking the pclose() return code.
1969 : */
1970 : static void
1971 0 : ClosePipeFromProgram(CopyFromState cstate)
1972 : {
1973 : int pclose_rc;
1974 :
1975 : Assert(cstate->is_program);
1976 :
1977 0 : pclose_rc = ClosePipeStream(cstate->copy_file);
1978 0 : if (pclose_rc == -1)
1979 0 : ereport(ERROR,
1980 : (errcode_for_file_access(),
1981 : errmsg("could not close pipe to external command: %m")));
1982 0 : else if (pclose_rc != 0)
1983 : {
1984 : /*
1985 : * If we ended a COPY FROM PROGRAM before reaching EOF, then it's
1986 : * expectable for the called program to fail with SIGPIPE, and we
1987 : * should not report that as an error. Otherwise, SIGPIPE indicates a
1988 : * problem.
1989 : */
1990 0 : if (!cstate->raw_reached_eof &&
1991 0 : wait_result_is_signal(pclose_rc, SIGPIPE))
1992 0 : return;
1993 :
1994 0 : ereport(ERROR,
1995 : (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
1996 : errmsg("program \"%s\" failed",
1997 : cstate->filename),
1998 : errdetail_internal("%s", wait_result_to_str(pclose_rc))));
1999 : }
2000 : }
|