Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * copyfrom.c
4 : * COPY <table> FROM file/program/client
5 : *
6 : * This file contains routines needed to efficiently load tuples into a
7 : * table. That includes looking up the correct partition, firing triggers,
8 : * calling the table AM function to insert the data, and updating indexes.
9 : * Reading data from the input file or client and parsing it into Datums
10 : * is handled in copyfromparse.c.
11 : *
12 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
13 : * Portions Copyright (c) 1994, Regents of the University of California
14 : *
15 : *
16 : * IDENTIFICATION
17 : * src/backend/commands/copyfrom.c
18 : *
19 : *-------------------------------------------------------------------------
20 : */
21 : #include "postgres.h"
22 :
23 : #include <ctype.h>
24 : #include <unistd.h>
25 : #include <sys/stat.h>
26 :
27 : #include "access/heapam.h"
28 : #include "access/tableam.h"
29 : #include "access/tupconvert.h"
30 : #include "access/xact.h"
31 : #include "catalog/namespace.h"
32 : #include "commands/copyapi.h"
33 : #include "commands/copyfrom_internal.h"
34 : #include "commands/progress.h"
35 : #include "commands/trigger.h"
36 : #include "executor/execPartition.h"
37 : #include "executor/executor.h"
38 : #include "executor/nodeModifyTable.h"
39 : #include "executor/tuptable.h"
40 : #include "foreign/fdwapi.h"
41 : #include "mb/pg_wchar.h"
42 : #include "miscadmin.h"
43 : #include "nodes/miscnodes.h"
44 : #include "optimizer/optimizer.h"
45 : #include "pgstat.h"
46 : #include "rewrite/rewriteHandler.h"
47 : #include "storage/fd.h"
48 : #include "tcop/tcopprot.h"
49 : #include "utils/lsyscache.h"
50 : #include "utils/memutils.h"
51 : #include "utils/portal.h"
52 : #include "utils/rel.h"
53 : #include "utils/snapmgr.h"
54 : #include "utils/typcache.h"
55 :
/*
 * No more than this many tuples per CopyMultiInsertBuffer
 *
 * This sizes the per-buffer 'slots' and 'linenos' arrays, and it is also the
 * threshold CopyMultiInsertInfoIsFull() applies to the number of tuples
 * buffered across all buffers.
 *
 * Caution: Don't make this too big, as we could end up with this many
 * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's
 * multiInsertBuffers list.  Increasing this can cause quadratic growth in
 * memory requirements during copies into partitioned tables with a large
 * number of partitions.
 */
#define MAX_BUFFERED_TUPLES 1000

/*
 * Flush buffers if there are >= this many bytes, as counted by the input
 * size, of tuples stored.  The byte count is summed over all buffers (see
 * CopyMultiInsertInfoIsFull()).
 */
#define MAX_BUFFERED_BYTES 65535

/*
 * Trim the list of buffers back down to this number after flushing.  This
 * must be >= 2; CopyMultiInsertInfoFlush() enforces that with a static
 * assertion.
 */
#define MAX_PARTITION_BUFFERS 32
78 :
/*
 * Stores multi-insert data related to a single relation in CopyFrom.
 *
 * Slots are created lazily: an entry of 'slots' stays NULL until the first
 * time a tuple is stored at that position, and is then reused by later
 * batches until the buffer is cleaned up.
 */
typedef struct CopyMultiInsertBuffer
{
	TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
	ResultRelInfo *resultRelInfo;	/* ResultRelInfo of the relation this
									 * buffer collects tuples for */
	BulkInsertState bistate;	/* BulkInsertState for this rel if plain
								 * table; NULL if foreign table */
	int			nused;			/* number of 'slots' containing tuples */
	uint64		linenos[MAX_BUFFERED_TUPLES];	/* Line # of tuple in copy
												 * stream; entry i belongs to
												 * slots[i] */
} CopyMultiInsertBuffer;
90 :
/*
 * Stores one or many CopyMultiInsertBuffers and details about the size and
 * number of tuples which are stored in them.  This allows multiple buffers to
 * exist at once when COPYing into a partitioned table.
 *
 * bufferedTuples and bufferedBytes are totals over every buffer in
 * multiInsertBuffers; they are incremented by CopyMultiInsertInfoStore() and
 * reset to zero by CopyMultiInsertInfoFlush().
 */
typedef struct CopyMultiInsertInfo
{
	List	   *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */
	int			bufferedTuples; /* number of tuples buffered over all buffers */
	int			bufferedBytes;	/* number of bytes from all buffered tuples */
	CopyFromState cstate;		/* Copy state for this CopyMultiInsertInfo */
	EState	   *estate;			/* Executor state used for COPY */
	CommandId	mycid;			/* Command Id used for COPY */
	uint32		ti_options;		/* table insert options */
} CopyMultiInsertInfo;
106 :
107 :
/* non-export function prototypes */
static void ClosePipeFromProgram(CopyFromState cstate);

/*
 * Built-in format-specific routines.  One-row callbacks are defined in
 * copyfromparse.c.
 *
 * The "TextLike" callbacks are shared by the text and CSV formats; the
 * "Binary" callbacks serve the binary format.
 */
static void CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo,
								   Oid *typioparam);
static void CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc);
static void CopyFromTextLikeEnd(CopyFromState cstate);
static void CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid,
								 FmgrInfo *finfo, Oid *typioparam);
static void CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc);
static void CopyFromBinaryEnd(CopyFromState cstate);
123 :
124 :
/*
 * COPY FROM routines for built-in formats.
 *
 * CSV and text formats share the same TextLike routines except for the
 * one-row callback.
 */

/* text format: TextLike setup/teardown plus the text one-row parser */
static const CopyFromRoutine CopyFromRoutineText = {
	.CopyFromInFunc = CopyFromTextLikeInFunc,
	.CopyFromStart = CopyFromTextLikeStart,
	.CopyFromOneRow = CopyFromTextOneRow,
	.CopyFromEnd = CopyFromTextLikeEnd,
};
139 :
/* CSV format: same as text except for the one-row callback */
static const CopyFromRoutine CopyFromRoutineCSV = {
	.CopyFromInFunc = CopyFromTextLikeInFunc,
	.CopyFromStart = CopyFromTextLikeStart,
	.CopyFromOneRow = CopyFromCSVOneRow,
	.CopyFromEnd = CopyFromTextLikeEnd,
};
147 :
/* binary format: all four callbacks are binary-specific */
static const CopyFromRoutine CopyFromRoutineBinary = {
	.CopyFromInFunc = CopyFromBinaryInFunc,
	.CopyFromStart = CopyFromBinaryStart,
	.CopyFromOneRow = CopyFromBinaryOneRow,
	.CopyFromEnd = CopyFromBinaryEnd,
};
155 :
156 : /* Return a COPY FROM routine for the given options */
157 : static const CopyFromRoutine *
158 1206 : CopyFromGetRoutine(const CopyFormatOptions *opts)
159 : {
160 1206 : if (opts->format == COPY_FORMAT_CSV)
161 189 : return &CopyFromRoutineCSV;
162 1017 : else if (opts->format == COPY_FORMAT_BINARY)
163 9 : return &CopyFromRoutineBinary;
164 :
165 : /* default is text */
166 1008 : return &CopyFromRoutineText;
167 : }
168 :
169 : /* Implementation of the start callback for text and CSV formats */
170 : static void
171 1181 : CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc)
172 : {
173 : AttrNumber attr_count;
174 :
175 : /*
176 : * If encoding conversion is needed, we need another buffer to hold the
177 : * converted input data. Otherwise, we can just point input_buf to the
178 : * same buffer as raw_buf.
179 : */
180 1181 : if (cstate->need_transcoding)
181 : {
182 24 : cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
183 24 : cstate->input_buf_index = cstate->input_buf_len = 0;
184 : }
185 : else
186 1157 : cstate->input_buf = cstate->raw_buf;
187 1181 : cstate->input_reached_eof = false;
188 :
189 1181 : initStringInfo(&cstate->line_buf);
190 :
191 : /*
192 : * Create workspace for CopyReadAttributes results; used by CSV and text
193 : * format.
194 : */
195 1181 : attr_count = list_length(cstate->attnumlist);
196 1181 : cstate->max_fields = attr_count;
197 1181 : cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
198 1181 : }
199 :
200 : /*
201 : * Implementation of the infunc callback for text and CSV formats. Assign
202 : * the input function data to the given *finfo.
203 : */
204 : static void
205 3454 : CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo,
206 : Oid *typioparam)
207 : {
208 : Oid func_oid;
209 :
210 3454 : getTypeInputInfo(atttypid, &func_oid, typioparam);
211 3454 : fmgr_info(func_oid, finfo);
212 3454 : }
213 :
214 : /* Implementation of the end callback for text and CSV formats */
215 : static void
216 985 : CopyFromTextLikeEnd(CopyFromState cstate)
217 : {
218 : /* nothing to do */
219 985 : }
220 :
221 : /* Implementation of the start callback for binary format */
222 : static void
223 8 : CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc)
224 : {
225 : /* Read and verify binary header */
226 8 : ReceiveCopyBinaryHeader(cstate);
227 8 : }
228 :
229 : /*
230 : * Implementation of the infunc callback for binary format. Assign
231 : * the binary input function to the given *finfo.
232 : */
233 : static void
234 38 : CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid,
235 : FmgrInfo *finfo, Oid *typioparam)
236 : {
237 : Oid func_oid;
238 :
239 38 : getTypeBinaryInputInfo(atttypid, &func_oid, typioparam);
240 37 : fmgr_info(func_oid, finfo);
241 37 : }
242 :
243 : /* Implementation of the end callback for binary format */
244 : static void
245 7 : CopyFromBinaryEnd(CopyFromState cstate)
246 : {
247 : /* nothing to do */
248 7 : }
249 :
250 : /*
251 : * error context callback for COPY FROM
252 : *
253 : * The argument for the error context must be CopyFromState.
254 : */
255 : void
256 252 : CopyFromErrorCallback(void *arg)
257 : {
258 252 : CopyFromState cstate = (CopyFromState) arg;
259 :
260 252 : if (cstate->relname_only)
261 : {
262 45 : errcontext("COPY %s",
263 : cstate->cur_relname);
264 45 : return;
265 : }
266 207 : if (cstate->opts.format == COPY_FORMAT_BINARY)
267 : {
268 : /* can't usefully display the data */
269 1 : if (cstate->cur_attname)
270 1 : errcontext("COPY %s, line %" PRIu64 ", column %s",
271 : cstate->cur_relname,
272 : cstate->cur_lineno,
273 : cstate->cur_attname);
274 : else
275 0 : errcontext("COPY %s, line %" PRIu64,
276 : cstate->cur_relname,
277 : cstate->cur_lineno);
278 : }
279 : else
280 : {
281 206 : if (cstate->cur_attname && cstate->cur_attval)
282 34 : {
283 : /* error is relevant to a particular column */
284 : char *attval;
285 :
286 34 : attval = CopyLimitPrintoutLength(cstate->cur_attval);
287 34 : errcontext("COPY %s, line %" PRIu64 ", column %s: \"%s\"",
288 : cstate->cur_relname,
289 : cstate->cur_lineno,
290 : cstate->cur_attname,
291 : attval);
292 34 : pfree(attval);
293 : }
294 172 : else if (cstate->cur_attname)
295 : {
296 : /* error is relevant to a particular column, value is NULL */
297 12 : errcontext("COPY %s, line %" PRIu64 ", column %s: null input",
298 : cstate->cur_relname,
299 : cstate->cur_lineno,
300 : cstate->cur_attname);
301 : }
302 : else
303 : {
304 : /*
305 : * Error is relevant to a particular line.
306 : *
307 : * If line_buf still contains the correct line, print it.
308 : */
309 160 : if (cstate->line_buf_valid)
310 : {
311 : char *lineval;
312 :
313 129 : lineval = CopyLimitPrintoutLength(cstate->line_buf.data);
314 129 : errcontext("COPY %s, line %" PRIu64 ": \"%s\"",
315 : cstate->cur_relname,
316 : cstate->cur_lineno, lineval);
317 129 : pfree(lineval);
318 : }
319 : else
320 : {
321 31 : errcontext("COPY %s, line %" PRIu64,
322 : cstate->cur_relname,
323 : cstate->cur_lineno);
324 : }
325 : }
326 : }
327 : }
328 :
/*
 * Make sure we don't print an unreasonable amount of COPY data in a message.
 *
 * Returns a freshly allocated string: a plain copy of the input when it is at
 * most MAX_COPY_DATA_DISPLAY bytes long, otherwise a prefix clipped on a
 * multibyte character boundary with "..." appended.
 */
char *
CopyLimitPrintoutLength(const char *str)
{
#define MAX_COPY_DATA_DISPLAY 100

	static const char ellipsis[] = "...";
	int			full_len = strlen(str);
	int			clip_len;
	char	   *clipped;

	/* Short enough already: hand back an untruncated copy */
	if (full_len <= MAX_COPY_DATA_DISPLAY)
		return pstrdup(str);

	/* Find an encoding-safe cut point within the display limit */
	clip_len = pg_mbcliplen(str, full_len, MAX_COPY_DATA_DISPLAY);

	/*
	 * Copy the kept prefix, then the "..." marker including its terminating
	 * NUL (sizeof(ellipsis) counts that NUL).
	 */
	clipped = (char *) palloc(clip_len + sizeof(ellipsis));
	memcpy(clipped, str, clip_len);
	memcpy(clipped + clip_len, ellipsis, sizeof(ellipsis));

	return clipped;
}
359 :
360 : /*
361 : * Allocate memory and initialize a new CopyMultiInsertBuffer for this
362 : * ResultRelInfo.
363 : */
364 : static CopyMultiInsertBuffer *
365 1035 : CopyMultiInsertBufferInit(ResultRelInfo *rri)
366 : {
367 : CopyMultiInsertBuffer *buffer;
368 :
369 1035 : buffer = palloc_object(CopyMultiInsertBuffer);
370 1035 : memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
371 1035 : buffer->resultRelInfo = rri;
372 1035 : buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL;
373 1035 : buffer->nused = 0;
374 :
375 1035 : return buffer;
376 : }
377 :
378 : /*
379 : * Make a new buffer for this ResultRelInfo.
380 : */
381 : static inline void
382 1035 : CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
383 : ResultRelInfo *rri)
384 : {
385 : CopyMultiInsertBuffer *buffer;
386 :
387 1035 : buffer = CopyMultiInsertBufferInit(rri);
388 :
389 : /* Setup back-link so we can easily find this buffer again */
390 1035 : rri->ri_CopyMultiInsertBuffer = buffer;
391 : /* Record that we're tracking this buffer */
392 1035 : miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
393 1035 : }
394 :
395 : /*
396 : * Initialize an already allocated CopyMultiInsertInfo.
397 : *
398 : * If rri is a non-partitioned table then a CopyMultiInsertBuffer is set up
399 : * for that table.
400 : */
401 : static void
402 1031 : CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
403 : CopyFromState cstate, EState *estate, CommandId mycid,
404 : uint32 ti_options)
405 : {
406 1031 : miinfo->multiInsertBuffers = NIL;
407 1031 : miinfo->bufferedTuples = 0;
408 1031 : miinfo->bufferedBytes = 0;
409 1031 : miinfo->cstate = cstate;
410 1031 : miinfo->estate = estate;
411 1031 : miinfo->mycid = mycid;
412 1031 : miinfo->ti_options = ti_options;
413 :
414 : /*
415 : * Only setup the buffer when not dealing with a partitioned table.
416 : * Buffers for partitioned tables will just be setup when we need to send
417 : * tuples their way for the first time.
418 : */
419 1031 : if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
420 986 : CopyMultiInsertInfoSetupBuffer(miinfo, rri);
421 1031 : }
422 :
423 : /*
424 : * Returns true if the buffers are full
425 : */
426 : static inline bool
427 718227 : CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
428 : {
429 718227 : if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
430 717641 : miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
431 656 : return true;
432 717571 : return false;
433 : }
434 :
435 : /*
436 : * Returns true if we have no buffered tuples
437 : */
438 : static inline bool
439 913 : CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
440 : {
441 913 : return miinfo->bufferedTuples == 0;
442 : }
443 :
/*
 * Write the tuples stored in 'buffer' out to the table.
 *
 * For a relation with an FDW routine the tuples are handed to
 * ExecForeignBatchInsert in chunks of at most ri_BatchSize.  Otherwise they
 * are inserted with a single table_multi_insert call, followed by per-tuple
 * index maintenance and AFTER ROW INSERT trigger processing.
 *
 * '*processed' is advanced by the number of tuples inserted and the COPY
 * progress counter is updated to match.  On return every used slot has been
 * cleared and buffer->nused is zero.  The miinfo-level counters are not
 * touched here.
 */
static inline void
CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
						   CopyMultiInsertBuffer *buffer,
						   int64 *processed)
{
	CopyFromState cstate = miinfo->cstate;
	EState	   *estate = miinfo->estate;
	int			nused = buffer->nused;
	ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
	TupleTableSlot **slots = buffer->slots;
	int			i;

	if (resultRelInfo->ri_FdwRoutine)
	{
		int			batch_size = resultRelInfo->ri_BatchSize;
		int			sent = 0;

		Assert(buffer->bistate == NULL);

		/* Ensure that the FDW supports batching and it's enabled */
		Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert);
		Assert(batch_size > 1);

		/*
		 * We suppress error context information other than the relation name,
		 * if one of the operations below fails.
		 */
		Assert(!cstate->relname_only);
		cstate->relname_only = true;

		while (sent < nused)
		{
			/* This chunk: the smaller of batch_size and what is left */
			int			size = (batch_size < nused - sent) ? batch_size : (nused - sent);
			/* In: number of slots passed; read back after the call as the
			 * number of rows inserted */
			int			inserted = size;
			TupleTableSlot **rslots;

			/* insert into foreign table: let the FDW do it */
			rslots =
				resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert(estate,
																	 resultRelInfo,
																	 &slots[sent],
																	 NULL,
																	 &inserted);

			/* Advance by what we handed over, not by what was inserted */
			sent += size;

			/* No need to do anything if there are no inserted rows */
			if (inserted <= 0)
				continue;

			/* Triggers on foreign tables should not have transition tables */
			Assert(resultRelInfo->ri_TrigDesc == NULL ||
				   resultRelInfo->ri_TrigDesc->trig_insert_new_table == false);

			/* Run AFTER ROW INSERT triggers */
			if (resultRelInfo->ri_TrigDesc != NULL &&
				resultRelInfo->ri_TrigDesc->trig_insert_after_row)
			{
				Oid			relid = RelationGetRelid(resultRelInfo->ri_RelationDesc);

				for (i = 0; i < inserted; i++)
				{
					TupleTableSlot *slot = rslots[i];

					/*
					 * AFTER ROW Triggers might reference the tableoid column,
					 * so (re-)initialize tts_tableOid before evaluating them.
					 */
					slot->tts_tableOid = relid;

					ExecARInsertTriggers(estate, resultRelInfo,
										 slot, NIL,
										 cstate->transition_capture);
				}
			}

			/* Update the row counter and progress of the COPY command */
			*processed += inserted;
			pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
										 *processed);
		}

		/* Clear every buffered slot, whether or not its row was inserted */
		for (i = 0; i < nused; i++)
			ExecClearTuple(slots[i]);

		/* reset relname_only */
		cstate->relname_only = false;
	}
	else
	{
		CommandId	mycid = miinfo->mycid;
		uint32		ti_options = miinfo->ti_options;
		/* Saved so both can be restored once the flush is done */
		bool		line_buf_valid = cstate->line_buf_valid;
		uint64		save_cur_lineno = cstate->cur_lineno;
		MemoryContext oldcontext;

		Assert(buffer->bistate != NULL);

		/*
		 * Print error context information correctly, if one of the operations
		 * below fails.
		 */
		cstate->line_buf_valid = false;

		/*
		 * table_multi_insert may leak memory, so switch to short-lived memory
		 * context before calling it.
		 */
		oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
		table_multi_insert(resultRelInfo->ri_RelationDesc,
						   slots,
						   nused,
						   mycid,
						   ti_options,
						   buffer->bistate);
		MemoryContextSwitchTo(oldcontext);

		for (i = 0; i < nused; i++)
		{
			/*
			 * If there are any indexes, update them for all the inserted
			 * tuples, and run AFTER ROW INSERT triggers.
			 */
			if (resultRelInfo->ri_NumIndices > 0)
			{
				List	   *recheckIndexes;

				/* Point the error context at this tuple's input line */
				cstate->cur_lineno = buffer->linenos[i];
				recheckIndexes =
					ExecInsertIndexTuples(resultRelInfo,
										  estate, 0, buffer->slots[i],
										  NIL, NULL);
				ExecARInsertTriggers(estate, resultRelInfo,
									 slots[i], recheckIndexes,
									 cstate->transition_capture);
				list_free(recheckIndexes);
			}

			/*
			 * There are no indexes, but see if we need to run AFTER ROW
			 * INSERT triggers anyway.
			 */
			else if (resultRelInfo->ri_TrigDesc != NULL &&
					 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
					  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
			{
				cstate->cur_lineno = buffer->linenos[i];
				ExecARInsertTriggers(estate, resultRelInfo,
									 slots[i], NIL,
									 cstate->transition_capture);
			}

			ExecClearTuple(slots[i]);
		}

		/* Update the row counter and progress of the COPY command */
		*processed += nused;
		pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
									 *processed);

		/* reset cur_lineno and line_buf_valid to what they were */
		cstate->line_buf_valid = line_buf_valid;
		cstate->cur_lineno = save_cur_lineno;
	}

	/* Mark that all slots are free */
	buffer->nused = 0;
}
615 :
616 : /*
617 : * Drop used slots and free member for this buffer.
618 : *
619 : * The buffer must be flushed before cleanup.
620 : */
621 : static inline void
622 885 : CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
623 : CopyMultiInsertBuffer *buffer)
624 : {
625 885 : ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
626 : int i;
627 :
628 : /* Ensure buffer was flushed */
629 : Assert(buffer->nused == 0);
630 :
631 : /* Remove back-link to ourself */
632 885 : resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
633 :
634 885 : if (resultRelInfo->ri_FdwRoutine == NULL)
635 : {
636 : Assert(buffer->bistate != NULL);
637 879 : FreeBulkInsertState(buffer->bistate);
638 : }
639 : else
640 : Assert(buffer->bistate == NULL);
641 :
642 : /* Since we only create slots on demand, just drop the non-null ones. */
643 150829 : for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
644 149944 : ExecDropSingleTupleTableSlot(buffer->slots[i]);
645 :
646 885 : if (resultRelInfo->ri_FdwRoutine == NULL)
647 879 : table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
648 : miinfo->ti_options);
649 :
650 885 : pfree(buffer);
651 885 : }
652 :
653 : /*
654 : * Write out all stored tuples in all buffers out to the tables.
655 : *
656 : * Once flushed we also trim the tracked buffers list down to size by removing
657 : * the buffers created earliest first.
658 : *
659 : * Callers should pass 'curr_rri' as the ResultRelInfo that's currently being
660 : * used. When cleaning up old buffers we'll never remove the one for
661 : * 'curr_rri'.
662 : */
663 : static inline void
664 1369 : CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri,
665 : int64 *processed)
666 : {
667 : ListCell *lc;
668 :
669 2839 : foreach(lc, miinfo->multiInsertBuffers)
670 : {
671 1484 : CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc);
672 :
673 1484 : CopyMultiInsertBufferFlush(miinfo, buffer, processed);
674 : }
675 :
676 1355 : miinfo->bufferedTuples = 0;
677 1355 : miinfo->bufferedBytes = 0;
678 :
679 : /*
680 : * Trim the list of tracked buffers down if it exceeds the limit. Here we
681 : * remove buffers starting with the ones we created first. It seems less
682 : * likely that these older ones will be needed than the ones that were
683 : * just created.
684 : */
685 1355 : while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS)
686 : {
687 : CopyMultiInsertBuffer *buffer;
688 :
689 0 : buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
690 :
691 : /*
692 : * We never want to remove the buffer that's currently being used, so
693 : * if we happen to find that then move it to the end of the list.
694 : */
695 0 : if (buffer->resultRelInfo == curr_rri)
696 : {
697 : /*
698 : * The code below would misbehave if we were trying to reduce the
699 : * list to less than two items.
700 : */
701 : StaticAssertDecl(MAX_PARTITION_BUFFERS >= 2,
702 : "MAX_PARTITION_BUFFERS must be >= 2");
703 :
704 0 : miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
705 0 : miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
706 0 : buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
707 : }
708 :
709 0 : CopyMultiInsertBufferCleanup(miinfo, buffer);
710 0 : miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
711 : }
712 1355 : }
713 :
714 : /*
715 : * Cleanup allocated buffers and free memory
716 : */
717 : static inline void
718 881 : CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo)
719 : {
720 : ListCell *lc;
721 :
722 1766 : foreach(lc, miinfo->multiInsertBuffers)
723 885 : CopyMultiInsertBufferCleanup(miinfo, lfirst(lc));
724 :
725 881 : list_free(miinfo->multiInsertBuffers);
726 881 : }
727 :
728 : /*
729 : * Get the next TupleTableSlot that the next tuple should be stored in.
730 : *
731 : * Callers must ensure that the buffer is not full.
732 : *
733 : * Note: 'miinfo' is unused but has been included for consistency with the
734 : * other functions in this area.
735 : */
736 : static inline TupleTableSlot *
737 719285 : CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
738 : ResultRelInfo *rri)
739 : {
740 719285 : CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
741 : int nused;
742 :
743 : Assert(buffer != NULL);
744 : Assert(buffer->nused < MAX_BUFFERED_TUPLES);
745 :
746 719285 : nused = buffer->nused;
747 :
748 719285 : if (buffer->slots[nused] == NULL)
749 150239 : buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
750 719285 : return buffer->slots[nused];
751 : }
752 :
753 : /*
754 : * Record the previously reserved TupleTableSlot that was reserved by
755 : * CopyMultiInsertInfoNextFreeSlot as being consumed.
756 : */
757 : static inline void
758 718227 : CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
759 : TupleTableSlot *slot, int tuplen, uint64 lineno)
760 : {
761 718227 : CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
762 :
763 : Assert(buffer != NULL);
764 : Assert(slot == buffer->slots[buffer->nused]);
765 :
766 : /* Store the line number so we can properly report any errors later */
767 718227 : buffer->linenos[buffer->nused] = lineno;
768 :
769 : /* Record this slot as being used */
770 718227 : buffer->nused++;
771 :
772 : /* Update how many tuples are stored and their size */
773 718227 : miinfo->bufferedTuples++;
774 718227 : miinfo->bufferedBytes += tuplen;
775 718227 : }
776 :
777 : /*
778 : * Copy FROM file to relation.
779 : */
780 : uint64
781 1151 : CopyFrom(CopyFromState cstate)
782 : {
783 : ResultRelInfo *resultRelInfo;
784 : ResultRelInfo *target_resultRelInfo;
785 1151 : ResultRelInfo *prevResultRelInfo = NULL;
786 1151 : EState *estate = CreateExecutorState(); /* for ExecConstraints() */
787 : ModifyTableState *mtstate;
788 : ExprContext *econtext;
789 1151 : TupleTableSlot *singleslot = NULL;
790 1151 : MemoryContext oldcontext = CurrentMemoryContext;
791 :
792 1151 : PartitionTupleRouting *proute = NULL;
793 : ErrorContextCallback errcallback;
794 1151 : CommandId mycid = GetCurrentCommandId(true);
795 1151 : uint32 ti_options = 0; /* start with default options for insert */
796 1151 : BulkInsertState bistate = NULL;
797 : CopyInsertMethod insertMethod;
798 1151 : CopyMultiInsertInfo multiInsertInfo = {0}; /* pacify compiler */
799 1151 : int64 processed = 0;
800 1151 : int64 excluded = 0;
801 : bool has_before_insert_row_trig;
802 : bool has_instead_insert_row_trig;
803 1151 : bool leafpart_use_multi_insert = false;
804 :
805 : Assert(cstate->rel);
806 : Assert(list_length(cstate->range_table) == 1);
807 :
808 1151 : if (cstate->opts.on_error != COPY_ON_ERROR_STOP)
809 : Assert(cstate->escontext);
810 :
811 : /*
812 : * The target must be a plain, foreign, or partitioned relation, or have
813 : * an INSTEAD OF INSERT row trigger. (Currently, such triggers are only
814 : * allowed on views, so we only hint about them in the view case.)
815 : */
816 1151 : if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
817 102 : cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
818 79 : cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
819 12 : !(cstate->rel->trigdesc &&
820 8 : cstate->rel->trigdesc->trig_insert_instead_row))
821 : {
822 4 : if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
823 4 : ereport(ERROR,
824 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
825 : errmsg("cannot copy to view \"%s\"",
826 : RelationGetRelationName(cstate->rel)),
827 : errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
828 0 : else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
829 0 : ereport(ERROR,
830 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
831 : errmsg("cannot copy to materialized view \"%s\"",
832 : RelationGetRelationName(cstate->rel))));
833 0 : else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
834 0 : ereport(ERROR,
835 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
836 : errmsg("cannot copy to sequence \"%s\"",
837 : RelationGetRelationName(cstate->rel))));
838 : else
839 0 : ereport(ERROR,
840 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
841 : errmsg("cannot copy to non-table relation \"%s\"",
842 : RelationGetRelationName(cstate->rel))));
843 : }
844 :
845 : /*
846 : * If the target file is new-in-transaction, we assume that checking FSM
847 : * for free space is a waste of time. This could possibly be wrong, but
848 : * it's unlikely.
849 : */
850 1147 : if (RELKIND_HAS_STORAGE(cstate->rel->rd_rel->relkind) &&
851 1049 : (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
852 1038 : cstate->rel->rd_firstRelfilelocatorSubid != InvalidSubTransactionId))
853 56 : ti_options |= TABLE_INSERT_SKIP_FSM;
854 :
855 : /*
856 : * Optimize if new relation storage was created in this subxact or one of
857 : * its committed children and we won't see those rows later as part of an
858 : * earlier scan or command. The subxact test ensures that if this subxact
859 : * aborts then the frozen rows won't be visible after xact cleanup. Note
860 : * that the stronger test of exactly which subtransaction created it is
861 : * crucial for correctness of this optimization. The test for an earlier
862 : * scan or command tolerates false negatives. FREEZE causes other sessions
863 : * to see rows they would not see under MVCC, and a false negative merely
864 : * spreads that anomaly to the current session.
865 : */
866 1147 : if (cstate->opts.freeze)
867 : {
868 : /*
869 : * We currently disallow COPY FREEZE on partitioned tables. The
870 : * reason for this is that we've simply not yet opened the partitions
871 : * to determine if the optimization can be applied to them. We could
872 : * go and open them all here, but doing so may be quite a costly
873 : * overhead for small copies. In any case, we may just end up routing
874 : * tuples to a small number of partitions. It seems better just to
875 : * raise an ERROR for partitioned tables.
876 : */
877 45 : if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
878 : {
879 4 : ereport(ERROR,
880 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
881 : errmsg("cannot perform COPY FREEZE on a partitioned table")));
882 : }
883 :
884 : /* There's currently no support for COPY FREEZE on foreign tables. */
885 41 : if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
886 4 : ereport(ERROR,
887 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
888 : errmsg("cannot perform COPY FREEZE on a foreign table")));
889 :
890 : /*
891 : * Tolerate one registration for the benefit of FirstXactSnapshot.
892 : * Scan-bearing queries generally create at least two registrations,
893 : * though relying on that is fragile, as is ignoring ActiveSnapshot.
894 : * Clear CatalogSnapshot to avoid counting its registration. We'll
895 : * still detect ongoing catalog scans, each of which separately
896 : * registers the snapshot it uses.
897 : */
898 37 : InvalidateCatalogSnapshot();
899 37 : if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
900 0 : ereport(ERROR,
901 : (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
902 : errmsg("cannot perform COPY FREEZE because of prior transaction activity")));
903 :
904 74 : if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
905 37 : cstate->rel->rd_newRelfilelocatorSubid != GetCurrentSubTransactionId())
906 12 : ereport(ERROR,
907 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
908 : errmsg("cannot perform COPY FREEZE because the table was not created or truncated in the current subtransaction")));
909 :
910 25 : ti_options |= TABLE_INSERT_FROZEN;
911 : }
912 :
913 : /*
914 : * We need a ResultRelInfo so we can use the regular executor's
915 : * index-entry-making machinery. (There used to be a huge amount of code
916 : * here that basically duplicated execUtils.c ...)
917 : */
918 1127 : ExecInitRangeTable(estate, cstate->range_table, cstate->rteperminfos,
919 : bms_make_singleton(1));
920 1127 : resultRelInfo = target_resultRelInfo = makeNode(ResultRelInfo);
921 1127 : ExecInitResultRelation(estate, resultRelInfo, 1);
922 :
923 : /* Verify the named relation is a valid target for INSERT */
924 1127 : CheckValidResultRel(resultRelInfo, CMD_INSERT, ONCONFLICT_NONE, NIL);
925 :
926 1126 : ExecOpenIndices(resultRelInfo, false);
927 :
928 : /*
929 : * Set up a ModifyTableState so we can let FDW(s) init themselves for
930 : * foreign-table result relation(s).
931 : */
932 1126 : mtstate = makeNode(ModifyTableState);
933 1126 : mtstate->ps.plan = NULL;
934 1126 : mtstate->ps.state = estate;
935 1126 : mtstate->operation = CMD_INSERT;
936 1126 : mtstate->mt_nrels = 1;
937 1126 : mtstate->resultRelInfo = resultRelInfo;
938 1126 : mtstate->rootResultRelInfo = resultRelInfo;
939 :
940 1126 : if (resultRelInfo->ri_FdwRoutine != NULL &&
941 18 : resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
942 18 : resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
943 : resultRelInfo);
944 :
945 : /*
946 : * Also, if the named relation is a foreign table, determine if the FDW
947 : * supports batch insert and determine the batch size (a FDW may support
948 : * batching, but it may be disabled for the server/table).
949 : *
950 : * If the FDW does not support batching, we set the batch size to 1.
951 : */
952 1126 : if (resultRelInfo->ri_FdwRoutine != NULL &&
953 18 : resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize &&
954 18 : resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert)
955 18 : resultRelInfo->ri_BatchSize =
956 18 : resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize(resultRelInfo);
957 : else
958 1108 : resultRelInfo->ri_BatchSize = 1;
959 :
960 : Assert(resultRelInfo->ri_BatchSize >= 1);
961 :
962 : /* Prepare to catch AFTER triggers. */
963 1126 : AfterTriggerBeginQuery();
964 :
965 : /*
966 : * If there are any triggers with transition tables on the named relation,
967 : * we need to be prepared to capture transition tuples.
968 : *
969 : * Because partition tuple routing would like to know about whether
970 : * transition capture is active, we also set it in mtstate, which is
971 : * passed to ExecFindPartition() below.
972 : */
973 1126 : cstate->transition_capture = mtstate->mt_transition_capture =
974 1126 : MakeTransitionCaptureState(cstate->rel->trigdesc,
975 1126 : RelationGetRelid(cstate->rel),
976 : CMD_INSERT);
977 :
978 : /*
979 : * If the named relation is a partitioned table, initialize state for
980 : * CopyFrom tuple routing.
981 : */
982 1126 : if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
983 63 : proute = ExecSetupPartitionTupleRouting(estate, cstate->rel);
984 :
985 1126 : if (cstate->whereClause)
986 12 : cstate->qualexpr = ExecInitQual(castNode(List, cstate->whereClause),
987 : &mtstate->ps);
988 :
989 : /*
990 : * It's generally more efficient to prepare a bunch of tuples for
991 : * insertion, and insert them in one
992 : * table_multi_insert()/ExecForeignBatchInsert() call, than call
993 : * table_tuple_insert()/ExecForeignInsert() separately for every tuple.
994 : * However, there are a number of reasons why we might not be able to do
995 : * this. These are explained below.
996 : */
997 1126 : if (resultRelInfo->ri_TrigDesc != NULL &&
998 117 : (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
999 57 : resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
1000 : {
1001 : /*
1002 : * Can't support multi-inserts when there are any BEFORE/INSTEAD OF
1003 : * triggers on the table. Such triggers might query the table we're
1004 : * inserting into and act differently if the tuples that have already
1005 : * been processed and prepared for insertion are not there.
1006 : */
1007 68 : insertMethod = CIM_SINGLE;
1008 : }
1009 1058 : else if (resultRelInfo->ri_FdwRoutine != NULL &&
1010 14 : resultRelInfo->ri_BatchSize == 1)
1011 : {
1012 : /*
1013 : * Can't support multi-inserts to a foreign table if the FDW does not
1014 : * support batching, or it's disabled for the server or foreign table.
1015 : */
1016 9 : insertMethod = CIM_SINGLE;
1017 : }
1018 1049 : else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL &&
1019 19 : resultRelInfo->ri_TrigDesc->trig_insert_new_table)
1020 : {
1021 : /*
1022 : * For partitioned tables we can't support multi-inserts when there
1023 : * are any statement level insert triggers. It might be possible to
1024 : * allow partitioned tables with such triggers in the future, but for
1025 : * now, CopyMultiInsertInfoFlush expects that any after row insert and
1026 : * statement level insert triggers are on the same relation.
1027 : */
1028 14 : insertMethod = CIM_SINGLE;
1029 : }
1030 1035 : else if (cstate->volatile_defexprs)
1031 : {
1032 : /*
1033 : * Can't support multi-inserts if there are any volatile default
1034 : * expressions in the table. Similarly to the trigger case above,
1035 : * such expressions may query the table we're inserting into.
1036 : *
1037 : * Note: It does not matter if any partitions have any volatile
1038 : * default expressions as we use the defaults from the target of the
1039 : * COPY command.
1040 : */
1041 4 : insertMethod = CIM_SINGLE;
1042 : }
1043 1031 : else if (contain_volatile_functions(cstate->whereClause))
1044 : {
1045 : /*
1046 : * Can't support multi-inserts if there are any volatile function
1047 : * expressions in WHERE clause. Similarly to the trigger case above,
1048 : * such expressions may query the table we're inserting into.
1049 : *
1050 : * Note: the whereClause was already preprocessed in DoCopy(), so it's
1051 : * okay to use contain_volatile_functions() directly.
1052 : */
1053 0 : insertMethod = CIM_SINGLE;
1054 : }
1055 : else
1056 : {
1057 : /*
1058 : * For partitioned tables, we may still be able to perform bulk
1059 : * inserts. However, the possibility of this depends on which types
1060 : * of triggers exist on the partition. We must disable bulk inserts
1061 : * if the partition is a foreign table that can't use batching or it
1062 : * has any before row insert or insert instead triggers (same as we
1063 : * checked above for the parent table). Since the partition's
1064 : * resultRelInfos are initialized only when we actually need to insert
1065 : * the first tuple into them, we must have the intermediate insert
1066 : * method of CIM_MULTI_CONDITIONAL to flag that we must later
1067 : * determine if we can use bulk-inserts for the partition being
1068 : * inserted into.
1069 : */
1070 1031 : if (proute)
1071 45 : insertMethod = CIM_MULTI_CONDITIONAL;
1072 : else
1073 986 : insertMethod = CIM_MULTI;
1074 :
1075 1031 : CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
1076 : estate, mycid, ti_options);
1077 : }
1078 :
1079 : /*
1080 : * If not using batch mode (which allocates slots as needed) set up a
1081 : * tuple slot too. When inserting into a partitioned table, we also need
1082 : * one, even if we might batch insert, to read the tuple in the root
1083 : * partition's form.
1084 : */
1085 1126 : if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
1086 : {
1087 140 : singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
1088 : &estate->es_tupleTable);
1089 140 : bistate = GetBulkInsertState();
1090 : }
1091 :
1092 1243 : has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
1093 117 : resultRelInfo->ri_TrigDesc->trig_insert_before_row);
1094 :
1095 1243 : has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
1096 117 : resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
1097 :
1098 : /*
1099 : * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
1100 : * should do this for COPY, since it's not really an "INSERT" statement as
1101 : * such. However, executing these triggers maintains consistency with the
1102 : * EACH ROW triggers that we already fire on COPY.
1103 : */
1104 1126 : ExecBSInsertTriggers(estate, resultRelInfo);
1105 :
1106 1126 : econtext = GetPerTupleExprContext(estate);
1107 :
1108 : /* Set up callback to identify error line number */
1109 1126 : errcallback.callback = CopyFromErrorCallback;
1110 1126 : errcallback.arg = cstate;
1111 1126 : errcallback.previous = error_context_stack;
1112 1126 : error_context_stack = &errcallback;
1113 :
1114 : for (;;)
1115 758557 : {
1116 : TupleTableSlot *myslot;
1117 : bool skip_tuple;
1118 :
1119 759683 : CHECK_FOR_INTERRUPTS();
1120 :
1121 : /*
1122 : * Reset the per-tuple exprcontext. We do this after every tuple, to
1123 : * clean up after expression evaluations etc.
1124 : */
1125 759683 : ResetPerTupleExprContext(estate);
1126 :
1127 : /* select slot to (initially) load row into */
1128 759683 : if (insertMethod == CIM_SINGLE || proute)
1129 : {
1130 149557 : myslot = singleslot;
1131 149557 : Assert(myslot != NULL);
1132 : }
1133 : else
1134 : {
1135 : Assert(resultRelInfo == target_resultRelInfo);
1136 : Assert(insertMethod == CIM_MULTI);
1137 :
1138 610126 : myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
1139 : resultRelInfo);
1140 : }
1141 :
1142 : /*
1143 : * Switch to per-tuple context before calling NextCopyFrom, which does
1144 : * evaluate default expressions etc. and requires per-tuple context.
1145 : */
1146 759683 : MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
1147 :
1148 759683 : ExecClearTuple(myslot);
1149 :
1150 : /* Directly store the values/nulls array in the slot */
1151 759683 : if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
1152 971 : break;
1153 :
1154 758585 : if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE &&
1155 108 : cstate->escontext->error_occurred)
1156 : {
1157 : /*
1158 : * Soft error occurred, skip this tuple and just make
1159 : * ErrorSaveContext ready for the next NextCopyFrom. Since we
1160 : * don't set details_wanted and error_data is not to be filled,
1161 : * just resetting error_occurred is enough.
1162 : */
1163 72 : cstate->escontext->error_occurred = false;
1164 :
1165 : /* Report that this tuple was skipped by the ON_ERROR clause */
1166 72 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_SKIPPED,
1167 72 : cstate->num_errors);
1168 :
1169 72 : if (cstate->opts.reject_limit > 0 &&
1170 32 : cstate->num_errors > cstate->opts.reject_limit)
1171 4 : ereport(ERROR,
1172 : (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1173 : errmsg("skipped more than REJECT_LIMIT (%" PRId64 ") rows due to data type incompatibility",
1174 : cstate->opts.reject_limit)));
1175 :
1176 : /* Repeat NextCopyFrom() until no soft error occurs */
1177 68 : continue;
1178 : }
1179 :
1180 758513 : ExecStoreVirtualTuple(myslot);
1181 :
1182 : /*
1183 : * Constraints and where clause might reference the tableoid column,
1184 : * so (re-)initialize tts_tableOid before evaluating them.
1185 : */
1186 758513 : myslot->tts_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc);
1187 :
1188 : /* Triggers and stuff need to be invoked in query context. */
1189 758513 : MemoryContextSwitchTo(oldcontext);
1190 :
1191 758513 : if (cstate->whereClause)
1192 : {
1193 44 : econtext->ecxt_scantuple = myslot;
1194 : /* Skip items that don't match COPY's WHERE clause */
1195 44 : if (!ExecQual(cstate->qualexpr, econtext))
1196 : {
1197 : /*
1198 : * Report that this tuple was filtered out by the WHERE
1199 : * clause.
1200 : */
1201 24 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_EXCLUDED,
1202 : ++excluded);
1203 24 : continue;
1204 : }
1205 : }
1206 :
1207 : /* Determine the partition to insert the tuple into */
1208 758489 : if (proute)
1209 : {
1210 : TupleConversionMap *map;
1211 :
1212 : /*
1213 : * Attempt to find a partition suitable for this tuple.
1214 : * ExecFindPartition() will raise an error if none can be found or
1215 : * if the found partition is not suitable for INSERTs.
1216 : */
1217 149252 : resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo,
1218 : proute, myslot, estate);
1219 :
1220 149251 : if (prevResultRelInfo != resultRelInfo)
1221 : {
1222 : /* Determine which triggers exist on this partition */
1223 90815 : has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
1224 36 : resultRelInfo->ri_TrigDesc->trig_insert_before_row);
1225 :
1226 90815 : has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
1227 36 : resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
1228 :
1229 : /*
1230 : * Disable multi-inserts when the partition has BEFORE/INSTEAD
1231 : * OF triggers, or if the partition is a foreign table that
1232 : * can't use batching.
1233 : */
1234 141520 : leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
1235 50741 : !has_before_insert_row_trig &&
1236 192245 : !has_instead_insert_row_trig &&
1237 50725 : (resultRelInfo->ri_FdwRoutine == NULL ||
1238 6 : resultRelInfo->ri_BatchSize > 1);
1239 :
1240 : /* Set the multi-insert buffer to use for this partition. */
1241 90779 : if (leafpart_use_multi_insert)
1242 : {
1243 50723 : if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
1244 49 : CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
1245 : resultRelInfo);
1246 : }
1247 40056 : else if (insertMethod == CIM_MULTI_CONDITIONAL &&
1248 18 : !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
1249 : {
1250 : /*
1251 : * Flush pending inserts if this partition can't use
1252 : * batching, so rows are visible to triggers etc.
1253 : */
1254 0 : CopyMultiInsertInfoFlush(&multiInsertInfo,
1255 : resultRelInfo,
1256 : &processed);
1257 : }
1258 :
1259 90779 : if (bistate != NULL)
1260 90779 : ReleaseBulkInsertStatePin(bistate);
1261 90779 : prevResultRelInfo = resultRelInfo;
1262 : }
1263 :
1264 : /*
1265 : * If we're capturing transition tuples, we might need to convert
1266 : * from the partition rowtype to root rowtype. But if there are no
1267 : * BEFORE triggers on the partition that could change the tuple,
1268 : * we can just remember the original unconverted tuple to avoid a
1269 : * needless round trip conversion.
1270 : */
1271 149251 : if (cstate->transition_capture != NULL)
1272 38 : cstate->transition_capture->tcs_original_insert_tuple =
1273 38 : !has_before_insert_row_trig ? myslot : NULL;
1274 :
1275 : /*
1276 : * We might need to convert from the root rowtype to the partition
1277 : * rowtype.
1278 : */
1279 149251 : map = ExecGetRootToChildMap(resultRelInfo, estate);
1280 149251 : if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert)
1281 : {
1282 : /* non batch insert */
1283 40092 : if (map != NULL)
1284 : {
1285 : TupleTableSlot *new_slot;
1286 :
1287 73 : new_slot = resultRelInfo->ri_PartitionTupleSlot;
1288 73 : myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot);
1289 : }
1290 : }
1291 : else
1292 : {
1293 : /*
1294 : * Prepare to queue up tuple for later batch insert into
1295 : * current partition.
1296 : */
1297 : TupleTableSlot *batchslot;
1298 :
1299 : /* no other path available for partitioned table */
1300 : Assert(insertMethod == CIM_MULTI_CONDITIONAL);
1301 :
1302 109159 : batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
1303 : resultRelInfo);
1304 :
1305 109159 : if (map != NULL)
1306 8132 : myslot = execute_attr_map_slot(map->attrMap, myslot,
1307 : batchslot);
1308 : else
1309 : {
1310 : /*
1311 : * This looks more expensive than it is (Believe me, I
1312 : * optimized it away. Twice.). The input is in virtual
1313 : * form, and we'll materialize the slot below - for most
1314 : * slot types the copy performs the work materialization
1315 : * would later require anyway.
1316 : */
1317 101027 : ExecCopySlot(batchslot, myslot);
1318 101027 : myslot = batchslot;
1319 : }
1320 : }
1321 :
1322 : /* ensure that triggers etc see the right relation */
1323 149251 : myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
1324 : }
1325 :
1326 758488 : skip_tuple = false;
1327 :
1328 : /* BEFORE ROW INSERT Triggers */
1329 758488 : if (has_before_insert_row_trig)
1330 : {
1331 180 : if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot))
1332 10 : skip_tuple = true; /* "do nothing" */
1333 : }
1334 :
1335 758488 : if (!skip_tuple)
1336 : {
1337 : /*
1338 : * If there is an INSTEAD OF INSERT ROW trigger, let it handle the
1339 : * tuple. Otherwise, proceed with inserting the tuple into the
1340 : * table or foreign table.
1341 : */
1342 758478 : if (has_instead_insert_row_trig)
1343 : {
1344 8 : ExecIRInsertTriggers(estate, resultRelInfo, myslot);
1345 : }
1346 : else
1347 : {
1348 : /* Compute stored generated columns */
1349 758470 : if (resultRelInfo->ri_RelationDesc->rd_att->constr &&
1350 241181 : resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored)
1351 21 : ExecComputeStoredGenerated(resultRelInfo, estate, myslot,
1352 : CMD_INSERT);
1353 :
1354 : /*
1355 : * If the target is a plain table, check the constraints of
1356 : * the tuple.
1357 : */
1358 758470 : if (resultRelInfo->ri_FdwRoutine == NULL &&
1359 758424 : resultRelInfo->ri_RelationDesc->rd_att->constr)
1360 241163 : ExecConstraints(resultRelInfo, myslot, estate);
1361 :
1362 : /*
1363 : * Also check the tuple against the partition constraint, if
1364 : * there is one; except that if we got here via tuple-routing,
1365 : * we don't need to if there's no BR trigger defined on the
1366 : * partition.
1367 : */
1368 758450 : if (resultRelInfo->ri_RelationDesc->rd_rel->relispartition &&
1369 149243 : (proute == NULL || has_before_insert_row_trig))
1370 1167 : ExecPartitionCheck(resultRelInfo, myslot, estate, true);
1371 :
1372 : /* Store the slot in the multi-insert buffer, when enabled. */
1373 758450 : if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
1374 : {
1375 : /*
1376 : * The slot previously might point into the per-tuple
1377 : * context. For batching it needs to be longer lived.
1378 : */
1379 718227 : ExecMaterializeSlot(myslot);
1380 :
1381 : /* Add this tuple to the tuple buffer */
1382 718227 : CopyMultiInsertInfoStore(&multiInsertInfo,
1383 : resultRelInfo, myslot,
1384 : cstate->line_buf.len,
1385 : cstate->cur_lineno);
1386 :
1387 : /*
1388 : * If enough inserts have queued up, then flush all
1389 : * buffers out to their tables.
1390 : */
1391 718227 : if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
1392 656 : CopyMultiInsertInfoFlush(&multiInsertInfo,
1393 : resultRelInfo,
1394 : &processed);
1395 :
1396 : /*
1397 : * We delay updating the row counter and progress of the
1398 : * COPY command until after writing the tuples stored in
1399 : * the buffer out to the table, as in single insert mode.
1400 : * See CopyMultiInsertBufferFlush().
1401 : */
1402 718227 : continue; /* next tuple please */
1403 : }
1404 : else
1405 : {
1406 40223 : List *recheckIndexes = NIL;
1407 :
1408 : /* OK, store the tuple */
1409 40223 : if (resultRelInfo->ri_FdwRoutine != NULL)
1410 : {
1411 27 : myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
1412 : resultRelInfo,
1413 : myslot,
1414 : NULL);
1415 :
1416 26 : if (myslot == NULL) /* "do nothing" */
1417 2 : continue; /* next tuple please */
1418 :
1419 : /*
1420 : * AFTER ROW Triggers might reference the tableoid
1421 : * column, so (re-)initialize tts_tableOid before
1422 : * evaluating them.
1423 : */
1424 24 : myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
1425 : }
1426 : else
1427 : {
1428 : /* OK, store the tuple and create index entries for it */
1429 40196 : table_tuple_insert(resultRelInfo->ri_RelationDesc,
1430 : myslot, mycid, ti_options, bistate);
1431 :
1432 40196 : if (resultRelInfo->ri_NumIndices > 0)
1433 0 : recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
1434 : estate, 0,
1435 : myslot, NIL,
1436 : NULL);
1437 : }
1438 :
1439 : /* AFTER ROW INSERT Triggers */
1440 40220 : ExecARInsertTriggers(estate, resultRelInfo, myslot,
1441 : recheckIndexes, cstate->transition_capture);
1442 :
1443 40218 : list_free(recheckIndexes);
1444 : }
1445 : }
1446 :
1447 : /*
1448 : * We count only tuples not suppressed by a BEFORE INSERT trigger
1449 : * or FDW; this is the same definition used by nodeModifyTable.c
1450 : * for counting tuples inserted by an INSERT command. Update
1451 : * progress of the COPY command as well.
1452 : */
1453 40226 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
1454 : ++processed);
1455 : }
1456 : }
1457 :
1458 : /* Flush any remaining buffered tuples */
1459 971 : if (insertMethod != CIM_SINGLE)
1460 : {
1461 895 : if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
1462 713 : CopyMultiInsertInfoFlush(&multiInsertInfo, NULL, &processed);
1463 : }
1464 :
1465 : /* Done, clean up */
1466 957 : error_context_stack = errcallback.previous;
1467 :
1468 957 : if (cstate->num_errors > 0 &&
1469 24 : cstate->opts.log_verbosity >= COPY_LOG_VERBOSITY_DEFAULT)
1470 : {
1471 20 : if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE)
1472 16 : ereport(NOTICE,
1473 : errmsg_plural("%" PRIu64 " row was skipped due to data type incompatibility",
1474 : "%" PRIu64 " rows were skipped due to data type incompatibility",
1475 : cstate->num_errors,
1476 : cstate->num_errors));
1477 4 : else if (cstate->opts.on_error == COPY_ON_ERROR_SET_NULL)
1478 4 : ereport(NOTICE,
1479 : errmsg_plural("in %" PRIu64 " row, columns were set to null due to data type incompatibility",
1480 : "in %" PRIu64 " rows, columns were set to null due to data type incompatibility",
1481 : cstate->num_errors,
1482 : cstate->num_errors));
1483 : }
1484 :
1485 957 : if (bistate != NULL)
1486 120 : FreeBulkInsertState(bistate);
1487 :
1488 957 : MemoryContextSwitchTo(oldcontext);
1489 :
1490 : /* Execute AFTER STATEMENT insertion triggers */
1491 957 : ExecASInsertTriggers(estate, target_resultRelInfo, cstate->transition_capture);
1492 :
1493 : /* Handle queued AFTER triggers */
1494 957 : AfterTriggerEndQuery(estate);
1495 :
1496 957 : ExecResetTupleTable(estate->es_tupleTable, false);
1497 :
1498 : /* Allow the FDW to shut down */
1499 957 : if (target_resultRelInfo->ri_FdwRoutine != NULL &&
1500 16 : target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
1501 16 : target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
1502 : target_resultRelInfo);
1503 :
1504 : /* Tear down the multi-insert buffer data */
1505 957 : if (insertMethod != CIM_SINGLE)
1506 881 : CopyMultiInsertInfoCleanup(&multiInsertInfo);
1507 :
1508 : /* Close all the partitioned tables, leaf partitions, and their indices */
1509 957 : if (proute)
1510 60 : ExecCleanupTupleRouting(mtstate, proute);
1511 :
1512 : /* Close the result relations, including any trigger target relations */
1513 957 : ExecCloseResultRelations(estate);
1514 957 : ExecCloseRangeTableRelations(estate);
1515 :
1516 957 : FreeExecutorState(estate);
1517 :
1518 957 : return processed;
1519 : }
1520 :
1521 : /*
1522 : * Set up to read tuples from a file for COPY FROM.
1523 : *
1524 : * 'rel': Used as a template for the tuples
1525 : * 'whereClause': WHERE clause from the COPY FROM command
1526 : * 'filename': Name of server-local file to read, NULL for STDIN
1527 : * 'is_program': true if 'filename' is program to execute
1528 : * 'data_source_cb': callback that provides the input data
1529 : * 'attnamelist': List of char *, columns to include. NIL selects all cols.
1530 : * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
1531 : *
1532 : * Returns a CopyFromState, to be passed to NextCopyFrom and related functions.
1533 : */
1534 : CopyFromState
1535 1387 : BeginCopyFrom(ParseState *pstate,
1536 : Relation rel,
1537 : Node *whereClause,
1538 : const char *filename,
1539 : bool is_program,
1540 : copy_data_source_cb data_source_cb,
1541 : List *attnamelist,
1542 : List *options)
1543 : {
1544 : CopyFromState cstate;
1545 1387 : bool pipe = (filename == NULL);
1546 : TupleDesc tupDesc;
1547 : AttrNumber num_phys_attrs,
1548 : num_defaults;
1549 : FmgrInfo *in_functions;
1550 : Oid *typioparams;
1551 : int *defmap;
1552 : ExprState **defexprs;
1553 : MemoryContext oldcontext;
1554 : bool volatile_defexprs;
1555 1387 : const int progress_cols[] = {
1556 : PROGRESS_COPY_COMMAND,
1557 : PROGRESS_COPY_TYPE,
1558 : PROGRESS_COPY_BYTES_TOTAL
1559 : };
1560 1387 : int64 progress_vals[] = {
1561 : PROGRESS_COPY_COMMAND_FROM,
1562 : 0,
1563 : 0
1564 : };
1565 :
1566 : /* Allocate workspace and zero all fields */
1567 1387 : cstate = palloc0_object(CopyFromStateData);
1568 :
1569 : /*
1570 : * We allocate everything used by a cstate in a new memory context. This
1571 : * avoids memory leaks during repeated use of COPY in a query.
1572 : */
1573 1387 : cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
1574 : "COPY",
1575 : ALLOCSET_DEFAULT_SIZES);
1576 :
1577 1387 : oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1578 :
1579 : /* Extract options from the statement node tree */
1580 1387 : ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options);
1581 :
1582 : /* Set the format routine */
1583 1206 : cstate->routine = CopyFromGetRoutine(&cstate->opts);
1584 :
1585 : /* Process the target relation */
1586 1206 : cstate->rel = rel;
1587 :
1588 1206 : tupDesc = RelationGetDescr(cstate->rel);
1589 :
1590 : /* process common options or initialization */
1591 :
1592 : /* Generate or convert list of attributes to process */
1593 1206 : cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
1594 :
1595 1206 : num_phys_attrs = tupDesc->natts;
1596 :
1597 : /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
1598 1206 : cstate->opts.force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1599 1206 : if (cstate->opts.force_notnull_all)
1600 8 : MemSet(cstate->opts.force_notnull_flags, true, num_phys_attrs * sizeof(bool));
1601 1198 : else if (cstate->opts.force_notnull)
1602 : {
1603 : List *attnums;
1604 : ListCell *cur;
1605 :
1606 17 : attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_notnull);
1607 :
1608 34 : foreach(cur, attnums)
1609 : {
1610 21 : int attnum = lfirst_int(cur);
1611 21 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1612 :
1613 21 : if (!list_member_int(cstate->attnumlist, attnum))
1614 4 : ereport(ERROR,
1615 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1616 : /*- translator: first %s is the name of a COPY option, e.g. FORCE_NOT_NULL */
1617 : errmsg("%s column \"%s\" not referenced by COPY",
1618 : "FORCE_NOT_NULL", NameStr(attr->attname))));
1619 17 : cstate->opts.force_notnull_flags[attnum - 1] = true;
1620 : }
1621 : }
1622 :
1623 : /* Set up soft error handler for ON_ERROR */
1624 1202 : if (cstate->opts.on_error != COPY_ON_ERROR_STOP)
1625 : {
1626 70 : cstate->escontext = makeNode(ErrorSaveContext);
1627 70 : cstate->escontext->type = T_ErrorSaveContext;
1628 70 : cstate->escontext->error_occurred = false;
1629 :
1630 70 : if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE ||
1631 29 : cstate->opts.on_error == COPY_ON_ERROR_SET_NULL)
1632 70 : cstate->escontext->details_wanted = false;
1633 : }
1634 : else
1635 1132 : cstate->escontext = NULL;
1636 :
1637 1202 : if (cstate->opts.on_error == COPY_ON_ERROR_SET_NULL)
1638 : {
1639 : /*
1640 : * When data type conversion fails and ON_ERROR is SET_NULL, we need to
1641 : * ensure that the input column allows null values. ExecConstraints()
1642 : * will cover most of the cases, but it does not verify domain
1643 : * constraints. Therefore, for constrained domains, the null value
1644 : * check must be performed during the initial string-to-datum
1645 : * conversion (see CopyFromTextLikeOneRow()).
1646 : */
1647 29 : cstate->domain_with_constraint = palloc0_array(bool, num_phys_attrs);
1648 :
1649 140 : foreach_int(attno, cstate->attnumlist)
1650 : {
1651 82 : Form_pg_attribute att = TupleDescAttr(tupDesc, attno - 1);
1652 :
1653 82 : cstate->domain_with_constraint[attno - 1] = DomainHasConstraints(att->atttypid, NULL);
1654 : }
1655 : }
1656 :
1657 : /* Convert FORCE_NULL name list to per-column flags, check validity */
1658 1202 : cstate->opts.force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1659 1202 : if (cstate->opts.force_null_all)
1660 8 : MemSet(cstate->opts.force_null_flags, true, num_phys_attrs * sizeof(bool));
1661 1194 : else if (cstate->opts.force_null)
1662 : {
1663 : List *attnums;
1664 : ListCell *cur;
1665 :
1666 17 : attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_null);
1667 :
1668 34 : foreach(cur, attnums)
1669 : {
1670 21 : int attnum = lfirst_int(cur);
1671 21 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1672 :
1673 21 : if (!list_member_int(cstate->attnumlist, attnum))
1674 4 : ereport(ERROR,
1675 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1676 : /*- translator: first %s is the name of a COPY option, e.g. FORCE_NOT_NULL */
1677 : errmsg("%s column \"%s\" not referenced by COPY",
1678 : "FORCE_NULL", NameStr(attr->attname))));
1679 17 : cstate->opts.force_null_flags[attnum - 1] = true;
1680 : }
1681 : }
1682 :
1683 : /* Convert convert_selectively name list to per-column flags */
1684 1198 : if (cstate->opts.convert_selectively)
1685 : {
1686 : List *attnums;
1687 : ListCell *cur;
1688 :
1689 3 : cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1690 :
1691 3 : attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.convert_select);
1692 :
1693 7 : foreach(cur, attnums)
1694 : {
1695 4 : int attnum = lfirst_int(cur);
1696 4 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1697 :
1698 4 : if (!list_member_int(cstate->attnumlist, attnum))
1699 0 : ereport(ERROR,
1700 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1701 : errmsg_internal("selected column \"%s\" not referenced by COPY",
1702 : NameStr(attr->attname))));
1703 4 : cstate->convert_select_flags[attnum - 1] = true;
1704 : }
1705 : }
1706 :
1707 : /* Use client encoding when ENCODING option is not specified. */
1708 1198 : if (cstate->opts.file_encoding < 0)
1709 1178 : cstate->file_encoding = pg_get_client_encoding();
1710 : else
1711 20 : cstate->file_encoding = cstate->opts.file_encoding;
1712 :
1713 : /*
1714 : * Look up encoding conversion function.
1715 : */
1716 1198 : if (cstate->file_encoding == GetDatabaseEncoding() ||
1717 52 : cstate->file_encoding == PG_SQL_ASCII ||
1718 24 : GetDatabaseEncoding() == PG_SQL_ASCII)
1719 : {
1720 1174 : cstate->need_transcoding = false;
1721 : }
1722 : else
1723 : {
1724 24 : cstate->need_transcoding = true;
1725 24 : cstate->conversion_proc = FindDefaultConversionProc(cstate->file_encoding,
1726 : GetDatabaseEncoding());
1727 24 : if (!OidIsValid(cstate->conversion_proc))
1728 0 : ereport(ERROR,
1729 : (errcode(ERRCODE_UNDEFINED_FUNCTION),
1730 : errmsg("default conversion function for encoding \"%s\" to \"%s\" does not exist",
1731 : pg_encoding_to_char(cstate->file_encoding),
1732 : pg_encoding_to_char(GetDatabaseEncoding()))));
1733 : }
1734 :
1735 1198 : cstate->copy_src = COPY_FILE; /* default */
1736 :
1737 1198 : cstate->whereClause = whereClause;
1738 :
1739 : /* Initialize state variables */
1740 1198 : cstate->eol_type = EOL_UNKNOWN;
1741 1198 : cstate->cur_relname = RelationGetRelationName(cstate->rel);
1742 1198 : cstate->cur_lineno = 0;
1743 1198 : cstate->cur_attname = NULL;
1744 1198 : cstate->cur_attval = NULL;
1745 1198 : cstate->relname_only = false;
1746 1198 : cstate->simd_enabled = true;
1747 :
1748 : /*
1749 : * Allocate buffers for the input pipeline.
1750 : *
1751 : * attribute_buf and raw_buf are used in both text and binary modes, but
1752 : * input_buf and line_buf only in text mode.
1753 : */
1754 1198 : cstate->raw_buf = palloc(RAW_BUF_SIZE + 1);
1755 1198 : cstate->raw_buf_index = cstate->raw_buf_len = 0;
1756 1198 : cstate->raw_reached_eof = false;
1757 :
1758 1198 : initStringInfo(&cstate->attribute_buf);
1759 :
1760 : /* Assign range table and rteperminfos, we'll need them in CopyFrom. */
1761 1198 : if (pstate)
1762 : {
1763 1160 : cstate->range_table = pstate->p_rtable;
1764 1160 : cstate->rteperminfos = pstate->p_rteperminfos;
1765 : }
1766 :
1767 1198 : num_defaults = 0;
1768 1198 : volatile_defexprs = false;
1769 :
1770 : /*
1771 : * Pick up the required catalog information for each attribute in the
1772 : * relation, including the input function, the element type (to pass to
1773 : * the input function), and info about defaults and constraints. (Which
1774 : * input function we use depends on text/binary format choice.)
1775 : */
1776 1198 : in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
1777 1198 : typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
1778 1198 : defmap = (int *) palloc(num_phys_attrs * sizeof(int));
1779 1198 : defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
1780 :
1781 4763 : for (int attnum = 1; attnum <= num_phys_attrs; attnum++)
1782 : {
1783 3574 : Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
1784 :
1785 : /* We don't need info for dropped attributes */
1786 3574 : if (att->attisdropped)
1787 82 : continue;
1788 :
1789 : /* Fetch the input function and typioparam info */
1790 3492 : cstate->routine->CopyFromInFunc(cstate, att->atttypid,
1791 3492 : &in_functions[attnum - 1],
1792 3492 : &typioparams[attnum - 1]);
1793 :
1794 : /* Get default info if available */
1795 3491 : defexprs[attnum - 1] = NULL;
1796 :
1797 : /*
1798 : * We only need the default values for columns that do not appear in
1799 : * the column list, unless the DEFAULT option was given. We never need
1800 : * default values for generated columns.
1801 : */
1802 3491 : if ((cstate->opts.default_print != NULL ||
1803 3409 : !list_member_int(cstate->attnumlist, attnum)) &&
1804 425 : !att->attgenerated)
1805 : {
1806 403 : Expr *defexpr = (Expr *) build_column_default(cstate->rel,
1807 : attnum);
1808 :
1809 403 : if (defexpr != NULL)
1810 : {
1811 : /* Run the expression through planner */
1812 167 : defexpr = expression_planner(defexpr);
1813 :
1814 : /* Initialize executable expression in copycontext */
1815 159 : defexprs[attnum - 1] = ExecInitExpr(defexpr, NULL);
1816 :
1817 : /* if NOT copied from input */
1818 : /* use default value if one exists */
1819 159 : if (!list_member_int(cstate->attnumlist, attnum))
1820 : {
1821 107 : defmap[num_defaults] = attnum - 1;
1822 107 : num_defaults++;
1823 : }
1824 :
1825 : /*
1826 : * If a default expression looks at the table being loaded,
1827 : * then it could give the wrong answer when using
1828 : * multi-insert. Since database access can be dynamic this is
1829 : * hard to test for exactly, so we use the much wider test of
1830 : * whether the default expression is volatile. We allow for
1831 : * the special case of when the default expression is the
1832 : * nextval() of a sequence which in this specific case is
1833 : * known to be safe for use with the multi-insert
1834 : * optimization. Hence we use this special case function
1835 : * checker rather than the standard check for
1836 : * contain_volatile_functions(). Note also that we already
1837 : * ran the expression through expression_planner().
1838 : */
1839 159 : if (!volatile_defexprs)
1840 159 : volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
1841 : }
1842 : }
1843 : }
1844 :
1845 1189 : cstate->defaults = (bool *) palloc0(tupDesc->natts * sizeof(bool));
1846 :
1847 : /* initialize progress */
1848 1189 : pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
1849 1189 : cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
1850 1189 : cstate->bytes_processed = 0;
1851 :
1852 : /* We keep those variables in cstate. */
1853 1189 : cstate->in_functions = in_functions;
1854 1189 : cstate->typioparams = typioparams;
1855 1189 : cstate->defmap = defmap;
1856 1189 : cstate->defexprs = defexprs;
1857 1189 : cstate->volatile_defexprs = volatile_defexprs;
1858 1189 : cstate->num_defaults = num_defaults;
1859 1189 : cstate->is_program = is_program;
1860 :
1861 1189 : if (data_source_cb)
1862 : {
1863 214 : progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
1864 214 : cstate->copy_src = COPY_CALLBACK;
1865 214 : cstate->data_source_cb = data_source_cb;
1866 : }
1867 975 : else if (pipe)
1868 : {
1869 685 : progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
1870 : Assert(!is_program); /* the grammar does not allow this */
1871 685 : if (whereToSendOutput == DestRemote)
1872 685 : ReceiveCopyBegin(cstate);
1873 : else
1874 0 : cstate->copy_file = stdin;
1875 : }
1876 : else
1877 : {
1878 290 : cstate->filename = pstrdup(filename);
1879 :
1880 290 : if (cstate->is_program)
1881 : {
1882 0 : progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
1883 0 : cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
1884 0 : if (cstate->copy_file == NULL)
1885 0 : ereport(ERROR,
1886 : (errcode_for_file_access(),
1887 : errmsg("could not execute command \"%s\": %m",
1888 : cstate->filename)));
1889 : }
1890 : else
1891 : {
1892 : struct stat st;
1893 :
1894 290 : progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
1895 290 : cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
1896 290 : if (cstate->copy_file == NULL)
1897 : {
1898 : /* copy errno because ereport subfunctions might change it */
1899 0 : int save_errno = errno;
1900 :
1901 0 : ereport(ERROR,
1902 : (errcode_for_file_access(),
1903 : errmsg("could not open file \"%s\" for reading: %m",
1904 : cstate->filename),
1905 : (save_errno == ENOENT || save_errno == EACCES) ?
1906 : errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
1907 : "You may want a client-side facility such as psql's \\copy.") : 0));
1908 : }
1909 :
1910 290 : if (fstat(fileno(cstate->copy_file), &st))
1911 0 : ereport(ERROR,
1912 : (errcode_for_file_access(),
1913 : errmsg("could not stat file \"%s\": %m",
1914 : cstate->filename)));
1915 :
1916 290 : if (S_ISDIR(st.st_mode))
1917 0 : ereport(ERROR,
1918 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1919 : errmsg("\"%s\" is a directory", cstate->filename)));
1920 :
1921 290 : progress_vals[2] = st.st_size;
1922 : }
1923 : }
1924 :
1925 1189 : pgstat_progress_update_multi_param(3, progress_cols, progress_vals);
1926 :
1927 1189 : cstate->routine->CopyFromStart(cstate, tupDesc);
1928 :
1929 1189 : MemoryContextSwitchTo(oldcontext);
1930 :
1931 1189 : return cstate;
1932 : }
1933 :
1934 : /*
1935 : * Clean up storage and release resources for COPY FROM.
1936 : */
1937 : void
1938 992 : EndCopyFrom(CopyFromState cstate)
1939 : {
1940 : /* Invoke the end callback */
1941 992 : cstate->routine->CopyFromEnd(cstate);
1942 :
1943 : /* No COPY FROM related resources except memory. */
1944 992 : if (cstate->is_program)
1945 : {
1946 0 : ClosePipeFromProgram(cstate);
1947 : }
1948 : else
1949 : {
1950 992 : if (cstate->filename != NULL && FreeFile(cstate->copy_file))
1951 0 : ereport(ERROR,
1952 : (errcode_for_file_access(),
1953 : errmsg("could not close file \"%s\": %m",
1954 : cstate->filename)));
1955 : }
1956 :
1957 992 : pgstat_progress_end_command();
1958 :
1959 992 : MemoryContextDelete(cstate->copycontext);
1960 992 : pfree(cstate);
1961 992 : }
1962 :
1963 : /*
1964 : * Closes the pipe from an external program, checking the pclose() return code.
1965 : */
1966 : static void
1967 0 : ClosePipeFromProgram(CopyFromState cstate)
1968 : {
1969 : int pclose_rc;
1970 :
1971 : Assert(cstate->is_program);
1972 :
1973 0 : pclose_rc = ClosePipeStream(cstate->copy_file);
1974 0 : if (pclose_rc == -1)
1975 0 : ereport(ERROR,
1976 : (errcode_for_file_access(),
1977 : errmsg("could not close pipe to external command: %m")));
1978 0 : else if (pclose_rc != 0)
1979 : {
1980 : /*
1981 : * If we ended a COPY FROM PROGRAM before reaching EOF, then it's
1982 : * expectable for the called program to fail with SIGPIPE, and we
1983 : * should not report that as an error. Otherwise, SIGPIPE indicates a
1984 : * problem.
1985 : */
1986 0 : if (!cstate->raw_reached_eof &&
1987 0 : wait_result_is_signal(pclose_rc, SIGPIPE))
1988 0 : return;
1989 :
1990 0 : ereport(ERROR,
1991 : (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
1992 : errmsg("program \"%s\" failed",
1993 : cstate->filename),
1994 : errdetail_internal("%s", wait_result_to_str(pclose_rc))));
1995 : }
1996 : }
|