Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * copyfrom.c
4 : * COPY <table> FROM file/program/client
5 : *
6 : * This file contains routines needed to efficiently load tuples into a
7 : * table. That includes looking up the correct partition, firing triggers,
8 : * calling the table AM function to insert the data, and updating indexes.
9 : * Reading data from the input file or client and parsing it into Datums
10 : * is handled in copyfromparse.c.
11 : *
12 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
13 : * Portions Copyright (c) 1994, Regents of the University of California
14 : *
15 : *
16 : * IDENTIFICATION
17 : * src/backend/commands/copyfrom.c
18 : *
19 : *-------------------------------------------------------------------------
20 : */
21 : #include "postgres.h"
22 :
23 : #include <ctype.h>
24 : #include <unistd.h>
25 : #include <sys/stat.h>
26 :
27 : #include "access/heapam.h"
28 : #include "access/tableam.h"
29 : #include "access/xact.h"
30 : #include "catalog/namespace.h"
31 : #include "commands/copyapi.h"
32 : #include "commands/copyfrom_internal.h"
33 : #include "commands/progress.h"
34 : #include "commands/trigger.h"
35 : #include "executor/execPartition.h"
36 : #include "executor/executor.h"
37 : #include "executor/nodeModifyTable.h"
38 : #include "executor/tuptable.h"
39 : #include "foreign/fdwapi.h"
40 : #include "mb/pg_wchar.h"
41 : #include "miscadmin.h"
42 : #include "nodes/miscnodes.h"
43 : #include "optimizer/optimizer.h"
44 : #include "pgstat.h"
45 : #include "rewrite/rewriteHandler.h"
46 : #include "storage/fd.h"
47 : #include "tcop/tcopprot.h"
48 : #include "utils/lsyscache.h"
49 : #include "utils/memutils.h"
50 : #include "utils/portal.h"
51 : #include "utils/rel.h"
52 : #include "utils/snapmgr.h"
53 : #include "utils/typcache.h"
54 :
/*
 * Upper bound on the number of tuples a single CopyMultiInsertBuffer holds.
 *
 * Keep this modest: CopyMultiInsertInfo's multiInsertBuffers list can end up
 * holding this many CopyMultiInsertBuffer items, so raising the value can make
 * memory use grow quadratically when copying into a partitioned table that
 * has many partitions.
 */
#define MAX_BUFFERED_TUPLES 1000

/*
 * Buffers are flushed once the stored tuples add up to at least this many
 * bytes, measured by their input size.
 */
#define MAX_BUFFERED_BYTES 65535

/*
 * After a flush, the list of tracked buffers is cut back to this length.
 * Must be at least 2.
 */
#define MAX_PARTITION_BUFFERS 32
77 :
78 : /* Stores multi-insert data related to a single relation in CopyFrom. */
79 : typedef struct CopyMultiInsertBuffer
80 : {
81 : TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
82 : ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */
83 : BulkInsertState bistate; /* BulkInsertState for this rel if plain
84 : * table; NULL if foreign table */
85 : int nused; /* number of 'slots' containing tuples */
86 : uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy
87 : * stream */
88 : } CopyMultiInsertBuffer;
89 :
90 : /*
91 : * Stores one or many CopyMultiInsertBuffers and details about the size and
92 : * number of tuples which are stored in them. This allows multiple buffers to
93 : * exist at once when COPYing into a partitioned table.
94 : */
95 : typedef struct CopyMultiInsertInfo
96 : {
97 : List *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */
98 : int bufferedTuples; /* number of tuples buffered over all buffers */
99 : int bufferedBytes; /* number of bytes from all buffered tuples */
100 : CopyFromState cstate; /* Copy state for this CopyMultiInsertInfo */
101 : EState *estate; /* Executor state used for COPY */
102 : CommandId mycid; /* Command Id used for COPY */
103 : int ti_options; /* table insert options */
104 : } CopyMultiInsertInfo;
105 :
106 :
107 : /* non-export function prototypes */
108 : static void ClosePipeFromProgram(CopyFromState cstate);
109 :
110 : /*
111 : * Built-in format-specific routines. One-row callbacks are defined in
112 : * copyfromparse.c.
113 : */
114 : static void CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo,
115 : Oid *typioparam);
116 : static void CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc);
117 : static void CopyFromTextLikeEnd(CopyFromState cstate);
118 : static void CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid,
119 : FmgrInfo *finfo, Oid *typioparam);
120 : static void CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc);
121 : static void CopyFromBinaryEnd(CopyFromState cstate);
122 :
123 :
124 : /*
125 : * COPY FROM routines for built-in formats.
126 : *
127 : * CSV and text formats share the same TextLike routines except for the
128 : * one-row callback.
129 : */
130 :
131 : /* text format */
132 : static const CopyFromRoutine CopyFromRoutineText = {
133 : .CopyFromInFunc = CopyFromTextLikeInFunc,
134 : .CopyFromStart = CopyFromTextLikeStart,
135 : .CopyFromOneRow = CopyFromTextOneRow,
136 : .CopyFromEnd = CopyFromTextLikeEnd,
137 : };
138 :
139 : /* CSV format */
140 : static const CopyFromRoutine CopyFromRoutineCSV = {
141 : .CopyFromInFunc = CopyFromTextLikeInFunc,
142 : .CopyFromStart = CopyFromTextLikeStart,
143 : .CopyFromOneRow = CopyFromCSVOneRow,
144 : .CopyFromEnd = CopyFromTextLikeEnd,
145 : };
146 :
147 : /* binary format */
148 : static const CopyFromRoutine CopyFromRoutineBinary = {
149 : .CopyFromInFunc = CopyFromBinaryInFunc,
150 : .CopyFromStart = CopyFromBinaryStart,
151 : .CopyFromOneRow = CopyFromBinaryOneRow,
152 : .CopyFromEnd = CopyFromBinaryEnd,
153 : };
154 :
155 : /* Return a COPY FROM routine for the given options */
156 : static const CopyFromRoutine *
157 996 : CopyFromGetRoutine(const CopyFormatOptions *opts)
158 : {
159 996 : if (opts->csv_mode)
160 149 : return &CopyFromRoutineCSV;
161 847 : else if (opts->binary)
162 8 : return &CopyFromRoutineBinary;
163 :
164 : /* default is text */
165 839 : return &CopyFromRoutineText;
166 : }
167 :
168 : /* Implementation of the start callback for text and CSV formats */
169 : static void
170 976 : CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc)
171 : {
172 : AttrNumber attr_count;
173 :
174 : /*
175 : * If encoding conversion is needed, we need another buffer to hold the
176 : * converted input data. Otherwise, we can just point input_buf to the
177 : * same buffer as raw_buf.
178 : */
179 976 : if (cstate->need_transcoding)
180 : {
181 18 : cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
182 18 : cstate->input_buf_index = cstate->input_buf_len = 0;
183 : }
184 : else
185 958 : cstate->input_buf = cstate->raw_buf;
186 976 : cstate->input_reached_eof = false;
187 :
188 976 : initStringInfo(&cstate->line_buf);
189 :
190 : /*
191 : * Create workspace for CopyReadAttributes results; used by CSV and text
192 : * format.
193 : */
194 976 : attr_count = list_length(cstate->attnumlist);
195 976 : cstate->max_fields = attr_count;
196 976 : cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
197 976 : }
198 :
199 : /*
200 : * Implementation of the infunc callback for text and CSV formats. Assign
201 : * the input function data to the given *finfo.
202 : */
203 : static void
204 2828 : CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo,
205 : Oid *typioparam)
206 : {
207 : Oid func_oid;
208 :
209 2828 : getTypeInputInfo(atttypid, &func_oid, typioparam);
210 2828 : fmgr_info(func_oid, finfo);
211 2828 : }
212 :
213 : /* Implementation of the end callback for text and CSV formats */
214 : static void
215 635 : CopyFromTextLikeEnd(CopyFromState cstate)
216 : {
217 : /* nothing to do */
218 635 : }
219 :
220 : /* Implementation of the start callback for binary format */
221 : static void
222 7 : CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc)
223 : {
224 : /* Read and verify binary header */
225 7 : ReceiveCopyBinaryHeader(cstate);
226 7 : }
227 :
228 : /*
229 : * Implementation of the infunc callback for binary format. Assign
230 : * the binary input function to the given *finfo.
231 : */
232 : static void
233 31 : CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid,
234 : FmgrInfo *finfo, Oid *typioparam)
235 : {
236 : Oid func_oid;
237 :
238 31 : getTypeBinaryInputInfo(atttypid, &func_oid, typioparam);
239 30 : fmgr_info(func_oid, finfo);
240 30 : }
241 :
242 : /* Implementation of the end callback for binary format */
243 : static void
244 3 : CopyFromBinaryEnd(CopyFromState cstate)
245 : {
246 : /* nothing to do */
247 3 : }
248 :
249 : /*
250 : * error context callback for COPY FROM
251 : *
252 : * The argument for the error context must be CopyFromState.
253 : */
254 : void
255 193 : CopyFromErrorCallback(void *arg)
256 : {
257 193 : CopyFromState cstate = (CopyFromState) arg;
258 :
259 193 : if (cstate->relname_only)
260 : {
261 34 : errcontext("COPY %s",
262 : cstate->cur_relname);
263 34 : return;
264 : }
265 159 : if (cstate->opts.binary)
266 : {
267 : /* can't usefully display the data */
268 1 : if (cstate->cur_attname)
269 1 : errcontext("COPY %s, line %" PRIu64 ", column %s",
270 : cstate->cur_relname,
271 : cstate->cur_lineno,
272 : cstate->cur_attname);
273 : else
274 0 : errcontext("COPY %s, line %" PRIu64,
275 : cstate->cur_relname,
276 : cstate->cur_lineno);
277 : }
278 : else
279 : {
280 158 : if (cstate->cur_attname && cstate->cur_attval)
281 26 : {
282 : /* error is relevant to a particular column */
283 : char *attval;
284 :
285 26 : attval = CopyLimitPrintoutLength(cstate->cur_attval);
286 26 : errcontext("COPY %s, line %" PRIu64 ", column %s: \"%s\"",
287 : cstate->cur_relname,
288 : cstate->cur_lineno,
289 : cstate->cur_attname,
290 : attval);
291 26 : pfree(attval);
292 : }
293 132 : else if (cstate->cur_attname)
294 : {
295 : /* error is relevant to a particular column, value is NULL */
296 6 : errcontext("COPY %s, line %" PRIu64 ", column %s: null input",
297 : cstate->cur_relname,
298 : cstate->cur_lineno,
299 : cstate->cur_attname);
300 : }
301 : else
302 : {
303 : /*
304 : * Error is relevant to a particular line.
305 : *
306 : * If line_buf still contains the correct line, print it.
307 : */
308 126 : if (cstate->line_buf_valid)
309 : {
310 : char *lineval;
311 :
312 100 : lineval = CopyLimitPrintoutLength(cstate->line_buf.data);
313 100 : errcontext("COPY %s, line %" PRIu64 ": \"%s\"",
314 : cstate->cur_relname,
315 : cstate->cur_lineno, lineval);
316 100 : pfree(lineval);
317 : }
318 : else
319 : {
320 26 : errcontext("COPY %s, line %" PRIu64,
321 : cstate->cur_relname,
322 : cstate->cur_lineno);
323 : }
324 : }
325 : }
326 : }
327 :
/*
 * Return a palloc'd, printable copy of 'str' that is short enough to put in
 * an error message.
 *
 * Strings of at most MAX_COPY_DATA_DISPLAY bytes are returned unchanged via
 * pstrdup().  Longer strings are truncated by the encoding-aware
 * pg_mbcliplen(), limited to MAX_COPY_DATA_DISPLAY bytes, and "..." is
 * appended to show that data was dropped.
 */
char *
CopyLimitPrintoutLength(const char *str)
{
#define MAX_COPY_DATA_DISPLAY 100

	int			full_len = strlen(str);
	int			keep;
	char	   *clipped;

	if (full_len > MAX_COPY_DATA_DISPLAY)
	{
		keep = pg_mbcliplen(str, full_len, MAX_COPY_DATA_DISPLAY);

		/* room for the kept prefix, "..." and the terminating NUL */
		clipped = (char *) palloc(keep + sizeof("..."));
		memcpy(clipped, str, keep);
		strcpy(clipped + keep, "...");
		return clipped;
	}

	/* Short enough to print as-is */
	return pstrdup(str);
}
358 :
359 : /*
360 : * Allocate memory and initialize a new CopyMultiInsertBuffer for this
361 : * ResultRelInfo.
362 : */
363 : static CopyMultiInsertBuffer *
364 855 : CopyMultiInsertBufferInit(ResultRelInfo *rri)
365 : {
366 : CopyMultiInsertBuffer *buffer;
367 :
368 855 : buffer = palloc_object(CopyMultiInsertBuffer);
369 855 : memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
370 855 : buffer->resultRelInfo = rri;
371 855 : buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL;
372 855 : buffer->nused = 0;
373 :
374 855 : return buffer;
375 : }
376 :
377 : /*
378 : * Make a new buffer for this ResultRelInfo.
379 : */
380 : static inline void
381 855 : CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
382 : ResultRelInfo *rri)
383 : {
384 : CopyMultiInsertBuffer *buffer;
385 :
386 855 : buffer = CopyMultiInsertBufferInit(rri);
387 :
388 : /* Setup back-link so we can easily find this buffer again */
389 855 : rri->ri_CopyMultiInsertBuffer = buffer;
390 : /* Record that we're tracking this buffer */
391 855 : miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
392 855 : }
393 :
394 : /*
395 : * Initialize an already allocated CopyMultiInsertInfo.
396 : *
397 : * If rri is a non-partitioned table then a CopyMultiInsertBuffer is set up
398 : * for that table.
399 : */
400 : static void
401 852 : CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
402 : CopyFromState cstate, EState *estate, CommandId mycid,
403 : int ti_options)
404 : {
405 852 : miinfo->multiInsertBuffers = NIL;
406 852 : miinfo->bufferedTuples = 0;
407 852 : miinfo->bufferedBytes = 0;
408 852 : miinfo->cstate = cstate;
409 852 : miinfo->estate = estate;
410 852 : miinfo->mycid = mycid;
411 852 : miinfo->ti_options = ti_options;
412 :
413 : /*
414 : * Only setup the buffer when not dealing with a partitioned table.
415 : * Buffers for partitioned tables will just be setup when we need to send
416 : * tuples their way for the first time.
417 : */
418 852 : if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
419 812 : CopyMultiInsertInfoSetupBuffer(miinfo, rri);
420 852 : }
421 :
422 : /*
423 : * Returns true if the buffers are full
424 : */
425 : static inline bool
426 600499 : CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
427 : {
428 600499 : if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
429 600004 : miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
430 548 : return true;
431 599951 : return false;
432 : }
433 :
434 : /*
435 : * Returns true if we have no buffered tuples
436 : */
437 : static inline bool
438 766 : CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
439 : {
440 766 : return miinfo->bufferedTuples == 0;
441 : }
442 :
443 : /*
444 : * Write the tuples stored in 'buffer' out to the table.
445 : */
446 : static inline void
447 1249 : CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
448 : CopyMultiInsertBuffer *buffer,
449 : int64 *processed)
450 : {
451 1249 : CopyFromState cstate = miinfo->cstate;
452 1249 : EState *estate = miinfo->estate;
453 1249 : int nused = buffer->nused;
454 1249 : ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
455 1249 : TupleTableSlot **slots = buffer->slots;
456 : int i;
457 :
458 1249 : if (resultRelInfo->ri_FdwRoutine)
459 : {
460 7 : int batch_size = resultRelInfo->ri_BatchSize;
461 7 : int sent = 0;
462 :
463 : Assert(buffer->bistate == NULL);
464 :
465 : /* Ensure that the FDW supports batching and it's enabled */
466 : Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert);
467 : Assert(batch_size > 1);
468 :
469 : /*
470 : * We suppress error context information other than the relation name,
471 : * if one of the operations below fails.
472 : */
473 : Assert(!cstate->relname_only);
474 7 : cstate->relname_only = true;
475 :
476 19 : while (sent < nused)
477 : {
478 13 : int size = (batch_size < nused - sent) ? batch_size : (nused - sent);
479 13 : int inserted = size;
480 : TupleTableSlot **rslots;
481 :
482 : /* insert into foreign table: let the FDW do it */
483 : rslots =
484 13 : resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert(estate,
485 : resultRelInfo,
486 13 : &slots[sent],
487 : NULL,
488 : &inserted);
489 :
490 12 : sent += size;
491 :
492 : /* No need to do anything if there are no inserted rows */
493 12 : if (inserted <= 0)
494 2 : continue;
495 :
496 : /* Triggers on foreign tables should not have transition tables */
497 : Assert(resultRelInfo->ri_TrigDesc == NULL ||
498 : resultRelInfo->ri_TrigDesc->trig_insert_new_table == false);
499 :
500 : /* Run AFTER ROW INSERT triggers */
501 10 : if (resultRelInfo->ri_TrigDesc != NULL &&
502 0 : resultRelInfo->ri_TrigDesc->trig_insert_after_row)
503 : {
504 0 : Oid relid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
505 :
506 0 : for (i = 0; i < inserted; i++)
507 : {
508 0 : TupleTableSlot *slot = rslots[i];
509 :
510 : /*
511 : * AFTER ROW Triggers might reference the tableoid column,
512 : * so (re-)initialize tts_tableOid before evaluating them.
513 : */
514 0 : slot->tts_tableOid = relid;
515 :
516 0 : ExecARInsertTriggers(estate, resultRelInfo,
517 : slot, NIL,
518 : cstate->transition_capture);
519 : }
520 : }
521 :
522 : /* Update the row counter and progress of the COPY command */
523 10 : *processed += inserted;
524 10 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
525 : *processed);
526 : }
527 :
528 24 : for (i = 0; i < nused; i++)
529 18 : ExecClearTuple(slots[i]);
530 :
531 : /* reset relname_only */
532 6 : cstate->relname_only = false;
533 : }
534 : else
535 : {
536 1242 : CommandId mycid = miinfo->mycid;
537 1242 : int ti_options = miinfo->ti_options;
538 1242 : bool line_buf_valid = cstate->line_buf_valid;
539 1242 : uint64 save_cur_lineno = cstate->cur_lineno;
540 : MemoryContext oldcontext;
541 :
542 : Assert(buffer->bistate != NULL);
543 :
544 : /*
545 : * Print error context information correctly, if one of the operations
546 : * below fails.
547 : */
548 1242 : cstate->line_buf_valid = false;
549 :
550 : /*
551 : * table_multi_insert may leak memory, so switch to short-lived memory
552 : * context before calling it.
553 : */
554 1242 : oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
555 1242 : table_multi_insert(resultRelInfo->ri_RelationDesc,
556 : slots,
557 : nused,
558 : mycid,
559 : ti_options,
560 1242 : buffer->bistate);
561 1242 : MemoryContextSwitchTo(oldcontext);
562 :
563 601606 : for (i = 0; i < nused; i++)
564 : {
565 : /*
566 : * If there are any indexes, update them for all the inserted
567 : * tuples, and run AFTER ROW INSERT triggers.
568 : */
569 600376 : if (resultRelInfo->ri_NumIndices > 0)
570 : {
571 : List *recheckIndexes;
572 :
573 100959 : cstate->cur_lineno = buffer->linenos[i];
574 : recheckIndexes =
575 100959 : ExecInsertIndexTuples(resultRelInfo,
576 : estate, 0, buffer->slots[i],
577 : NIL, NULL);
578 100947 : ExecARInsertTriggers(estate, resultRelInfo,
579 100947 : slots[i], recheckIndexes,
580 : cstate->transition_capture);
581 100947 : list_free(recheckIndexes);
582 : }
583 :
584 : /*
585 : * There's no indexes, but see if we need to run AFTER ROW INSERT
586 : * triggers anyway.
587 : */
588 499417 : else if (resultRelInfo->ri_TrigDesc != NULL &&
589 42 : (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
590 33 : resultRelInfo->ri_TrigDesc->trig_insert_new_table))
591 : {
592 18 : cstate->cur_lineno = buffer->linenos[i];
593 18 : ExecARInsertTriggers(estate, resultRelInfo,
594 18 : slots[i], NIL,
595 : cstate->transition_capture);
596 : }
597 :
598 600364 : ExecClearTuple(slots[i]);
599 : }
600 :
601 : /* Update the row counter and progress of the COPY command */
602 1230 : *processed += nused;
603 1230 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
604 : *processed);
605 :
606 : /* reset cur_lineno and line_buf_valid to what they were */
607 1230 : cstate->line_buf_valid = line_buf_valid;
608 1230 : cstate->cur_lineno = save_cur_lineno;
609 : }
610 :
611 : /* Mark that all slots are free */
612 1236 : buffer->nused = 0;
613 1236 : }
614 :
615 : /*
616 : * Drop used slots and free member for this buffer.
617 : *
618 : * The buffer must be flushed before cleanup.
619 : */
620 : static inline void
621 742 : CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
622 : CopyMultiInsertBuffer *buffer)
623 : {
624 742 : ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
625 : int i;
626 :
627 : /* Ensure buffer was flushed */
628 : Assert(buffer->nused == 0);
629 :
630 : /* Remove back-link to ourself */
631 742 : resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
632 :
633 742 : if (resultRelInfo->ri_FdwRoutine == NULL)
634 : {
635 : Assert(buffer->bistate != NULL);
636 736 : FreeBulkInsertState(buffer->bistate);
637 : }
638 : else
639 : Assert(buffer->bistate == NULL);
640 :
641 : /* Since we only create slots on demand, just drop the non-null ones. */
642 123679 : for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
643 122937 : ExecDropSingleTupleTableSlot(buffer->slots[i]);
644 :
645 742 : if (resultRelInfo->ri_FdwRoutine == NULL)
646 736 : table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
647 : miinfo->ti_options);
648 :
649 742 : pfree(buffer);
650 742 : }
651 :
652 : /*
653 : * Write out all stored tuples in all buffers out to the tables.
654 : *
655 : * Once flushed we also trim the tracked buffers list down to size by removing
656 : * the buffers created earliest first.
657 : *
658 : * Callers should pass 'curr_rri' as the ResultRelInfo that's currently being
659 : * used. When cleaning up old buffers we'll never remove the one for
660 : * 'curr_rri'.
661 : */
662 : static inline void
663 1136 : CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri,
664 : int64 *processed)
665 : {
666 : ListCell *lc;
667 :
668 2372 : foreach(lc, miinfo->multiInsertBuffers)
669 : {
670 1249 : CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc);
671 :
672 1249 : CopyMultiInsertBufferFlush(miinfo, buffer, processed);
673 : }
674 :
675 1123 : miinfo->bufferedTuples = 0;
676 1123 : miinfo->bufferedBytes = 0;
677 :
678 : /*
679 : * Trim the list of tracked buffers down if it exceeds the limit. Here we
680 : * remove buffers starting with the ones we created first. It seems less
681 : * likely that these older ones will be needed than the ones that were
682 : * just created.
683 : */
684 1123 : while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS)
685 : {
686 : CopyMultiInsertBuffer *buffer;
687 :
688 0 : buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
689 :
690 : /*
691 : * We never want to remove the buffer that's currently being used, so
692 : * if we happen to find that then move it to the end of the list.
693 : */
694 0 : if (buffer->resultRelInfo == curr_rri)
695 : {
696 : /*
697 : * The code below would misbehave if we were trying to reduce the
698 : * list to less than two items.
699 : */
700 : StaticAssertDecl(MAX_PARTITION_BUFFERS >= 2,
701 : "MAX_PARTITION_BUFFERS must be >= 2");
702 :
703 0 : miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
704 0 : miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
705 0 : buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
706 : }
707 :
708 0 : CopyMultiInsertBufferCleanup(miinfo, buffer);
709 0 : miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
710 : }
711 1123 : }
712 :
713 : /*
714 : * Cleanup allocated buffers and free memory
715 : */
716 : static inline void
717 739 : CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo)
718 : {
719 : ListCell *lc;
720 :
721 1481 : foreach(lc, miinfo->multiInsertBuffers)
722 742 : CopyMultiInsertBufferCleanup(miinfo, lfirst(lc));
723 :
724 739 : list_free(miinfo->multiInsertBuffers);
725 739 : }
726 :
727 : /*
728 : * Get the next TupleTableSlot that the next tuple should be stored in.
729 : *
730 : * Callers must ensure that the buffer is not full.
731 : *
732 : * Note: 'miinfo' is unused but has been included for consistency with the
733 : * other functions in this area.
734 : */
735 : static inline TupleTableSlot *
736 601365 : CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
737 : ResultRelInfo *rri)
738 : {
739 601365 : CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
740 : int nused;
741 :
742 : Assert(buffer != NULL);
743 : Assert(buffer->nused < MAX_BUFFERED_TUPLES);
744 :
745 601365 : nused = buffer->nused;
746 :
747 601365 : if (buffer->slots[nused] == NULL)
748 123166 : buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
749 601365 : return buffer->slots[nused];
750 : }
751 :
752 : /*
753 : * Record the previously reserved TupleTableSlot that was reserved by
754 : * CopyMultiInsertInfoNextFreeSlot as being consumed.
755 : */
756 : static inline void
757 600499 : CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
758 : TupleTableSlot *slot, int tuplen, uint64 lineno)
759 : {
760 600499 : CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
761 :
762 : Assert(buffer != NULL);
763 : Assert(slot == buffer->slots[buffer->nused]);
764 :
765 : /* Store the line number so we can properly report any errors later */
766 600499 : buffer->linenos[buffer->nused] = lineno;
767 :
768 : /* Record this slot as being used */
769 600499 : buffer->nused++;
770 :
771 : /* Update how many tuples are stored and their size */
772 600499 : miinfo->bufferedTuples++;
773 600499 : miinfo->bufferedBytes += tuplen;
774 600499 : }
775 :
776 : /*
777 : * Copy FROM file to relation.
778 : */
779 : uint64
780 946 : CopyFrom(CopyFromState cstate)
781 : {
782 : ResultRelInfo *resultRelInfo;
783 : ResultRelInfo *target_resultRelInfo;
784 946 : ResultRelInfo *prevResultRelInfo = NULL;
785 946 : EState *estate = CreateExecutorState(); /* for ExecConstraints() */
786 : ModifyTableState *mtstate;
787 : ExprContext *econtext;
788 946 : TupleTableSlot *singleslot = NULL;
789 946 : MemoryContext oldcontext = CurrentMemoryContext;
790 :
791 946 : PartitionTupleRouting *proute = NULL;
792 : ErrorContextCallback errcallback;
793 946 : CommandId mycid = GetCurrentCommandId(true);
794 946 : int ti_options = 0; /* start with default options for insert */
795 946 : BulkInsertState bistate = NULL;
796 : CopyInsertMethod insertMethod;
797 946 : CopyMultiInsertInfo multiInsertInfo = {0}; /* pacify compiler */
798 946 : int64 processed = 0;
799 946 : int64 excluded = 0;
800 : bool has_before_insert_row_trig;
801 : bool has_instead_insert_row_trig;
802 946 : bool leafpart_use_multi_insert = false;
803 :
804 : Assert(cstate->rel);
805 : Assert(list_length(cstate->range_table) == 1);
806 :
807 946 : if (cstate->opts.on_error != COPY_ON_ERROR_STOP)
808 : Assert(cstate->escontext);
809 :
810 : /*
811 : * The target must be a plain, foreign, or partitioned relation, or have
812 : * an INSTEAD OF INSERT row trigger. (Currently, such triggers are only
813 : * allowed on views, so we only hint about them in the view case.)
814 : */
815 946 : if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
816 88 : cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
817 66 : cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
818 9 : !(cstate->rel->trigdesc &&
819 6 : cstate->rel->trigdesc->trig_insert_instead_row))
820 : {
821 3 : if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
822 3 : ereport(ERROR,
823 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
824 : errmsg("cannot copy to view \"%s\"",
825 : RelationGetRelationName(cstate->rel)),
826 : errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
827 0 : else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
828 0 : ereport(ERROR,
829 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
830 : errmsg("cannot copy to materialized view \"%s\"",
831 : RelationGetRelationName(cstate->rel))));
832 0 : else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
833 0 : ereport(ERROR,
834 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
835 : errmsg("cannot copy to sequence \"%s\"",
836 : RelationGetRelationName(cstate->rel))));
837 : else
838 0 : ereport(ERROR,
839 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
840 : errmsg("cannot copy to non-table relation \"%s\"",
841 : RelationGetRelationName(cstate->rel))));
842 : }
843 :
844 : /*
845 : * If the target file is new-in-transaction, we assume that checking FSM
846 : * for free space is a waste of time. This could possibly be wrong, but
847 : * it's unlikely.
848 : */
849 943 : if (RELKIND_HAS_STORAGE(cstate->rel->rd_rel->relkind) &&
850 858 : (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
851 851 : cstate->rel->rd_firstRelfilelocatorSubid != InvalidSubTransactionId))
852 45 : ti_options |= TABLE_INSERT_SKIP_FSM;
853 :
854 : /*
855 : * Optimize if new relation storage was created in this subxact or one of
856 : * its committed children and we won't see those rows later as part of an
857 : * earlier scan or command. The subxact test ensures that if this subxact
858 : * aborts then the frozen rows won't be visible after xact cleanup. Note
859 : * that the stronger test of exactly which subtransaction created it is
860 : * crucial for correctness of this optimization. The test for an earlier
861 : * scan or command tolerates false negatives. FREEZE causes other sessions
862 : * to see rows they would not see under MVCC, and a false negative merely
863 : * spreads that anomaly to the current session.
864 : */
865 943 : if (cstate->opts.freeze)
866 : {
867 : /*
868 : * We currently disallow COPY FREEZE on partitioned tables. The
869 : * reason for this is that we've simply not yet opened the partitions
870 : * to determine if the optimization can be applied to them. We could
871 : * go and open them all here, but doing so may be quite a costly
872 : * overhead for small copies. In any case, we may just end up routing
873 : * tuples to a small number of partitions. It seems better just to
874 : * raise an ERROR for partitioned tables.
875 : */
876 36 : if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
877 : {
878 3 : ereport(ERROR,
879 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
880 : errmsg("cannot perform COPY FREEZE on a partitioned table")));
881 : }
882 :
883 : /* There's currently no support for COPY FREEZE on foreign tables. */
884 33 : if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
885 3 : ereport(ERROR,
886 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
887 : errmsg("cannot perform COPY FREEZE on a foreign table")));
888 :
889 : /*
890 : * Tolerate one registration for the benefit of FirstXactSnapshot.
891 : * Scan-bearing queries generally create at least two registrations,
892 : * though relying on that is fragile, as is ignoring ActiveSnapshot.
893 : * Clear CatalogSnapshot to avoid counting its registration. We'll
894 : * still detect ongoing catalog scans, each of which separately
895 : * registers the snapshot it uses.
896 : */
897 30 : InvalidateCatalogSnapshot();
898 30 : if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
899 0 : ereport(ERROR,
900 : (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
901 : errmsg("cannot perform COPY FREEZE because of prior transaction activity")));
902 :
903 60 : if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
904 30 : cstate->rel->rd_newRelfilelocatorSubid != GetCurrentSubTransactionId())
905 9 : ereport(ERROR,
906 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
907 : errmsg("cannot perform COPY FREEZE because the table was not created or truncated in the current subtransaction")));
908 :
909 21 : ti_options |= TABLE_INSERT_FROZEN;
910 : }
911 :
912 : /*
913 : * We need a ResultRelInfo so we can use the regular executor's
914 : * index-entry-making machinery. (There used to be a huge amount of code
915 : * here that basically duplicated execUtils.c ...)
916 : */
917 928 : ExecInitRangeTable(estate, cstate->range_table, cstate->rteperminfos,
918 : bms_make_singleton(1));
919 928 : resultRelInfo = target_resultRelInfo = makeNode(ResultRelInfo);
920 928 : ExecInitResultRelation(estate, resultRelInfo, 1);
921 :
922 : /* Verify the named relation is a valid target for INSERT */
923 928 : CheckValidResultRel(resultRelInfo, CMD_INSERT, ONCONFLICT_NONE, NIL);
924 :
925 927 : ExecOpenIndices(resultRelInfo, false);
926 :
927 : /*
928 : * Set up a ModifyTableState so we can let FDW(s) init themselves for
929 : * foreign-table result relation(s).
930 : */
931 927 : mtstate = makeNode(ModifyTableState);
932 927 : mtstate->ps.plan = NULL;
933 927 : mtstate->ps.state = estate;
934 927 : mtstate->operation = CMD_INSERT;
935 927 : mtstate->mt_nrels = 1;
936 927 : mtstate->resultRelInfo = resultRelInfo;
937 927 : mtstate->rootResultRelInfo = resultRelInfo;
938 :
939 927 : if (resultRelInfo->ri_FdwRoutine != NULL &&
940 18 : resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
941 18 : resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
942 : resultRelInfo);
943 :
944 : /*
945 : * Also, if the named relation is a foreign table, determine if the FDW
946 : * supports batch insert and determine the batch size (a FDW may support
947 : * batching, but it may be disabled for the server/table).
948 : *
949 : * If the FDW does not support batching, we set the batch size to 1.
950 : */
951 927 : if (resultRelInfo->ri_FdwRoutine != NULL &&
952 18 : resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize &&
953 18 : resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert)
954 18 : resultRelInfo->ri_BatchSize =
955 18 : resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize(resultRelInfo);
956 : else
957 909 : resultRelInfo->ri_BatchSize = 1;
958 :
959 : Assert(resultRelInfo->ri_BatchSize >= 1);
960 :
961 : /* Prepare to catch AFTER triggers. */
962 927 : AfterTriggerBeginQuery();
963 :
964 : /*
965 : * If there are any triggers with transition tables on the named relation,
966 : * we need to be prepared to capture transition tuples.
967 : *
968 : * Because partition tuple routing would like to know about whether
969 : * transition capture is active, we also set it in mtstate, which is
970 : * passed to ExecFindPartition() below.
971 : */
972 927 : cstate->transition_capture = mtstate->mt_transition_capture =
973 927 : MakeTransitionCaptureState(cstate->rel->trigdesc,
974 927 : RelationGetRelid(cstate->rel),
975 : CMD_INSERT);
976 :
977 : /*
978 : * If the named relation is a partitioned table, initialize state for
979 : * CopyFrom tuple routing.
980 : */
981 927 : if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
982 54 : proute = ExecSetupPartitionTupleRouting(estate, cstate->rel);
983 :
984 927 : if (cstate->whereClause)
985 9 : cstate->qualexpr = ExecInitQual(castNode(List, cstate->whereClause),
986 : &mtstate->ps);
987 :
988 : /*
989 : * It's generally more efficient to prepare a bunch of tuples for
990 : * insertion, and insert them in one
991 : * table_multi_insert()/ExecForeignBatchInsert() call, than call
992 : * table_tuple_insert()/ExecForeignInsert() separately for every tuple.
993 : * However, there are a number of reasons why we might not be able to do
994 : * this. These are explained below.
995 : */
996 927 : if (resultRelInfo->ri_TrigDesc != NULL &&
997 92 : (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
998 46 : resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
999 : {
1000 : /*
1001 : * Can't support multi-inserts when there are any BEFORE/INSTEAD OF
1002 : * triggers on the table. Such triggers might query the table we're
1003 : * inserting into and act differently if the tuples that have already
1004 : * been processed and prepared for insertion are not there.
1005 : */
1006 52 : insertMethod = CIM_SINGLE;
1007 : }
1008 875 : else if (resultRelInfo->ri_FdwRoutine != NULL &&
1009 14 : resultRelInfo->ri_BatchSize == 1)
1010 : {
1011 : /*
1012 : * Can't support multi-inserts to a foreign table if the FDW does not
1013 : * support batching, or it's disabled for the server or foreign table.
1014 : */
1015 9 : insertMethod = CIM_SINGLE;
1016 : }
1017 866 : else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL &&
1018 15 : resultRelInfo->ri_TrigDesc->trig_insert_new_table)
1019 : {
1020 : /*
1021 : * For partitioned tables we can't support multi-inserts when there
1022 : * are any statement level insert triggers. It might be possible to
1023 : * allow partitioned tables with such triggers in the future, but for
1024 : * now, CopyMultiInsertInfoFlush expects that any after row insert and
1025 : * statement level insert triggers are on the same relation.
1026 : */
1027 11 : insertMethod = CIM_SINGLE;
1028 : }
1029 855 : else if (cstate->volatile_defexprs)
1030 : {
1031 : /*
1032 : * Can't support multi-inserts if there are any volatile default
1033 : * expressions in the table. Similarly to the trigger case above,
1034 : * such expressions may query the table we're inserting into.
1035 : *
1036 : * Note: It does not matter if any partitions have any volatile
1037 : * default expressions as we use the defaults from the target of the
1038 : * COPY command.
1039 : */
1040 3 : insertMethod = CIM_SINGLE;
1041 : }
1042 852 : else if (contain_volatile_functions(cstate->whereClause))
1043 : {
1044 : /*
1045 : * Can't support multi-inserts if there are any volatile function
1046 : * expressions in WHERE clause. Similarly to the trigger case above,
1047 : * such expressions may query the table we're inserting into.
1048 : *
1049 : * Note: the whereClause was already preprocessed in DoCopy(), so it's
1050 : * okay to use contain_volatile_functions() directly.
1051 : */
1052 0 : insertMethod = CIM_SINGLE;
1053 : }
1054 : else
1055 : {
1056 : /*
1057 : * For partitioned tables, we may still be able to perform bulk
1058 : * inserts. However, the possibility of this depends on which types
1059 : * of triggers exist on the partition. We must disable bulk inserts
1060 : * if the partition is a foreign table that can't use batching or it
1061 : * has any before row insert or insert instead triggers (same as we
1062 : * checked above for the parent table). Since the partition's
1063 : * resultRelInfos are initialized only when we actually need to insert
1064 : * the first tuple into them, we must have the intermediate insert
1065 : * method of CIM_MULTI_CONDITIONAL to flag that we must later
1066 : * determine if we can use bulk-inserts for the partition being
1067 : * inserted into.
1068 : */
1069 852 : if (proute)
1070 40 : insertMethod = CIM_MULTI_CONDITIONAL;
1071 : else
1072 812 : insertMethod = CIM_MULTI;
1073 :
1074 852 : CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
1075 : estate, mycid, ti_options);
1076 : }
1077 :
1078 : /*
1079 : * If not using batch mode (which allocates slots as needed) set up a
1080 : * tuple slot too. When inserting into a partitioned table, we also need
1081 : * one, even if we might batch insert, to read the tuple in the root
1082 : * partition's form.
1083 : */
1084 927 : if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
1085 : {
1086 115 : singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
1087 : &estate->es_tupleTable);
1088 115 : bistate = GetBulkInsertState();
1089 : }
1090 :
1091 1019 : has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
1092 92 : resultRelInfo->ri_TrigDesc->trig_insert_before_row);
1093 :
1094 1019 : has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
1095 92 : resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
1096 :
1097 : /*
1098 : * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
1099 : * should do this for COPY, since it's not really an "INSERT" statement as
1100 : * such. However, executing these triggers maintains consistency with the
1101 : * EACH ROW triggers that we already fire on COPY.
1102 : */
1103 927 : ExecBSInsertTriggers(estate, resultRelInfo);
1104 :
1105 927 : econtext = GetPerTupleExprContext(estate);
1106 :
1107 : /* Set up callback to identify error line number */
1108 927 : errcallback.callback = CopyFromErrorCallback;
1109 927 : errcallback.arg = cstate;
1110 927 : errcallback.previous = error_context_stack;
1111 927 : error_context_stack = &errcallback;
1112 :
1113 : for (;;)
1114 630753 : {
1115 : TupleTableSlot *myslot;
1116 : bool skip_tuple;
1117 :
1118 631680 : CHECK_FOR_INTERRUPTS();
1119 :
1120 : /*
1121 : * Reset the per-tuple exprcontext. We do this after every tuple, to
1122 : * clean-up after expression evaluations etc.
1123 : */
1124 631680 : ResetPerTupleExprContext(estate);
1125 :
1126 : /* select slot to (initially) load row into */
1127 631680 : if (insertMethod == CIM_SINGLE || proute)
1128 : {
1129 137441 : myslot = singleslot;
1130 137441 : Assert(myslot != NULL);
1131 : }
1132 : else
1133 : {
1134 : Assert(resultRelInfo == target_resultRelInfo);
1135 : Assert(insertMethod == CIM_MULTI);
1136 :
1137 494239 : myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
1138 : resultRelInfo);
1139 : }
1140 :
1141 : /*
1142 : * Switch to per-tuple context before calling NextCopyFrom, which does
1143 : * evaluate default expressions etc. and requires per-tuple context.
1144 : */
1145 631680 : MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
1146 :
1147 631680 : ExecClearTuple(myslot);
1148 :
1149 : /* Directly store the values/nulls array in the slot */
1150 631680 : if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
1151 812 : break;
1152 :
1153 630775 : if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE &&
1154 81 : cstate->escontext->error_occurred)
1155 : {
1156 : /*
1157 : * Soft error occurred, skip this tuple and just make
1158 : * ErrorSaveContext ready for the next NextCopyFrom. Since we
1159 : * don't set details_wanted and error_data is not to be filled,
1160 : * just resetting error_occurred is enough.
1161 : */
1162 54 : cstate->escontext->error_occurred = false;
1163 :
1164 : /* Report that this tuple was skipped by the ON_ERROR clause */
1165 54 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_SKIPPED,
1166 54 : cstate->num_errors);
1167 :
1168 54 : if (cstate->opts.reject_limit > 0 &&
1169 24 : cstate->num_errors > cstate->opts.reject_limit)
1170 3 : ereport(ERROR,
1171 : (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1172 : errmsg("skipped more than REJECT_LIMIT (%" PRId64 ") rows due to data type incompatibility",
1173 : cstate->opts.reject_limit)));
1174 :
1175 : /* Repeat NextCopyFrom() until no soft error occurs */
1176 51 : continue;
1177 : }
1178 :
1179 630721 : ExecStoreVirtualTuple(myslot);
1180 :
1181 : /*
1182 : * Constraints and where clause might reference the tableoid column,
1183 : * so (re-)initialize tts_tableOid before evaluating them.
1184 : */
1185 630721 : myslot->tts_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc);
1186 :
1187 : /* Triggers and stuff need to be invoked in query context. */
1188 630721 : MemoryContextSwitchTo(oldcontext);
1189 :
1190 630721 : if (cstate->whereClause)
1191 : {
1192 33 : econtext->ecxt_scantuple = myslot;
1193 : /* Skip items that don't match COPY's WHERE clause */
1194 33 : if (!ExecQual(cstate->qualexpr, econtext))
1195 : {
1196 : /*
1197 : * Report that this tuple was filtered out by the WHERE
1198 : * clause.
1199 : */
1200 18 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_EXCLUDED,
1201 : ++excluded);
1202 18 : continue;
1203 : }
1204 : }
1205 :
1206 : /* Determine the partition to insert the tuple into */
1207 630703 : if (proute)
1208 : {
1209 : TupleConversionMap *map;
1210 :
1211 : /*
1212 : * Attempt to find a partition suitable for this tuple.
1213 : * ExecFindPartition() will raise an error if none can be found or
1214 : * if the found partition is not suitable for INSERTs.
1215 : */
1216 137197 : resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo,
1217 : proute, myslot, estate);
1218 :
1219 137196 : if (prevResultRelInfo != resultRelInfo)
1220 : {
1221 : /* Determine which triggers exist on this partition */
1222 80785 : has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
1223 27 : resultRelInfo->ri_TrigDesc->trig_insert_before_row);
1224 :
1225 80785 : has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
1226 27 : resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
1227 :
1228 : /*
1229 : * Disable multi-inserts when the partition has BEFORE/INSTEAD
1230 : * OF triggers, or if the partition is a foreign table that
1231 : * can't use batching.
1232 : */
1233 131487 : leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
1234 50729 : !has_before_insert_row_trig &&
1235 182204 : !has_instead_insert_row_trig &&
1236 50717 : (resultRelInfo->ri_FdwRoutine == NULL ||
1237 6 : resultRelInfo->ri_BatchSize > 1);
1238 :
1239 : /* Set the multi-insert buffer to use for this partition. */
1240 80758 : if (leafpart_use_multi_insert)
1241 : {
1242 50715 : if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
1243 43 : CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
1244 : resultRelInfo);
1245 : }
1246 30043 : else if (insertMethod == CIM_MULTI_CONDITIONAL &&
1247 14 : !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
1248 : {
1249 : /*
1250 : * Flush pending inserts if this partition can't use
1251 : * batching, so rows are visible to triggers etc.
1252 : */
1253 0 : CopyMultiInsertInfoFlush(&multiInsertInfo,
1254 : resultRelInfo,
1255 : &processed);
1256 : }
1257 :
1258 80758 : if (bistate != NULL)
1259 80758 : ReleaseBulkInsertStatePin(bistate);
1260 80758 : prevResultRelInfo = resultRelInfo;
1261 : }
1262 :
1263 : /*
1264 : * If we're capturing transition tuples, we might need to convert
1265 : * from the partition rowtype to root rowtype. But if there are no
1266 : * BEFORE triggers on the partition that could change the tuple,
1267 : * we can just remember the original unconverted tuple to avoid a
1268 : * needless round trip conversion.
1269 : */
1270 137196 : if (cstate->transition_capture != NULL)
1271 29 : cstate->transition_capture->tcs_original_insert_tuple =
1272 29 : !has_before_insert_row_trig ? myslot : NULL;
1273 :
1274 : /*
1275 : * We might need to convert from the root rowtype to the partition
1276 : * rowtype.
1277 : */
1278 137196 : map = ExecGetRootToChildMap(resultRelInfo, estate);
1279 137196 : if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert)
1280 : {
1281 : /* non batch insert */
1282 30070 : if (map != NULL)
1283 : {
1284 : TupleTableSlot *new_slot;
1285 :
1286 55 : new_slot = resultRelInfo->ri_PartitionTupleSlot;
1287 55 : myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot);
1288 : }
1289 : }
1290 : else
1291 : {
1292 : /*
1293 : * Prepare to queue up tuple for later batch insert into
1294 : * current partition.
1295 : */
1296 : TupleTableSlot *batchslot;
1297 :
1298 : /* no other path available for partitioned table */
1299 : Assert(insertMethod == CIM_MULTI_CONDITIONAL);
1300 :
1301 107126 : batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
1302 : resultRelInfo);
1303 :
1304 107126 : if (map != NULL)
1305 6100 : myslot = execute_attr_map_slot(map->attrMap, myslot,
1306 : batchslot);
1307 : else
1308 : {
1309 : /*
1310 : * This looks more expensive than it is (Believe me, I
1311 : * optimized it away. Twice.). The input is in virtual
1312 : * form, and we'll materialize the slot below - for most
1313 : * slot types the copy performs the work materialization
1314 : * would later require anyway.
1315 : */
1316 101026 : ExecCopySlot(batchslot, myslot);
1317 101026 : myslot = batchslot;
1318 : }
1319 : }
1320 :
1321 : /* ensure that triggers etc see the right relation */
1322 137196 : myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
1323 : }
1324 :
1325 630702 : skip_tuple = false;
1326 :
1327 : /* BEFORE ROW INSERT Triggers */
1328 630702 : if (has_before_insert_row_trig)
1329 : {
1330 137 : if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot))
1331 8 : skip_tuple = true; /* "do nothing" */
1332 : }
1333 :
1334 630702 : if (!skip_tuple)
1335 : {
1336 : /*
1337 : * If there is an INSTEAD OF INSERT ROW trigger, let it handle the
1338 : * tuple. Otherwise, proceed with inserting the tuple into the
1339 : * table or foreign table.
1340 : */
1341 630694 : if (has_instead_insert_row_trig)
1342 : {
1343 6 : ExecIRInsertTriggers(estate, resultRelInfo, myslot);
1344 : }
1345 : else
1346 : {
1347 : /* Compute stored generated columns */
1348 630688 : if (resultRelInfo->ri_RelationDesc->rd_att->constr &&
1349 231012 : resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored)
1350 17 : ExecComputeStoredGenerated(resultRelInfo, estate, myslot,
1351 : CMD_INSERT);
1352 :
1353 : /*
1354 : * If the target is a plain table, check the constraints of
1355 : * the tuple.
1356 : */
1357 630688 : if (resultRelInfo->ri_FdwRoutine == NULL &&
1358 630642 : resultRelInfo->ri_RelationDesc->rd_att->constr)
1359 230994 : ExecConstraints(resultRelInfo, myslot, estate);
1360 :
1361 : /*
1362 : * Also check the tuple against the partition constraint, if
1363 : * there is one; except that if we got here via tuple-routing,
1364 : * we don't need to if there's no BR trigger defined on the
1365 : * partition.
1366 : */
1367 630673 : if (resultRelInfo->ri_RelationDesc->rd_rel->relispartition &&
1368 137190 : (proute == NULL || has_before_insert_row_trig))
1369 1154 : ExecPartitionCheck(resultRelInfo, myslot, estate, true);
1370 :
1371 : /* Store the slot in the multi-insert buffer, when enabled. */
1372 630673 : if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
1373 : {
1374 : /*
1375 : * The slot previously might point into the per-tuple
1376 : * context. For batching it needs to be longer lived.
1377 : */
1378 600499 : ExecMaterializeSlot(myslot);
1379 :
1380 : /* Add this tuple to the tuple buffer */
1381 600499 : CopyMultiInsertInfoStore(&multiInsertInfo,
1382 : resultRelInfo, myslot,
1383 : cstate->line_buf.len,
1384 : cstate->cur_lineno);
1385 :
1386 : /*
1387 : * If enough inserts have queued up, then flush all
1388 : * buffers out to their tables.
1389 : */
1390 600499 : if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
1391 548 : CopyMultiInsertInfoFlush(&multiInsertInfo,
1392 : resultRelInfo,
1393 : &processed);
1394 :
1395 : /*
1396 : * We delay updating the row counter and progress of the
1397 : * COPY command until after writing the tuples stored in
1398 : * the buffer out to the table, as in single insert mode.
1399 : * See CopyMultiInsertBufferFlush().
1400 : */
1401 600499 : continue; /* next tuple please */
1402 : }
1403 : else
1404 : {
1405 30174 : List *recheckIndexes = NIL;
1406 :
1407 : /* OK, store the tuple */
1408 30174 : if (resultRelInfo->ri_FdwRoutine != NULL)
1409 : {
1410 27 : myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
1411 : resultRelInfo,
1412 : myslot,
1413 : NULL);
1414 :
1415 26 : if (myslot == NULL) /* "do nothing" */
1416 2 : continue; /* next tuple please */
1417 :
1418 : /*
1419 : * AFTER ROW Triggers might reference the tableoid
1420 : * column, so (re-)initialize tts_tableOid before
1421 : * evaluating them.
1422 : */
1423 24 : myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
1424 : }
1425 : else
1426 : {
1427 : /* OK, store the tuple and create index entries for it */
1428 30147 : table_tuple_insert(resultRelInfo->ri_RelationDesc,
1429 : myslot, mycid, ti_options, bistate);
1430 :
1431 30147 : if (resultRelInfo->ri_NumIndices > 0)
1432 0 : recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
1433 : estate, 0,
1434 : myslot, NIL,
1435 : NULL);
1436 : }
1437 :
1438 : /* AFTER ROW INSERT Triggers */
1439 30171 : ExecARInsertTriggers(estate, resultRelInfo, myslot,
1440 : recheckIndexes, cstate->transition_capture);
1441 :
1442 30169 : list_free(recheckIndexes);
1443 : }
1444 : }
1445 :
1446 : /*
1447 : * We count only tuples not suppressed by a BEFORE INSERT trigger
1448 : * or FDW; this is the same definition used by nodeModifyTable.c
1449 : * for counting tuples inserted by an INSERT command. Update
1450 : * progress of the COPY command as well.
1451 : */
1452 30175 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
1453 : ++processed);
1454 : }
1455 : }
1456 :
1457 : /* Flush any remaining buffered tuples */
1458 812 : if (insertMethod != CIM_SINGLE)
1459 : {
1460 752 : if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
1461 588 : CopyMultiInsertInfoFlush(&multiInsertInfo, NULL, &processed);
1462 : }
1463 :
1464 : /* Done, clean up */
1465 799 : error_context_stack = errcallback.previous;
1466 :
1467 799 : if (cstate->num_errors > 0 &&
1468 18 : cstate->opts.log_verbosity >= COPY_LOG_VERBOSITY_DEFAULT)
1469 : {
1470 15 : if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE)
1471 12 : ereport(NOTICE,
1472 : errmsg_plural("%" PRIu64 " row was skipped due to data type incompatibility",
1473 : "%" PRIu64 " rows were skipped due to data type incompatibility",
1474 : cstate->num_errors,
1475 : cstate->num_errors));
1476 3 : else if (cstate->opts.on_error == COPY_ON_ERROR_SET_NULL)
1477 3 : ereport(NOTICE,
1478 : errmsg_plural("in %" PRIu64 " row, columns were set to null due to data type incompatibility",
1479 : "in %" PRIu64 " rows, columns were set to null due to data type incompatibility",
1480 : cstate->num_errors,
1481 : cstate->num_errors));
1482 : }
1483 :
1484 799 : if (bistate != NULL)
1485 99 : FreeBulkInsertState(bistate);
1486 :
1487 799 : MemoryContextSwitchTo(oldcontext);
1488 :
1489 : /* Execute AFTER STATEMENT insertion triggers */
1490 799 : ExecASInsertTriggers(estate, target_resultRelInfo, cstate->transition_capture);
1491 :
1492 : /* Handle queued AFTER triggers */
1493 799 : AfterTriggerEndQuery(estate);
1494 :
1495 799 : ExecResetTupleTable(estate->es_tupleTable, false);
1496 :
1497 : /* Allow the FDW to shut down */
1498 799 : if (target_resultRelInfo->ri_FdwRoutine != NULL &&
1499 16 : target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
1500 16 : target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
1501 : target_resultRelInfo);
1502 :
1503 : /* Tear down the multi-insert buffer data */
1504 799 : if (insertMethod != CIM_SINGLE)
1505 739 : CopyMultiInsertInfoCleanup(&multiInsertInfo);
1506 :
1507 : /* Close all the partitioned tables, leaf partitions, and their indices */
1508 799 : if (proute)
1509 51 : ExecCleanupTupleRouting(mtstate, proute);
1510 :
1511 : /* Close the result relations, including any trigger target relations */
1512 799 : ExecCloseResultRelations(estate);
1513 799 : ExecCloseRangeTableRelations(estate);
1514 :
1515 799 : FreeExecutorState(estate);
1516 :
1517 799 : return processed;
1518 : }
1519 :
1520 : /*
1521 : * Setup to read tuples from a file for COPY FROM.
1522 : *
1523 : * 'rel': Used as a template for the tuples
1524 : * 'whereClause': WHERE clause from the COPY FROM command
1525 : * 'filename': Name of server-local file to read, NULL for STDIN
1526 : * 'is_program': true if 'filename' is program to execute
1527 : * 'data_source_cb': callback that provides the input data
1528 : * 'attnamelist': List of char *, columns to include. NIL selects all cols.
1529 : * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
1530 : *
1531 : * Returns a CopyFromState, to be passed to NextCopyFrom and related functions.
1532 : */
1533 : CopyFromState
1534 1129 : BeginCopyFrom(ParseState *pstate,
1535 : Relation rel,
1536 : Node *whereClause,
1537 : const char *filename,
1538 : bool is_program,
1539 : copy_data_source_cb data_source_cb,
1540 : List *attnamelist,
1541 : List *options)
1542 : {
1543 : CopyFromState cstate;
1544 1129 : bool pipe = (filename == NULL);
1545 : TupleDesc tupDesc;
1546 : AttrNumber num_phys_attrs,
1547 : num_defaults;
1548 : FmgrInfo *in_functions;
1549 : Oid *typioparams;
1550 : int *defmap;
1551 : ExprState **defexprs;
1552 : MemoryContext oldcontext;
1553 : bool volatile_defexprs;
1554 1129 : const int progress_cols[] = {
1555 : PROGRESS_COPY_COMMAND,
1556 : PROGRESS_COPY_TYPE,
1557 : PROGRESS_COPY_BYTES_TOTAL
1558 : };
1559 1129 : int64 progress_vals[] = {
1560 : PROGRESS_COPY_COMMAND_FROM,
1561 : 0,
1562 : 0
1563 : };
1564 :
1565 : /* Allocate workspace and zero all fields */
1566 1129 : cstate = palloc0_object(CopyFromStateData);
1567 :
1568 : /*
1569 : * We allocate everything used by a cstate in a new memory context. This
1570 : * avoids memory leaks during repeated use of COPY in a query.
1571 : */
1572 1129 : cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
1573 : "COPY",
1574 : ALLOCSET_DEFAULT_SIZES);
1575 :
1576 1129 : oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1577 :
1578 : /* Extract options from the statement node tree */
1579 1129 : ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options);
1580 :
1581 : /* Set the format routine */
1582 996 : cstate->routine = CopyFromGetRoutine(&cstate->opts);
1583 :
1584 : /* Process the target relation */
1585 996 : cstate->rel = rel;
1586 :
1587 996 : tupDesc = RelationGetDescr(cstate->rel);
1588 :
1589 : /* process common options or initialization */
1590 :
1591 : /* Generate or convert list of attributes to process */
1592 996 : cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
1593 :
1594 996 : num_phys_attrs = tupDesc->natts;
1595 :
1596 : /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
1597 996 : cstate->opts.force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1598 996 : if (cstate->opts.force_notnull_all)
1599 6 : MemSet(cstate->opts.force_notnull_flags, true, num_phys_attrs * sizeof(bool));
1600 990 : else if (cstate->opts.force_notnull)
1601 : {
1602 : List *attnums;
1603 : ListCell *cur;
1604 :
1605 13 : attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_notnull);
1606 :
1607 26 : foreach(cur, attnums)
1608 : {
1609 16 : int attnum = lfirst_int(cur);
1610 16 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1611 :
1612 16 : if (!list_member_int(cstate->attnumlist, attnum))
1613 3 : ereport(ERROR,
1614 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1615 : /*- translator: first %s is the name of a COPY option, e.g. FORCE_NOT_NULL */
1616 : errmsg("%s column \"%s\" not referenced by COPY",
1617 : "FORCE_NOT_NULL", NameStr(attr->attname))));
1618 13 : cstate->opts.force_notnull_flags[attnum - 1] = true;
1619 : }
1620 : }
1621 :
1622 : /* Set up soft error handler for ON_ERROR */
1623 993 : if (cstate->opts.on_error != COPY_ON_ERROR_STOP)
1624 : {
1625 50 : cstate->escontext = makeNode(ErrorSaveContext);
1626 50 : cstate->escontext->type = T_ErrorSaveContext;
1627 50 : cstate->escontext->error_occurred = false;
1628 :
1629 50 : if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE ||
1630 18 : cstate->opts.on_error == COPY_ON_ERROR_SET_NULL)
1631 50 : cstate->escontext->details_wanted = false;
1632 : }
1633 : else
1634 943 : cstate->escontext = NULL;
1635 :
1636 993 : if (cstate->opts.on_error == COPY_ON_ERROR_SET_NULL)
1637 : {
1638 18 : int attr_count = list_length(cstate->attnumlist);
1639 :
1640 : /*
1641 : * When data type conversion fails and ON_ERROR is SET_NULL, we need
1642 : * ensure that the input column allow null values. ExecConstraints()
1643 : * will cover most of the cases, but it does not verify domain
1644 : * constraints. Therefore, for constrained domains, the null value
1645 : * check must be performed during the initial string-to-datum
1646 : * conversion (see CopyFromTextLikeOneRow()).
1647 : */
1648 18 : cstate->domain_with_constraint = palloc0_array(bool, attr_count);
1649 :
1650 90 : foreach_int(attno, cstate->attnumlist)
1651 : {
1652 54 : int i = foreach_current_index(attno);
1653 :
1654 54 : Form_pg_attribute att = TupleDescAttr(tupDesc, attno - 1);
1655 :
1656 54 : cstate->domain_with_constraint[i] = DomainHasConstraints(att->atttypid);
1657 : }
1658 : }
1659 :
1660 : /* Convert FORCE_NULL name list to per-column flags, check validity */
1661 993 : cstate->opts.force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1662 993 : if (cstate->opts.force_null_all)
1663 6 : MemSet(cstate->opts.force_null_flags, true, num_phys_attrs * sizeof(bool));
1664 987 : else if (cstate->opts.force_null)
1665 : {
1666 : List *attnums;
1667 : ListCell *cur;
1668 :
1669 13 : attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_null);
1670 :
1671 26 : foreach(cur, attnums)
1672 : {
1673 16 : int attnum = lfirst_int(cur);
1674 16 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1675 :
1676 16 : if (!list_member_int(cstate->attnumlist, attnum))
1677 3 : ereport(ERROR,
1678 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1679 : /*- translator: first %s is the name of a COPY option, e.g. FORCE_NOT_NULL */
1680 : errmsg("%s column \"%s\" not referenced by COPY",
1681 : "FORCE_NULL", NameStr(attr->attname))));
1682 13 : cstate->opts.force_null_flags[attnum - 1] = true;
1683 : }
1684 : }
1685 :
1686 : /* Convert convert_selectively name list to per-column flags */
1687 990 : if (cstate->opts.convert_selectively)
1688 : {
1689 : List *attnums;
1690 : ListCell *cur;
1691 :
1692 3 : cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1693 :
1694 3 : attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.convert_select);
1695 :
1696 7 : foreach(cur, attnums)
1697 : {
1698 4 : int attnum = lfirst_int(cur);
1699 4 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1700 :
1701 4 : if (!list_member_int(cstate->attnumlist, attnum))
1702 0 : ereport(ERROR,
1703 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1704 : errmsg_internal("selected column \"%s\" not referenced by COPY",
1705 : NameStr(attr->attname))));
1706 4 : cstate->convert_select_flags[attnum - 1] = true;
1707 : }
1708 : }
1709 :
1710 : /* Use client encoding when ENCODING option is not specified. */
1711 990 : if (cstate->opts.file_encoding < 0)
1712 975 : cstate->file_encoding = pg_get_client_encoding();
1713 : else
1714 15 : cstate->file_encoding = cstate->opts.file_encoding;
1715 :
1716 : /*
1717 : * Look up encoding conversion function.
1718 : */
1719 990 : if (cstate->file_encoding == GetDatabaseEncoding() ||
1720 39 : cstate->file_encoding == PG_SQL_ASCII ||
1721 18 : GetDatabaseEncoding() == PG_SQL_ASCII)
1722 : {
1723 972 : cstate->need_transcoding = false;
1724 : }
1725 : else
1726 : {
1727 18 : cstate->need_transcoding = true;
1728 18 : cstate->conversion_proc = FindDefaultConversionProc(cstate->file_encoding,
1729 : GetDatabaseEncoding());
1730 18 : if (!OidIsValid(cstate->conversion_proc))
1731 0 : ereport(ERROR,
1732 : (errcode(ERRCODE_UNDEFINED_FUNCTION),
1733 : errmsg("default conversion function for encoding \"%s\" to \"%s\" does not exist",
1734 : pg_encoding_to_char(cstate->file_encoding),
1735 : pg_encoding_to_char(GetDatabaseEncoding()))));
1736 : }
1737 :
1738 990 : cstate->copy_src = COPY_FILE; /* default */
1739 :
1740 990 : cstate->whereClause = whereClause;
1741 :
1742 : /* Initialize state variables */
1743 990 : cstate->eol_type = EOL_UNKNOWN;
1744 990 : cstate->cur_relname = RelationGetRelationName(cstate->rel);
1745 990 : cstate->cur_lineno = 0;
1746 990 : cstate->cur_attname = NULL;
1747 990 : cstate->cur_attval = NULL;
1748 990 : cstate->relname_only = false;
1749 :
1750 : /*
1751 : * Allocate buffers for the input pipeline.
1752 : *
1753 : * attribute_buf and raw_buf are used in both text and binary modes, but
1754 : * input_buf and line_buf only in text mode.
1755 : */
1756 990 : cstate->raw_buf = palloc(RAW_BUF_SIZE + 1);
1757 990 : cstate->raw_buf_index = cstate->raw_buf_len = 0;
1758 990 : cstate->raw_reached_eof = false;
1759 :
1760 990 : initStringInfo(&cstate->attribute_buf);
1761 :
1762 : /* Assign range table and rteperminfos, we'll need them in CopyFrom. */
1763 990 : if (pstate)
1764 : {
1765 953 : cstate->range_table = pstate->p_rtable;
1766 953 : cstate->rteperminfos = pstate->p_rteperminfos;
1767 : }
1768 :
1769 990 : num_defaults = 0;
1770 990 : volatile_defexprs = false;
1771 :
1772 : /*
1773 : * Pick up the required catalog information for each attribute in the
1774 : * relation, including the input function, the element type (to pass to
1775 : * the input function), and info about defaults and constraints. (Which
1776 : * input function we use depends on text/binary format choice.)
1777 : */
1778 990 : in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
1779 990 : typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
1780 990 : defmap = (int *) palloc(num_phys_attrs * sizeof(int));
1781 990 : defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
1782 :
1783 3904 : for (int attnum = 1; attnum <= num_phys_attrs; attnum++)
1784 : {
1785 2921 : Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
1786 :
1787 : /* We don't need info for dropped attributes */
1788 2921 : if (att->attisdropped)
1789 62 : continue;
1790 :
1791 : /* Fetch the input function and typioparam info */
1792 2859 : cstate->routine->CopyFromInFunc(cstate, att->atttypid,
1793 2859 : &in_functions[attnum - 1],
1794 2859 : &typioparams[attnum - 1]);
1795 :
1796 : /* Get default info if available */
1797 2858 : defexprs[attnum - 1] = NULL;
1798 :
1799 : /*
1800 : * We only need the default values for columns that do not appear in
1801 : * the column list, unless the DEFAULT option was given. We never need
1802 : * default values for generated columns.
1803 : */
1804 2858 : if ((cstate->opts.default_print != NULL ||
1805 2795 : !list_member_int(cstate->attnumlist, attnum)) &&
1806 341 : !att->attgenerated)
1807 : {
1808 323 : Expr *defexpr = (Expr *) build_column_default(cstate->rel,
1809 : attnum);
1810 :
1811 323 : if (defexpr != NULL)
1812 : {
1813 : /* Run the expression through planner */
1814 132 : defexpr = expression_planner(defexpr);
1815 :
1816 : /* Initialize executable expression in copycontext */
1817 126 : defexprs[attnum - 1] = ExecInitExpr(defexpr, NULL);
1818 :
1819 : /* if NOT copied from input */
1820 : /* use default value if one exists */
1821 126 : if (!list_member_int(cstate->attnumlist, attnum))
1822 : {
1823 86 : defmap[num_defaults] = attnum - 1;
1824 86 : num_defaults++;
1825 : }
1826 :
1827 : /*
1828 : * If a default expression looks at the table being loaded,
1829 : * then it could give the wrong answer when using
1830 : * multi-insert. Since database access can be dynamic this is
1831 : * hard to test for exactly, so we use the much wider test of
1832 : * whether the default expression is volatile. We allow for
1833 : * the special case of when the default expression is the
1834 : * nextval() of a sequence which in this specific case is
1835 : * known to be safe for use with the multi-insert
1836 : * optimization. Hence we use this special case function
1837 : * checker rather than the standard check for
1838 : * contain_volatile_functions(). Note also that we already
1839 : * ran the expression through expression_planner().
1840 : */
1841 126 : if (!volatile_defexprs)
1842 126 : volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
1843 : }
1844 : }
1845 : }
1846 :
1847 983 : cstate->defaults = (bool *) palloc0(tupDesc->natts * sizeof(bool));
1848 :
1849 : /* initialize progress */
1850 983 : pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
1851 983 : cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
1852 983 : cstate->bytes_processed = 0;
1853 :
1854 : /* We keep those variables in cstate. */
1855 983 : cstate->in_functions = in_functions;
1856 983 : cstate->typioparams = typioparams;
1857 983 : cstate->defmap = defmap;
1858 983 : cstate->defexprs = defexprs;
1859 983 : cstate->volatile_defexprs = volatile_defexprs;
1860 983 : cstate->num_defaults = num_defaults;
1861 983 : cstate->is_program = is_program;
1862 :
1863 983 : if (data_source_cb)
1864 : {
1865 208 : progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
1866 208 : cstate->copy_src = COPY_CALLBACK;
1867 208 : cstate->data_source_cb = data_source_cb;
1868 : }
1869 775 : else if (pipe)
1870 : {
1871 541 : progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
1872 : Assert(!is_program); /* the grammar does not allow this */
1873 541 : if (whereToSendOutput == DestRemote)
1874 541 : ReceiveCopyBegin(cstate);
1875 : else
1876 0 : cstate->copy_file = stdin;
1877 : }
1878 : else
1879 : {
1880 234 : cstate->filename = pstrdup(filename);
1881 :
1882 234 : if (cstate->is_program)
1883 : {
1884 0 : progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
1885 0 : cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
1886 0 : if (cstate->copy_file == NULL)
1887 0 : ereport(ERROR,
1888 : (errcode_for_file_access(),
1889 : errmsg("could not execute command \"%s\": %m",
1890 : cstate->filename)));
1891 : }
1892 : else
1893 : {
1894 : struct stat st;
1895 :
1896 234 : progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
1897 234 : cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
1898 234 : if (cstate->copy_file == NULL)
1899 : {
1900 : /* copy errno because ereport subfunctions might change it */
1901 0 : int save_errno = errno;
1902 :
1903 0 : ereport(ERROR,
1904 : (errcode_for_file_access(),
1905 : errmsg("could not open file \"%s\" for reading: %m",
1906 : cstate->filename),
1907 : (save_errno == ENOENT || save_errno == EACCES) ?
1908 : errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
1909 : "You may want a client-side facility such as psql's \\copy.") : 0));
1910 : }
1911 :
1912 234 : if (fstat(fileno(cstate->copy_file), &st))
1913 0 : ereport(ERROR,
1914 : (errcode_for_file_access(),
1915 : errmsg("could not stat file \"%s\": %m",
1916 : cstate->filename)));
1917 :
1918 234 : if (S_ISDIR(st.st_mode))
1919 0 : ereport(ERROR,
1920 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1921 : errmsg("\"%s\" is a directory", cstate->filename)));
1922 :
1923 234 : progress_vals[2] = st.st_size;
1924 : }
1925 : }
1926 :
1927 983 : pgstat_progress_update_multi_param(3, progress_cols, progress_vals);
1928 :
1929 983 : cstate->routine->CopyFromStart(cstate, tupDesc);
1930 :
1931 983 : MemoryContextSwitchTo(oldcontext);
1932 :
1933 983 : return cstate;
1934 : }
1935 :
1936 : /*
1937 : * Clean up storage and release resources for COPY FROM.
1938 : */
1939 : void
1940 638 : EndCopyFrom(CopyFromState cstate)
1941 : {
1942 : /* Invoke the end callback */
1943 638 : cstate->routine->CopyFromEnd(cstate);
1944 :
1945 : /* No COPY FROM related resources except memory. */
1946 638 : if (cstate->is_program)
1947 : {
1948 0 : ClosePipeFromProgram(cstate);
1949 : }
1950 : else
1951 : {
1952 638 : if (cstate->filename != NULL && FreeFile(cstate->copy_file))
1953 0 : ereport(ERROR,
1954 : (errcode_for_file_access(),
1955 : errmsg("could not close file \"%s\": %m",
1956 : cstate->filename)));
1957 : }
1958 :
1959 638 : pgstat_progress_end_command();
1960 :
1961 638 : MemoryContextDelete(cstate->copycontext);
1962 638 : pfree(cstate);
1963 638 : }
1964 :
1965 : /*
1966 : * Closes the pipe from an external program, checking the pclose() return code.
1967 : */
1968 : static void
1969 0 : ClosePipeFromProgram(CopyFromState cstate)
1970 : {
1971 : int pclose_rc;
1972 :
1973 : Assert(cstate->is_program);
1974 :
1975 0 : pclose_rc = ClosePipeStream(cstate->copy_file);
1976 0 : if (pclose_rc == -1)
1977 0 : ereport(ERROR,
1978 : (errcode_for_file_access(),
1979 : errmsg("could not close pipe to external command: %m")));
1980 0 : else if (pclose_rc != 0)
1981 : {
1982 : /*
1983 : * If we ended a COPY FROM PROGRAM before reaching EOF, then it's
1984 : * expectable for the called program to fail with SIGPIPE, and we
1985 : * should not report that as an error. Otherwise, SIGPIPE indicates a
1986 : * problem.
1987 : */
1988 0 : if (!cstate->raw_reached_eof &&
1989 0 : wait_result_is_signal(pclose_rc, SIGPIPE))
1990 0 : return;
1991 :
1992 0 : ereport(ERROR,
1993 : (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
1994 : errmsg("program \"%s\" failed",
1995 : cstate->filename),
1996 : errdetail_internal("%s", wait_result_to_str(pclose_rc))));
1997 : }
1998 : }
|