Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * copyfrom.c
4 : * COPY <table> FROM file/program/client
5 : *
6 : * This file contains routines needed to efficiently load tuples into a
7 : * table. That includes looking up the correct partition, firing triggers,
8 : * calling the table AM function to insert the data, and updating indexes.
9 : * Reading data from the input file or client and parsing it into Datums
10 : * is handled in copyfromparse.c.
11 : *
12 : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
13 : * Portions Copyright (c) 1994, Regents of the University of California
14 : *
15 : *
16 : * IDENTIFICATION
17 : * src/backend/commands/copyfrom.c
18 : *
19 : *-------------------------------------------------------------------------
20 : */
21 : #include "postgres.h"
22 :
23 : #include <ctype.h>
24 : #include <unistd.h>
25 : #include <sys/stat.h>
26 :
27 : #include "access/heapam.h"
28 : #include "access/tableam.h"
29 : #include "access/xact.h"
30 : #include "catalog/namespace.h"
31 : #include "commands/copy.h"
32 : #include "commands/copyfrom_internal.h"
33 : #include "commands/progress.h"
34 : #include "commands/trigger.h"
35 : #include "executor/execPartition.h"
36 : #include "executor/executor.h"
37 : #include "executor/nodeModifyTable.h"
38 : #include "executor/tuptable.h"
39 : #include "foreign/fdwapi.h"
40 : #include "mb/pg_wchar.h"
41 : #include "miscadmin.h"
42 : #include "nodes/miscnodes.h"
43 : #include "optimizer/optimizer.h"
44 : #include "pgstat.h"
45 : #include "rewrite/rewriteHandler.h"
46 : #include "storage/fd.h"
47 : #include "tcop/tcopprot.h"
48 : #include "utils/lsyscache.h"
49 : #include "utils/memutils.h"
50 : #include "utils/portal.h"
51 : #include "utils/rel.h"
52 : #include "utils/snapmgr.h"
53 :
/*
 * Upper bound on the number of tuples held by one CopyMultiInsertBuffer.
 *
 * Keep this modest: CopyMultiInsertInfo's multiInsertBuffers list can grow to
 * hold this many CopyMultiInsertBuffer items, so raising the value can make
 * memory use grow quadratically when copying into a partitioned table with
 * many partitions.
 */
#define MAX_BUFFERED_TUPLES		1000

/*
 * Byte threshold (measured on the input size of the stored tuples) at or
 * above which the buffers are considered full and get flushed.
 */
#define MAX_BUFFERED_BYTES		65535

/*
 * After a flush, the list of tracked buffers is trimmed back to this many
 * entries.  The trimming code requires a value of at least 2.
 */
#define MAX_PARTITION_BUFFERS	32
76 :
77 : /* Stores multi-insert data related to a single relation in CopyFrom. */
78 : typedef struct CopyMultiInsertBuffer
79 : {
80 : TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
81 : ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */
82 : BulkInsertState bistate; /* BulkInsertState for this rel if plain
83 : * table; NULL if foreign table */
84 : int nused; /* number of 'slots' containing tuples */
85 : uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy
86 : * stream */
87 : } CopyMultiInsertBuffer;
88 :
89 : /*
90 : * Stores one or many CopyMultiInsertBuffers and details about the size and
91 : * number of tuples which are stored in them. This allows multiple buffers to
92 : * exist at once when COPYing into a partitioned table.
93 : */
94 : typedef struct CopyMultiInsertInfo
95 : {
96 : List *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */
97 : int bufferedTuples; /* number of tuples buffered over all buffers */
98 : int bufferedBytes; /* number of bytes from all buffered tuples */
99 : CopyFromState cstate; /* Copy state for this CopyMultiInsertInfo */
100 : EState *estate; /* Executor state used for COPY */
101 : CommandId mycid; /* Command Id used for COPY */
102 : int ti_options; /* table insert options */
103 : } CopyMultiInsertInfo;
104 :
105 :
106 : /* non-export function prototypes */
107 : static void ClosePipeFromProgram(CopyFromState cstate);
108 :
109 : /*
110 : * error context callback for COPY FROM
111 : *
112 : * The argument for the error context must be CopyFromState.
113 : */
114 : void
115 300 : CopyFromErrorCallback(void *arg)
116 : {
117 300 : CopyFromState cstate = (CopyFromState) arg;
118 :
119 300 : if (cstate->relname_only)
120 : {
121 44 : errcontext("COPY %s",
122 : cstate->cur_relname);
123 44 : return;
124 : }
125 256 : if (cstate->opts.binary)
126 : {
127 : /* can't usefully display the data */
128 2 : if (cstate->cur_attname)
129 2 : errcontext("COPY %s, line %llu, column %s",
130 : cstate->cur_relname,
131 2 : (unsigned long long) cstate->cur_lineno,
132 : cstate->cur_attname);
133 : else
134 0 : errcontext("COPY %s, line %llu",
135 : cstate->cur_relname,
136 0 : (unsigned long long) cstate->cur_lineno);
137 : }
138 : else
139 : {
140 254 : if (cstate->cur_attname && cstate->cur_attval)
141 40 : {
142 : /* error is relevant to a particular column */
143 : char *attval;
144 :
145 40 : attval = CopyLimitPrintoutLength(cstate->cur_attval);
146 40 : errcontext("COPY %s, line %llu, column %s: \"%s\"",
147 : cstate->cur_relname,
148 40 : (unsigned long long) cstate->cur_lineno,
149 : cstate->cur_attname,
150 : attval);
151 40 : pfree(attval);
152 : }
153 214 : else if (cstate->cur_attname)
154 : {
155 : /* error is relevant to a particular column, value is NULL */
156 6 : errcontext("COPY %s, line %llu, column %s: null input",
157 : cstate->cur_relname,
158 6 : (unsigned long long) cstate->cur_lineno,
159 : cstate->cur_attname);
160 : }
161 : else
162 : {
163 : /*
164 : * Error is relevant to a particular line.
165 : *
166 : * If line_buf still contains the correct line, print it.
167 : */
168 208 : if (cstate->line_buf_valid)
169 : {
170 : char *lineval;
171 :
172 184 : lineval = CopyLimitPrintoutLength(cstate->line_buf.data);
173 184 : errcontext("COPY %s, line %llu: \"%s\"",
174 : cstate->cur_relname,
175 184 : (unsigned long long) cstate->cur_lineno, lineval);
176 184 : pfree(lineval);
177 : }
178 : else
179 : {
180 24 : errcontext("COPY %s, line %llu",
181 : cstate->cur_relname,
182 24 : (unsigned long long) cstate->cur_lineno);
183 : }
184 : }
185 : }
186 : }
187 :
/*
 * Produce a copy of 'str' that is short enough to embed in a message.
 *
 * Input of at most MAX_COPY_DATA_DISPLAY bytes is duplicated unchanged.
 * Longer input is clipped with pg_mbcliplen() and "..." is appended to show
 * that truncation happened.  Either way the result is a freshly allocated
 * string.
 */
char *
CopyLimitPrintoutLength(const char *str)
{
#define MAX_COPY_DATA_DISPLAY 100

	int			input_len = strlen(str);
	int			keep;
	char	   *out;

	if (input_len > MAX_COPY_DATA_DISPLAY)
	{
		/* Let the encoding-aware helper pick the cut point */
		keep = pg_mbcliplen(str, input_len, MAX_COPY_DATA_DISPLAY);

		/* room for the kept bytes, the three dots and the terminator */
		out = (char *) palloc(keep + 4);
		memcpy(out, str, keep);
		strcpy(out + keep, "...");

		return out;
	}

	/* Short enough already: hand back a plain duplicate */
	return pstrdup(str);
}
218 :
219 : /*
220 : * Allocate memory and initialize a new CopyMultiInsertBuffer for this
221 : * ResultRelInfo.
222 : */
223 : static CopyMultiInsertBuffer *
224 1438 : CopyMultiInsertBufferInit(ResultRelInfo *rri)
225 : {
226 : CopyMultiInsertBuffer *buffer;
227 :
228 1438 : buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
229 1438 : memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
230 1438 : buffer->resultRelInfo = rri;
231 1438 : buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL;
232 1438 : buffer->nused = 0;
233 :
234 1438 : return buffer;
235 : }
236 :
237 : /*
238 : * Make a new buffer for this ResultRelInfo.
239 : */
240 : static inline void
241 1438 : CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
242 : ResultRelInfo *rri)
243 : {
244 : CopyMultiInsertBuffer *buffer;
245 :
246 1438 : buffer = CopyMultiInsertBufferInit(rri);
247 :
248 : /* Setup back-link so we can easily find this buffer again */
249 1438 : rri->ri_CopyMultiInsertBuffer = buffer;
250 : /* Record that we're tracking this buffer */
251 1438 : miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
252 1438 : }
253 :
254 : /*
255 : * Initialize an already allocated CopyMultiInsertInfo.
256 : *
257 : * If rri is a non-partitioned table then a CopyMultiInsertBuffer is set up
258 : * for that table.
259 : */
260 : static void
261 1426 : CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
262 : CopyFromState cstate, EState *estate, CommandId mycid,
263 : int ti_options)
264 : {
265 1426 : miinfo->multiInsertBuffers = NIL;
266 1426 : miinfo->bufferedTuples = 0;
267 1426 : miinfo->bufferedBytes = 0;
268 1426 : miinfo->cstate = cstate;
269 1426 : miinfo->estate = estate;
270 1426 : miinfo->mycid = mycid;
271 1426 : miinfo->ti_options = ti_options;
272 :
273 : /*
274 : * Only setup the buffer when not dealing with a partitioned table.
275 : * Buffers for partitioned tables will just be setup when we need to send
276 : * tuples their way for the first time.
277 : */
278 1426 : if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
279 1352 : CopyMultiInsertInfoSetupBuffer(miinfo, rri);
280 1426 : }
281 :
282 : /*
283 : * Returns true if the buffers are full
284 : */
285 : static inline bool
286 1195578 : CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
287 : {
288 1195578 : if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
289 1194584 : miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
290 1100 : return true;
291 1194478 : return false;
292 : }
293 :
294 : /*
295 : * Returns true if we have no buffered tuples
296 : */
297 : static inline bool
298 1300 : CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
299 : {
300 1300 : return miinfo->bufferedTuples == 0;
301 : }
302 :
303 : /*
304 : * Write the tuples stored in 'buffer' out to the table.
305 : */
306 : static inline void
307 2368 : CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
308 : CopyMultiInsertBuffer *buffer,
309 : int64 *processed)
310 : {
311 2368 : CopyFromState cstate = miinfo->cstate;
312 2368 : EState *estate = miinfo->estate;
313 2368 : int nused = buffer->nused;
314 2368 : ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
315 2368 : TupleTableSlot **slots = buffer->slots;
316 : int i;
317 :
318 2368 : if (resultRelInfo->ri_FdwRoutine)
319 : {
320 14 : int batch_size = resultRelInfo->ri_BatchSize;
321 14 : int sent = 0;
322 :
323 : Assert(buffer->bistate == NULL);
324 :
325 : /* Ensure that the FDW supports batching and it's enabled */
326 : Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert);
327 : Assert(batch_size > 1);
328 :
329 : /*
330 : * We suppress error context information other than the relation name,
331 : * if one of the operations below fails.
332 : */
333 : Assert(!cstate->relname_only);
334 14 : cstate->relname_only = true;
335 :
336 38 : while (sent < nused)
337 : {
338 26 : int size = (batch_size < nused - sent) ? batch_size : (nused - sent);
339 26 : int inserted = size;
340 : TupleTableSlot **rslots;
341 :
342 : /* insert into foreign table: let the FDW do it */
343 : rslots =
344 26 : resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert(estate,
345 : resultRelInfo,
346 26 : &slots[sent],
347 : NULL,
348 : &inserted);
349 :
350 24 : sent += size;
351 :
352 : /* No need to do anything if there are no inserted rows */
353 24 : if (inserted <= 0)
354 4 : continue;
355 :
356 : /* Triggers on foreign tables should not have transition tables */
357 : Assert(resultRelInfo->ri_TrigDesc == NULL ||
358 : resultRelInfo->ri_TrigDesc->trig_insert_new_table == false);
359 :
360 : /* Run AFTER ROW INSERT triggers */
361 20 : if (resultRelInfo->ri_TrigDesc != NULL &&
362 0 : resultRelInfo->ri_TrigDesc->trig_insert_after_row)
363 : {
364 0 : Oid relid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
365 :
366 0 : for (i = 0; i < inserted; i++)
367 : {
368 0 : TupleTableSlot *slot = rslots[i];
369 :
370 : /*
371 : * AFTER ROW Triggers might reference the tableoid column,
372 : * so (re-)initialize tts_tableOid before evaluating them.
373 : */
374 0 : slot->tts_tableOid = relid;
375 :
376 0 : ExecARInsertTriggers(estate, resultRelInfo,
377 : slot, NIL,
378 : cstate->transition_capture);
379 : }
380 : }
381 :
382 : /* Update the row counter and progress of the COPY command */
383 20 : *processed += inserted;
384 20 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
385 : *processed);
386 : }
387 :
388 48 : for (i = 0; i < nused; i++)
389 36 : ExecClearTuple(slots[i]);
390 :
391 : /* reset relname_only */
392 12 : cstate->relname_only = false;
393 : }
394 : else
395 : {
396 2354 : CommandId mycid = miinfo->mycid;
397 2354 : int ti_options = miinfo->ti_options;
398 2354 : bool line_buf_valid = cstate->line_buf_valid;
399 2354 : uint64 save_cur_lineno = cstate->cur_lineno;
400 : MemoryContext oldcontext;
401 :
402 : Assert(buffer->bistate != NULL);
403 :
404 : /*
405 : * Print error context information correctly, if one of the operations
406 : * below fails.
407 : */
408 2354 : cstate->line_buf_valid = false;
409 :
410 : /*
411 : * table_multi_insert may leak memory, so switch to short-lived memory
412 : * context before calling it.
413 : */
414 2354 : oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
415 2354 : table_multi_insert(resultRelInfo->ri_RelationDesc,
416 : slots,
417 : nused,
418 : mycid,
419 : ti_options,
420 : buffer->bistate);
421 2354 : MemoryContextSwitchTo(oldcontext);
422 :
423 1197750 : for (i = 0; i < nused; i++)
424 : {
425 : /*
426 : * If there are any indexes, update them for all the inserted
427 : * tuples, and run AFTER ROW INSERT triggers.
428 : */
429 1195408 : if (resultRelInfo->ri_NumIndices > 0)
430 : {
431 : List *recheckIndexes;
432 :
433 201832 : cstate->cur_lineno = buffer->linenos[i];
434 : recheckIndexes =
435 201832 : ExecInsertIndexTuples(resultRelInfo,
436 : buffer->slots[i], estate, false,
437 : false, NULL, NIL, false);
438 201820 : ExecARInsertTriggers(estate, resultRelInfo,
439 201820 : slots[i], recheckIndexes,
440 : cstate->transition_capture);
441 201820 : list_free(recheckIndexes);
442 : }
443 :
444 : /*
445 : * There's no indexes, but see if we need to run AFTER ROW INSERT
446 : * triggers anyway.
447 : */
448 993576 : else if (resultRelInfo->ri_TrigDesc != NULL &&
449 78 : (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
450 60 : resultRelInfo->ri_TrigDesc->trig_insert_new_table))
451 : {
452 36 : cstate->cur_lineno = buffer->linenos[i];
453 36 : ExecARInsertTriggers(estate, resultRelInfo,
454 36 : slots[i], NIL,
455 : cstate->transition_capture);
456 : }
457 :
458 1195396 : ExecClearTuple(slots[i]);
459 : }
460 :
461 : /* Update the row counter and progress of the COPY command */
462 2342 : *processed += nused;
463 2342 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
464 : *processed);
465 :
466 : /* reset cur_lineno and line_buf_valid to what they were */
467 2342 : cstate->line_buf_valid = line_buf_valid;
468 2342 : cstate->cur_lineno = save_cur_lineno;
469 : }
470 :
471 : /* Mark that all slots are free */
472 2354 : buffer->nused = 0;
473 2354 : }
474 :
475 : /*
476 : * Drop used slots and free member for this buffer.
477 : *
478 : * The buffer must be flushed before cleanup.
479 : */
480 : static inline void
481 1270 : CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
482 : CopyMultiInsertBuffer *buffer)
483 : {
484 1270 : ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
485 : int i;
486 :
487 : /* Ensure buffer was flushed */
488 : Assert(buffer->nused == 0);
489 :
490 : /* Remove back-link to ourself */
491 1270 : resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
492 :
493 1270 : if (resultRelInfo->ri_FdwRoutine == NULL)
494 : {
495 : Assert(buffer->bistate != NULL);
496 1258 : FreeBulkInsertState(buffer->bistate);
497 : }
498 : else
499 : Assert(buffer->bistate == NULL);
500 :
501 : /* Since we only create slots on demand, just drop the non-null ones. */
502 235604 : for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
503 234334 : ExecDropSingleTupleTableSlot(buffer->slots[i]);
504 :
505 1270 : if (resultRelInfo->ri_FdwRoutine == NULL)
506 1258 : table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
507 : miinfo->ti_options);
508 :
509 1270 : pfree(buffer);
510 1270 : }
511 :
512 : /*
513 : * Write out all stored tuples in all buffers out to the tables.
514 : *
515 : * Once flushed we also trim the tracked buffers list down to size by removing
516 : * the buffers created earliest first.
517 : *
518 : * Callers should pass 'curr_rri' as the ResultRelInfo that's currently being
519 : * used. When cleaning up old buffers we'll never remove the one for
520 : * 'curr_rri'.
521 : */
522 : static inline void
523 2142 : CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri,
524 : int64 *processed)
525 : {
526 : ListCell *lc;
527 :
528 4496 : foreach(lc, miinfo->multiInsertBuffers)
529 : {
530 2368 : CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc);
531 :
532 2368 : CopyMultiInsertBufferFlush(miinfo, buffer, processed);
533 : }
534 :
535 2128 : miinfo->bufferedTuples = 0;
536 2128 : miinfo->bufferedBytes = 0;
537 :
538 : /*
539 : * Trim the list of tracked buffers down if it exceeds the limit. Here we
540 : * remove buffers starting with the ones we created first. It seems less
541 : * likely that these older ones will be needed than the ones that were
542 : * just created.
543 : */
544 2128 : while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS)
545 : {
546 : CopyMultiInsertBuffer *buffer;
547 :
548 0 : buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
549 :
550 : /*
551 : * We never want to remove the buffer that's currently being used, so
552 : * if we happen to find that then move it to the end of the list.
553 : */
554 0 : if (buffer->resultRelInfo == curr_rri)
555 : {
556 : /*
557 : * The code below would misbehave if we were trying to reduce the
558 : * list to less than two items.
559 : */
560 : StaticAssertDecl(MAX_PARTITION_BUFFERS >= 2,
561 : "MAX_PARTITION_BUFFERS must be >= 2");
562 :
563 0 : miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
564 0 : miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
565 0 : buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
566 : }
567 :
568 0 : CopyMultiInsertBufferCleanup(miinfo, buffer);
569 0 : miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
570 : }
571 2128 : }
572 :
573 : /*
574 : * Cleanup allocated buffers and free memory
575 : */
576 : static inline void
577 1258 : CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo)
578 : {
579 : ListCell *lc;
580 :
581 2528 : foreach(lc, miinfo->multiInsertBuffers)
582 1270 : CopyMultiInsertBufferCleanup(miinfo, lfirst(lc));
583 :
584 1258 : list_free(miinfo->multiInsertBuffers);
585 1258 : }
586 :
587 : /*
588 : * Get the next TupleTableSlot that the next tuple should be stored in.
589 : *
590 : * Callers must ensure that the buffer is not full.
591 : *
592 : * Note: 'miinfo' is unused but has been included for consistency with the
593 : * other functions in this area.
594 : */
595 : static inline TupleTableSlot *
596 1197026 : CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
597 : ResultRelInfo *rri)
598 : {
599 1197026 : CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
600 : int nused;
601 :
602 : Assert(buffer != NULL);
603 : Assert(buffer->nused < MAX_BUFFERED_TUPLES);
604 :
605 1197026 : nused = buffer->nused;
606 :
607 1197026 : if (buffer->slots[nused] == NULL)
608 234646 : buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
609 1197026 : return buffer->slots[nused];
610 : }
611 :
612 : /*
613 : * Record the previously reserved TupleTableSlot that was reserved by
614 : * CopyMultiInsertInfoNextFreeSlot as being consumed.
615 : */
616 : static inline void
617 1195578 : CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
618 : TupleTableSlot *slot, int tuplen, uint64 lineno)
619 : {
620 1195578 : CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
621 :
622 : Assert(buffer != NULL);
623 : Assert(slot == buffer->slots[buffer->nused]);
624 :
625 : /* Store the line number so we can properly report any errors later */
626 1195578 : buffer->linenos[buffer->nused] = lineno;
627 :
628 : /* Record this slot as being used */
629 1195578 : buffer->nused++;
630 :
631 : /* Update how many tuples are stored and their size */
632 1195578 : miinfo->bufferedTuples++;
633 1195578 : miinfo->bufferedBytes += tuplen;
634 1195578 : }
635 :
636 : /*
637 : * Copy FROM file to relation.
638 : */
639 : uint64
640 1604 : CopyFrom(CopyFromState cstate)
641 : {
642 : ResultRelInfo *resultRelInfo;
643 : ResultRelInfo *target_resultRelInfo;
644 1604 : ResultRelInfo *prevResultRelInfo = NULL;
645 1604 : EState *estate = CreateExecutorState(); /* for ExecConstraints() */
646 : ModifyTableState *mtstate;
647 : ExprContext *econtext;
648 1604 : TupleTableSlot *singleslot = NULL;
649 1604 : MemoryContext oldcontext = CurrentMemoryContext;
650 :
651 1604 : PartitionTupleRouting *proute = NULL;
652 : ErrorContextCallback errcallback;
653 1604 : CommandId mycid = GetCurrentCommandId(true);
654 1604 : int ti_options = 0; /* start with default options for insert */
655 1604 : BulkInsertState bistate = NULL;
656 : CopyInsertMethod insertMethod;
657 1604 : CopyMultiInsertInfo multiInsertInfo = {0}; /* pacify compiler */
658 1604 : int64 processed = 0;
659 1604 : int64 excluded = 0;
660 : bool has_before_insert_row_trig;
661 : bool has_instead_insert_row_trig;
662 1604 : bool leafpart_use_multi_insert = false;
663 :
664 : Assert(cstate->rel);
665 : Assert(list_length(cstate->range_table) == 1);
666 :
667 1604 : if (cstate->opts.on_error != COPY_ON_ERROR_STOP)
668 : Assert(cstate->escontext);
669 :
670 : /*
671 : * The target must be a plain, foreign, or partitioned relation, or have
672 : * an INSTEAD OF INSERT row trigger. (Currently, such triggers are only
673 : * allowed on views, so we only hint about them in the view case.)
674 : */
675 1604 : if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
676 160 : cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
677 122 : cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
678 18 : !(cstate->rel->trigdesc &&
679 12 : cstate->rel->trigdesc->trig_insert_instead_row))
680 : {
681 6 : if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
682 6 : ereport(ERROR,
683 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
684 : errmsg("cannot copy to view \"%s\"",
685 : RelationGetRelationName(cstate->rel)),
686 : errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
687 0 : else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
688 0 : ereport(ERROR,
689 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
690 : errmsg("cannot copy to materialized view \"%s\"",
691 : RelationGetRelationName(cstate->rel))));
692 0 : else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
693 0 : ereport(ERROR,
694 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
695 : errmsg("cannot copy to sequence \"%s\"",
696 : RelationGetRelationName(cstate->rel))));
697 : else
698 0 : ereport(ERROR,
699 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
700 : errmsg("cannot copy to non-table relation \"%s\"",
701 : RelationGetRelationName(cstate->rel))));
702 : }
703 :
704 : /*
705 : * If the target file is new-in-transaction, we assume that checking FSM
706 : * for free space is a waste of time. This could possibly be wrong, but
707 : * it's unlikely.
708 : */
709 1598 : if (RELKIND_HAS_STORAGE(cstate->rel->rd_rel->relkind) &&
710 1444 : (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
711 1432 : cstate->rel->rd_firstRelfilelocatorSubid != InvalidSubTransactionId))
712 88 : ti_options |= TABLE_INSERT_SKIP_FSM;
713 :
714 : /*
715 : * Optimize if new relation storage was created in this subxact or one of
716 : * its committed children and we won't see those rows later as part of an
717 : * earlier scan or command. The subxact test ensures that if this subxact
718 : * aborts then the frozen rows won't be visible after xact cleanup. Note
719 : * that the stronger test of exactly which subtransaction created it is
720 : * crucial for correctness of this optimization. The test for an earlier
721 : * scan or command tolerates false negatives. FREEZE causes other sessions
722 : * to see rows they would not see under MVCC, and a false negative merely
723 : * spreads that anomaly to the current session.
724 : */
725 1598 : if (cstate->opts.freeze)
726 : {
727 : /*
728 : * We currently disallow COPY FREEZE on partitioned tables. The
729 : * reason for this is that we've simply not yet opened the partitions
730 : * to determine if the optimization can be applied to them. We could
731 : * go and open them all here, but doing so may be quite a costly
732 : * overhead for small copies. In any case, we may just end up routing
733 : * tuples to a small number of partitions. It seems better just to
734 : * raise an ERROR for partitioned tables.
735 : */
736 66 : if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
737 : {
738 6 : ereport(ERROR,
739 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
740 : errmsg("cannot perform COPY FREEZE on a partitioned table")));
741 : }
742 :
743 : /*
744 : * Tolerate one registration for the benefit of FirstXactSnapshot.
745 : * Scan-bearing queries generally create at least two registrations,
746 : * though relying on that is fragile, as is ignoring ActiveSnapshot.
747 : * Clear CatalogSnapshot to avoid counting its registration. We'll
748 : * still detect ongoing catalog scans, each of which separately
749 : * registers the snapshot it uses.
750 : */
751 60 : InvalidateCatalogSnapshot();
752 60 : if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
753 0 : ereport(ERROR,
754 : (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
755 : errmsg("cannot perform COPY FREEZE because of prior transaction activity")));
756 :
757 120 : if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
758 60 : cstate->rel->rd_newRelfilelocatorSubid != GetCurrentSubTransactionId())
759 18 : ereport(ERROR,
760 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
761 : errmsg("cannot perform COPY FREEZE because the table was not created or truncated in the current subtransaction")));
762 :
763 42 : ti_options |= TABLE_INSERT_FROZEN;
764 : }
765 :
766 : /*
767 : * We need a ResultRelInfo so we can use the regular executor's
768 : * index-entry-making machinery. (There used to be a huge amount of code
769 : * here that basically duplicated execUtils.c ...)
770 : */
771 1574 : ExecInitRangeTable(estate, cstate->range_table, cstate->rteperminfos);
772 1574 : resultRelInfo = target_resultRelInfo = makeNode(ResultRelInfo);
773 1574 : ExecInitResultRelation(estate, resultRelInfo, 1);
774 :
775 : /* Verify the named relation is a valid target for INSERT */
776 1574 : CheckValidResultRel(resultRelInfo, CMD_INSERT, NIL);
777 :
778 1572 : ExecOpenIndices(resultRelInfo, false);
779 :
780 : /*
781 : * Set up a ModifyTableState so we can let FDW(s) init themselves for
782 : * foreign-table result relation(s).
783 : */
784 1572 : mtstate = makeNode(ModifyTableState);
785 1572 : mtstate->ps.plan = NULL;
786 1572 : mtstate->ps.state = estate;
787 1572 : mtstate->operation = CMD_INSERT;
788 1572 : mtstate->mt_nrels = 1;
789 1572 : mtstate->resultRelInfo = resultRelInfo;
790 1572 : mtstate->rootResultRelInfo = resultRelInfo;
791 :
792 1572 : if (resultRelInfo->ri_FdwRoutine != NULL &&
793 36 : resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
794 36 : resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
795 : resultRelInfo);
796 :
797 : /*
798 : * Also, if the named relation is a foreign table, determine if the FDW
799 : * supports batch insert and determine the batch size (a FDW may support
800 : * batching, but it may be disabled for the server/table).
801 : *
802 : * If the FDW does not support batching, we set the batch size to 1.
803 : */
804 1572 : if (resultRelInfo->ri_FdwRoutine != NULL &&
805 36 : resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize &&
806 36 : resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert)
807 36 : resultRelInfo->ri_BatchSize =
808 36 : resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize(resultRelInfo);
809 : else
810 1536 : resultRelInfo->ri_BatchSize = 1;
811 :
812 : Assert(resultRelInfo->ri_BatchSize >= 1);
813 :
814 : /* Prepare to catch AFTER triggers. */
815 1572 : AfterTriggerBeginQuery();
816 :
817 : /*
818 : * If there are any triggers with transition tables on the named relation,
819 : * we need to be prepared to capture transition tuples.
820 : *
821 : * Because partition tuple routing would like to know about whether
822 : * transition capture is active, we also set it in mtstate, which is
823 : * passed to ExecFindPartition() below.
824 : */
825 1572 : cstate->transition_capture = mtstate->mt_transition_capture =
826 1572 : MakeTransitionCaptureState(cstate->rel->trigdesc,
827 1572 : RelationGetRelid(cstate->rel),
828 : CMD_INSERT);
829 :
830 : /*
831 : * If the named relation is a partitioned table, initialize state for
832 : * CopyFrom tuple routing.
833 : */
834 1572 : if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
835 98 : proute = ExecSetupPartitionTupleRouting(estate, cstate->rel);
836 :
837 1572 : if (cstate->whereClause)
838 18 : cstate->qualexpr = ExecInitQual(castNode(List, cstate->whereClause),
839 : &mtstate->ps);
840 :
841 : /*
842 : * It's generally more efficient to prepare a bunch of tuples for
843 : * insertion, and insert them in one
844 : * table_multi_insert()/ExecForeignBatchInsert() call, than call
845 : * table_tuple_insert()/ExecForeignInsert() separately for every tuple.
846 : * However, there are a number of reasons why we might not be able to do
847 : * this. These are explained below.
848 : */
849 1572 : if (resultRelInfo->ri_TrigDesc != NULL &&
850 176 : (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
851 84 : resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
852 : {
853 : /*
854 : * Can't support multi-inserts when there are any BEFORE/INSTEAD OF
855 : * triggers on the table. Such triggers might query the table we're
856 : * inserting into and act differently if the tuples that have already
857 : * been processed and prepared for insertion are not there.
858 : */
859 104 : insertMethod = CIM_SINGLE;
860 : }
861 1468 : else if (resultRelInfo->ri_FdwRoutine != NULL &&
862 28 : resultRelInfo->ri_BatchSize == 1)
863 : {
864 : /*
865 : * Can't support multi-inserts to a foreign table if the FDW does not
866 : * support batching, or it's disabled for the server or foreign table.
867 : */
868 18 : insertMethod = CIM_SINGLE;
869 : }
870 1450 : else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL &&
871 26 : resultRelInfo->ri_TrigDesc->trig_insert_new_table)
872 : {
873 : /*
874 : * For partitioned tables we can't support multi-inserts when there
875 : * are any statement level insert triggers. It might be possible to
876 : * allow partitioned tables with such triggers in the future, but for
877 : * now, CopyMultiInsertInfoFlush expects that any after row insert and
878 : * statement level insert triggers are on the same relation.
879 : */
880 18 : insertMethod = CIM_SINGLE;
881 : }
882 1432 : else if (cstate->volatile_defexprs)
883 : {
884 : /*
885 : * Can't support multi-inserts if there are any volatile default
886 : * expressions in the table. Similarly to the trigger case above,
887 : * such expressions may query the table we're inserting into.
888 : *
889 : * Note: It does not matter if any partitions have any volatile
890 : * default expressions as we use the defaults from the target of the
891 : * COPY command.
892 : */
893 6 : insertMethod = CIM_SINGLE;
894 : }
895 1426 : else if (contain_volatile_functions(cstate->whereClause))
896 : {
897 : /*
898 : * Can't support multi-inserts if there are any volatile function
899 : * expressions in WHERE clause. Similarly to the trigger case above,
900 : * such expressions may query the table we're inserting into.
901 : *
902 : * Note: the whereClause was already preprocessed in DoCopy(), so it's
903 : * okay to use contain_volatile_functions() directly.
904 : */
905 0 : insertMethod = CIM_SINGLE;
906 : }
907 : else
908 : {
909 : /*
910 : * For partitioned tables, we may still be able to perform bulk
911 : * inserts. However, the possibility of this depends on which types
912 : * of triggers exist on the partition. We must disable bulk inserts
913 : * if the partition is a foreign table that can't use batching or it
914 : * has any before row insert or insert instead triggers (same as we
915 : * checked above for the parent table). Since the partition's
916 : * resultRelInfos are initialized only when we actually need to insert
917 : * the first tuple into them, we must have the intermediate insert
918 : * method of CIM_MULTI_CONDITIONAL to flag that we must later
919 : * determine if we can use bulk-inserts for the partition being
920 : * inserted into.
921 : */
922 1426 : if (proute)
923 74 : insertMethod = CIM_MULTI_CONDITIONAL;
924 : else
925 1352 : insertMethod = CIM_MULTI;
926 :
927 1426 : CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
928 : estate, mycid, ti_options);
929 : }
930 :
931 : /*
932 : * If not using batch mode (which allocates slots as needed) set up a
933 : * tuple slot too. When inserting into a partitioned table, we also need
934 : * one, even if we might batch insert, to read the tuple in the root
935 : * partition's form.
936 : */
937 1572 : if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
938 : {
939 220 : singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
940 : &estate->es_tupleTable);
941 220 : bistate = GetBulkInsertState();
942 : }
943 :
944 1748 : has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
945 176 : resultRelInfo->ri_TrigDesc->trig_insert_before_row);
946 :
947 1748 : has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
948 176 : resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
949 :
950 : /*
951 : * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
952 : * should do this for COPY, since it's not really an "INSERT" statement as
953 : * such. However, executing these triggers maintains consistency with the
954 : * EACH ROW triggers that we already fire on COPY.
955 : */
956 1572 : ExecBSInsertTriggers(estate, resultRelInfo);
957 :
958 1572 : econtext = GetPerTupleExprContext(estate);
959 :
960 : /* Set up callback to identify error line number */
961 1572 : errcallback.callback = CopyFromErrorCallback;
962 1572 : errcallback.arg = (void *) cstate;
963 1572 : errcallback.previous = error_context_stack;
964 1572 : error_context_stack = &errcallback;
965 :
966 : for (;;)
967 1256074 : {
968 : TupleTableSlot *myslot;
969 : bool skip_tuple;
970 :
971 1257646 : CHECK_FOR_INTERRUPTS();
972 :
973 : /*
974 : * Reset the per-tuple exprcontext. We do this after every tuple, to
975 : * clean-up after expression evaluations etc.
976 : */
977 1257646 : ResetPerTupleExprContext(estate);
978 :
979 : /* select slot to (initially) load row into */
980 1257646 : if (insertMethod == CIM_SINGLE || proute)
981 : {
982 274872 : myslot = singleslot;
983 274872 : Assert(myslot != NULL);
984 : }
985 : else
986 : {
987 : Assert(resultRelInfo == target_resultRelInfo);
988 : Assert(insertMethod == CIM_MULTI);
989 :
990 982774 : myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
991 : resultRelInfo);
992 : }
993 :
994 : /*
995 : * Switch to per-tuple context before calling NextCopyFrom, which does
996 : * evaluate default expressions etc. and requires per-tuple context.
997 : */
998 1257646 : MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
999 :
1000 1257646 : ExecClearTuple(myslot);
1001 :
1002 : /* Directly store the values/nulls array in the slot */
1003 1257646 : if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
1004 1392 : break;
1005 :
1006 1256114 : if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE &&
1007 144 : cstate->escontext->error_occurred)
1008 : {
1009 : /*
1010 : * Soft error occurred, skip this tuple and just make
1011 : * ErrorSaveContext ready for the next NextCopyFrom. Since we
1012 : * don't set details_wanted and error_data is not to be filled,
1013 : * just resetting error_occurred is enough.
1014 : */
1015 96 : cstate->escontext->error_occurred = false;
1016 :
1017 : /* Report that this tuple was skipped by the ON_ERROR clause */
1018 96 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_SKIPPED,
1019 96 : cstate->num_errors);
1020 :
1021 96 : if (cstate->opts.reject_limit > 0 &&
1022 48 : cstate->num_errors > cstate->opts.reject_limit)
1023 6 : ereport(ERROR,
1024 : (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1025 : errmsg("skipped more than REJECT_LIMIT (%lld) rows due to data type incompatibility",
1026 : (long long) cstate->opts.reject_limit)));
1027 :
1028 : /* Repeat NextCopyFrom() until no soft error occurs */
1029 90 : continue;
1030 : }
1031 :
1032 1256018 : ExecStoreVirtualTuple(myslot);
1033 :
1034 : /*
1035 : * Constraints and where clause might reference the tableoid column,
1036 : * so (re-)initialize tts_tableOid before evaluating them.
1037 : */
1038 1256018 : myslot->tts_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc);
1039 :
1040 : /* Triggers and stuff need to be invoked in query context. */
1041 1256018 : MemoryContextSwitchTo(oldcontext);
1042 :
1043 1256018 : if (cstate->whereClause)
1044 : {
1045 66 : econtext->ecxt_scantuple = myslot;
1046 : /* Skip items that don't match COPY's WHERE clause */
1047 66 : if (!ExecQual(cstate->qualexpr, econtext))
1048 : {
1049 : /*
1050 : * Report that this tuple was filtered out by the WHERE
1051 : * clause.
1052 : */
1053 36 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_EXCLUDED,
1054 : ++excluded);
1055 36 : continue;
1056 : }
1057 : }
1058 :
1059 : /* Determine the partition to insert the tuple into */
1060 1255982 : if (proute)
1061 : {
1062 : TupleConversionMap *map;
1063 :
1064 : /*
1065 : * Attempt to find a partition suitable for this tuple.
1066 : * ExecFindPartition() will raise an error if none can be found or
1067 : * if the found partition is not suitable for INSERTs.
1068 : */
1069 274390 : resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo,
1070 : proute, myslot, estate);
1071 :
1072 274388 : if (prevResultRelInfo != resultRelInfo)
1073 : {
1074 : /* Determine which triggers exist on this partition */
1075 161566 : has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
1076 54 : resultRelInfo->ri_TrigDesc->trig_insert_before_row);
1077 :
1078 161566 : has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
1079 54 : resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
1080 :
1081 : /*
1082 : * Disable multi-inserts when the partition has BEFORE/INSTEAD
1083 : * OF triggers, or if the partition is a foreign table that
1084 : * can't use batching.
1085 : */
1086 262970 : leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
1087 101458 : !has_before_insert_row_trig &&
1088 364404 : !has_instead_insert_row_trig &&
1089 101434 : (resultRelInfo->ri_FdwRoutine == NULL ||
1090 12 : resultRelInfo->ri_BatchSize > 1);
1091 :
1092 : /* Set the multi-insert buffer to use for this partition. */
1093 161512 : if (leafpart_use_multi_insert)
1094 : {
1095 101430 : if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
1096 86 : CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
1097 : resultRelInfo);
1098 : }
1099 60082 : else if (insertMethod == CIM_MULTI_CONDITIONAL &&
1100 28 : !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
1101 : {
1102 : /*
1103 : * Flush pending inserts if this partition can't use
1104 : * batching, so rows are visible to triggers etc.
1105 : */
1106 0 : CopyMultiInsertInfoFlush(&multiInsertInfo,
1107 : resultRelInfo,
1108 : &processed);
1109 : }
1110 :
1111 161512 : if (bistate != NULL)
1112 161512 : ReleaseBulkInsertStatePin(bistate);
1113 161512 : prevResultRelInfo = resultRelInfo;
1114 : }
1115 :
1116 : /*
1117 : * If we're capturing transition tuples, we might need to convert
1118 : * from the partition rowtype to root rowtype. But if there are no
1119 : * BEFORE triggers on the partition that could change the tuple,
1120 : * we can just remember the original unconverted tuple to avoid a
1121 : * needless round trip conversion.
1122 : */
1123 274388 : if (cstate->transition_capture != NULL)
1124 54 : cstate->transition_capture->tcs_original_insert_tuple =
1125 54 : !has_before_insert_row_trig ? myslot : NULL;
1126 :
1127 : /*
1128 : * We might need to convert from the root rowtype to the partition
1129 : * rowtype.
1130 : */
1131 274388 : map = ExecGetRootToChildMap(resultRelInfo, estate);
1132 274388 : if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert)
1133 : {
1134 : /* non batch insert */
1135 60136 : if (map != NULL)
1136 : {
1137 : TupleTableSlot *new_slot;
1138 :
1139 110 : new_slot = resultRelInfo->ri_PartitionTupleSlot;
1140 110 : myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot);
1141 : }
1142 : }
1143 : else
1144 : {
1145 : /*
1146 : * Prepare to queue up tuple for later batch insert into
1147 : * current partition.
1148 : */
1149 : TupleTableSlot *batchslot;
1150 :
1151 : /* no other path available for partitioned table */
1152 : Assert(insertMethod == CIM_MULTI_CONDITIONAL);
1153 :
1154 214252 : batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
1155 : resultRelInfo);
1156 :
1157 214252 : if (map != NULL)
1158 12200 : myslot = execute_attr_map_slot(map->attrMap, myslot,
1159 : batchslot);
1160 : else
1161 : {
1162 : /*
1163 : * This looks more expensive than it is (Believe me, I
1164 : * optimized it away. Twice.). The input is in virtual
1165 : * form, and we'll materialize the slot below - for most
1166 : * slot types the copy performs the work materialization
1167 : * would later require anyway.
1168 : */
1169 202052 : ExecCopySlot(batchslot, myslot);
1170 202052 : myslot = batchslot;
1171 : }
1172 : }
1173 :
1174 : /* ensure that triggers etc see the right relation */
1175 274388 : myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
1176 : }
1177 :
1178 1255980 : skip_tuple = false;
1179 :
1180 : /* BEFORE ROW INSERT Triggers */
1181 1255980 : if (has_before_insert_row_trig)
1182 : {
1183 274 : if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot))
1184 16 : skip_tuple = true; /* "do nothing" */
1185 : }
1186 :
1187 1255980 : if (!skip_tuple)
1188 : {
1189 : /*
1190 : * If there is an INSTEAD OF INSERT ROW trigger, let it handle the
1191 : * tuple. Otherwise, proceed with inserting the tuple into the
1192 : * table or foreign table.
1193 : */
1194 1255964 : if (has_instead_insert_row_trig)
1195 : {
1196 12 : ExecIRInsertTriggers(estate, resultRelInfo, myslot);
1197 : }
1198 : else
1199 : {
1200 : /* Compute stored generated columns */
1201 1255952 : if (resultRelInfo->ri_RelationDesc->rd_att->constr &&
1202 461858 : resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored)
1203 36 : ExecComputeStoredGenerated(resultRelInfo, estate, myslot,
1204 : CMD_INSERT);
1205 :
1206 : /*
1207 : * If the target is a plain table, check the constraints of
1208 : * the tuple.
1209 : */
1210 1255952 : if (resultRelInfo->ri_FdwRoutine == NULL &&
1211 1255864 : resultRelInfo->ri_RelationDesc->rd_att->constr)
1212 461822 : ExecConstraints(resultRelInfo, myslot, estate);
1213 :
1214 : /*
1215 : * Also check the tuple against the partition constraint, if
1216 : * there is one; except that if we got here via tuple-routing,
1217 : * we don't need to if there's no BR trigger defined on the
1218 : * partition.
1219 : */
1220 1255922 : if (resultRelInfo->ri_RelationDesc->rd_rel->relispartition &&
1221 274376 : (proute == NULL || has_before_insert_row_trig))
1222 2308 : ExecPartitionCheck(resultRelInfo, myslot, estate, true);
1223 :
1224 : /* Store the slot in the multi-insert buffer, when enabled. */
1225 1255922 : if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
1226 : {
1227 : /*
1228 : * The slot previously might point into the per-tuple
1229 : * context. For batching it needs to be longer lived.
1230 : */
1231 1195578 : ExecMaterializeSlot(myslot);
1232 :
1233 : /* Add this tuple to the tuple buffer */
1234 1195578 : CopyMultiInsertInfoStore(&multiInsertInfo,
1235 : resultRelInfo, myslot,
1236 : cstate->line_buf.len,
1237 : cstate->cur_lineno);
1238 :
1239 : /*
1240 : * If enough inserts have queued up, then flush all
1241 : * buffers out to their tables.
1242 : */
1243 1195578 : if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
1244 1100 : CopyMultiInsertInfoFlush(&multiInsertInfo,
1245 : resultRelInfo,
1246 : &processed);
1247 :
1248 : /*
1249 : * We delay updating the row counter and progress of the
1250 : * COPY command until after writing the tuples stored in
1251 : * the buffer out to the table, as in single insert mode.
1252 : * See CopyMultiInsertBufferFlush().
1253 : */
1254 1195578 : continue; /* next tuple please */
1255 : }
1256 : else
1257 : {
1258 60344 : List *recheckIndexes = NIL;
1259 :
1260 : /* OK, store the tuple */
1261 60344 : if (resultRelInfo->ri_FdwRoutine != NULL)
1262 : {
1263 50 : myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
1264 : resultRelInfo,
1265 : myslot,
1266 : NULL);
1267 :
1268 48 : if (myslot == NULL) /* "do nothing" */
1269 4 : continue; /* next tuple please */
1270 :
1271 : /*
1272 : * AFTER ROW Triggers might reference the tableoid
1273 : * column, so (re-)initialize tts_tableOid before
1274 : * evaluating them.
1275 : */
1276 44 : myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
1277 : }
1278 : else
1279 : {
1280 : /* OK, store the tuple and create index entries for it */
1281 60294 : table_tuple_insert(resultRelInfo->ri_RelationDesc,
1282 : myslot, mycid, ti_options, bistate);
1283 :
1284 60294 : if (resultRelInfo->ri_NumIndices > 0)
1285 0 : recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
1286 : myslot,
1287 : estate,
1288 : false,
1289 : false,
1290 : NULL,
1291 : NIL,
1292 : false);
1293 : }
1294 :
1295 : /* AFTER ROW INSERT Triggers */
1296 60338 : ExecARInsertTriggers(estate, resultRelInfo, myslot,
1297 : recheckIndexes, cstate->transition_capture);
1298 :
1299 60338 : list_free(recheckIndexes);
1300 : }
1301 : }
1302 :
1303 : /*
1304 : * We count only tuples not suppressed by a BEFORE INSERT trigger
1305 : * or FDW; this is the same definition used by nodeModifyTable.c
1306 : * for counting tuples inserted by an INSERT command. Update
1307 : * progress of the COPY command as well.
1308 : */
1309 60350 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
1310 : ++processed);
1311 : }
1312 : }
1313 :
1314 : /* Flush any remaining buffered tuples */
1315 1392 : if (insertMethod != CIM_SINGLE)
1316 : {
1317 1272 : if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
1318 1042 : CopyMultiInsertInfoFlush(&multiInsertInfo, NULL, &processed);
1319 : }
1320 :
1321 : /* Done, clean up */
1322 1378 : error_context_stack = errcallback.previous;
1323 :
1324 1378 : if (cstate->opts.on_error != COPY_ON_ERROR_STOP &&
1325 24 : cstate->num_errors > 0 &&
1326 24 : cstate->opts.log_verbosity >= COPY_LOG_VERBOSITY_DEFAULT)
1327 18 : ereport(NOTICE,
1328 : errmsg_plural("%llu row was skipped due to data type incompatibility",
1329 : "%llu rows were skipped due to data type incompatibility",
1330 : (unsigned long long) cstate->num_errors,
1331 : (unsigned long long) cstate->num_errors));
1332 :
1333 1378 : if (bistate != NULL)
1334 192 : FreeBulkInsertState(bistate);
1335 :
1336 1378 : MemoryContextSwitchTo(oldcontext);
1337 :
1338 : /* Execute AFTER STATEMENT insertion triggers */
1339 1378 : ExecASInsertTriggers(estate, target_resultRelInfo, cstate->transition_capture);
1340 :
1341 : /* Handle queued AFTER triggers */
1342 1378 : AfterTriggerEndQuery(estate);
1343 :
1344 1378 : ExecResetTupleTable(estate->es_tupleTable, false);
1345 :
1346 : /* Allow the FDW to shut down */
1347 1378 : if (target_resultRelInfo->ri_FdwRoutine != NULL &&
1348 32 : target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
1349 32 : target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
1350 : target_resultRelInfo);
1351 :
1352 : /* Tear down the multi-insert buffer data */
1353 1378 : if (insertMethod != CIM_SINGLE)
1354 1258 : CopyMultiInsertInfoCleanup(&multiInsertInfo);
1355 :
1356 : /* Close all the partitioned tables, leaf partitions, and their indices */
1357 1378 : if (proute)
1358 96 : ExecCleanupTupleRouting(mtstate, proute);
1359 :
1360 : /* Close the result relations, including any trigger target relations */
1361 1378 : ExecCloseResultRelations(estate);
1362 1378 : ExecCloseRangeTableRelations(estate);
1363 :
1364 1378 : FreeExecutorState(estate);
1365 :
1366 1378 : return processed;
1367 : }
1368 :
1369 : /*
1370 : * Set up to read tuples from a file, program, client, or callback for COPY FROM.
1371 : * 'pstate': parse state supplying the range table and rteperminfos; may be NULL
1372 : * 'rel': Used as a template for the tuples
1373 : * 'whereClause': WHERE clause from the COPY FROM command
1374 : * 'filename': Name of server-local file (or program) to read, NULL for STDIN
1375 : * 'is_program': true if 'filename' is a program to execute
1376 : * 'data_source_cb': callback that provides the input data; checked before 'filename'
1377 : * 'attnamelist': List of char *, columns to include. NIL selects all cols.
1378 : * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
1379 : *
1380 : * Returns a CopyFromState, to be passed to NextCopyFrom and related functions.
1381 : */
1382 : CopyFromState
1383 1930 : BeginCopyFrom(ParseState *pstate,
1384 : Relation rel,
1385 : Node *whereClause,
1386 : const char *filename,
1387 : bool is_program,
1388 : copy_data_source_cb data_source_cb,
1389 : List *attnamelist,
1390 : List *options)
1391 : {
1392 : CopyFromState cstate;
1393 1930 : bool pipe = (filename == NULL); /* no file name: read from client or stdin */
1394 : TupleDesc tupDesc;
1395 : AttrNumber num_phys_attrs,
1396 : num_defaults;
1397 : FmgrInfo *in_functions;
1398 : Oid *typioparams;
1399 : Oid in_func_oid;
1400 : int *defmap;
1401 : ExprState **defexprs;
1402 : MemoryContext oldcontext;
1403 : bool volatile_defexprs;
1404 1930 : const int progress_cols[] = {
1405 : PROGRESS_COPY_COMMAND,
1406 : PROGRESS_COPY_TYPE,
1407 : PROGRESS_COPY_BYTES_TOTAL
1408 : };
1409 1930 : int64 progress_vals[] = {
1410 : PROGRESS_COPY_COMMAND_FROM,
1411 : 0, /* copy type, filled in below once the source is known */
1412 : 0 /* total bytes, filled in below only for a plain file */
1413 : };
1414 :
1415 : /* Allocate workspace and zero all fields */
1416 1930 : cstate = (CopyFromStateData *) palloc0(sizeof(CopyFromStateData));
1417 :
1418 : /*
1419 : * We allocate everything used by a cstate in a new memory context. This
1420 : * avoids memory leaks during repeated use of COPY in a query.
1421 : */
1422 1930 : cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
1423 : "COPY",
1424 : ALLOCSET_DEFAULT_SIZES);
1425 :
1426 1930 : oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1427 :
1428 : /* Extract options from the statement node tree */
1429 1930 : ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options);
1430 :
1431 : /* Process the target relation */
1432 1700 : cstate->rel = rel;
1433 :
1434 1700 : tupDesc = RelationGetDescr(cstate->rel);
1435 :
1436 : /* Common setup: column list, per-column option flags, and encoding */
1437 :
1438 : /* Generate or convert list of attributes to process */
1439 1700 : cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
1440 :
1441 1700 : num_phys_attrs = tupDesc->natts;
1442 :
1443 : /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
1444 1700 : cstate->opts.force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1445 1700 : if (cstate->opts.force_notnull_all)
1446 12 : MemSet(cstate->opts.force_notnull_flags, true, num_phys_attrs * sizeof(bool));
1447 1688 : else if (cstate->opts.force_notnull)
1448 : {
1449 : List *attnums;
1450 : ListCell *cur;
1451 :
1452 26 : attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_notnull);
1453 :
1454 52 : foreach(cur, attnums)
1455 : {
1456 32 : int attnum = lfirst_int(cur);
1457 32 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1458 :
1459 32 : if (!list_member_int(cstate->attnumlist, attnum))
1460 6 : ereport(ERROR,
1461 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1462 : /*- translator: first %s is the name of a COPY option, e.g. FORCE_NOT_NULL */
1463 : errmsg("%s column \"%s\" not referenced by COPY",
1464 : "FORCE_NOT_NULL", NameStr(attr->attname))));
1465 26 : cstate->opts.force_notnull_flags[attnum - 1] = true;
1466 : }
1467 : }
1468 :
1469 : /* Set up soft error handler for ON_ERROR */
1470 1694 : if (cstate->opts.on_error != COPY_ON_ERROR_STOP)
1471 : {
1472 58 : cstate->escontext = makeNode(ErrorSaveContext);
1473 58 : cstate->escontext->type = T_ErrorSaveContext;
1474 58 : cstate->escontext->error_occurred = false;
1475 :
1476 : /*
1477 : * Currently we only support COPY_ON_ERROR_IGNORE. We'll add other
1478 : * options later.
1479 : */
1480 58 : if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE)
1481 58 : cstate->escontext->details_wanted = false;
1482 : }
1483 : else
1484 1636 : cstate->escontext = NULL;
1485 :
1486 : /* Convert FORCE_NULL name list to per-column flags, check validity */
1487 1694 : cstate->opts.force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1488 1694 : if (cstate->opts.force_null_all)
1489 12 : MemSet(cstate->opts.force_null_flags, true, num_phys_attrs * sizeof(bool));
1490 1682 : else if (cstate->opts.force_null)
1491 : {
1492 : List *attnums;
1493 : ListCell *cur;
1494 :
1495 26 : attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_null);
1496 :
1497 52 : foreach(cur, attnums)
1498 : {
1499 32 : int attnum = lfirst_int(cur);
1500 32 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1501 :
1502 32 : if (!list_member_int(cstate->attnumlist, attnum))
1503 6 : ereport(ERROR,
1504 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1505 : /*- translator: first %s is the name of a COPY option, e.g. FORCE_NOT_NULL */
1506 : errmsg("%s column \"%s\" not referenced by COPY",
1507 : "FORCE_NULL", NameStr(attr->attname))));
1508 26 : cstate->opts.force_null_flags[attnum - 1] = true;
1509 : }
1510 : }
1511 :
1512 : /* Convert convert_select name list to per-column flags, check validity */
1513 1688 : if (cstate->opts.convert_selectively)
1514 : {
1515 : List *attnums;
1516 : ListCell *cur;
1517 :
1518 4 : cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1519 :
1520 4 : attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.convert_select);
1521 :
1522 8 : foreach(cur, attnums)
1523 : {
1524 4 : int attnum = lfirst_int(cur);
1525 4 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1526 :
1527 4 : if (!list_member_int(cstate->attnumlist, attnum))
1528 0 : ereport(ERROR,
1529 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1530 : errmsg_internal("selected column \"%s\" not referenced by COPY",
1531 : NameStr(attr->attname))));
1532 4 : cstate->convert_select_flags[attnum - 1] = true;
1533 : }
1534 : }
1535 :
1536 : /* Use client encoding when ENCODING option is not specified. */
1537 1688 : if (cstate->opts.file_encoding < 0)
1538 1682 : cstate->file_encoding = pg_get_client_encoding();
1539 : else
1540 6 : cstate->file_encoding = cstate->opts.file_encoding;
1541 :
1542 : /*
1543 : * Decide whether transcoding is needed; if so, look up the conversion function.
1544 : */
1545 1688 : if (cstate->file_encoding == GetDatabaseEncoding() ||
1546 6 : cstate->file_encoding == PG_SQL_ASCII ||
1547 0 : GetDatabaseEncoding() == PG_SQL_ASCII)
1548 : {
1549 1688 : cstate->need_transcoding = false;
1550 : }
1551 : else
1552 : {
1553 0 : cstate->need_transcoding = true;
1554 0 : cstate->conversion_proc = FindDefaultConversionProc(cstate->file_encoding,
1555 : GetDatabaseEncoding());
1556 0 : if (!OidIsValid(cstate->conversion_proc))
1557 0 : ereport(ERROR,
1558 : (errcode(ERRCODE_UNDEFINED_FUNCTION),
1559 : errmsg("default conversion function for encoding \"%s\" to \"%s\" does not exist",
1560 : pg_encoding_to_char(cstate->file_encoding),
1561 : pg_encoding_to_char(GetDatabaseEncoding()))));
1562 : }
1563 :
1564 1688 : cstate->copy_src = COPY_FILE; /* default */
1565 :
1566 1688 : cstate->whereClause = whereClause;
1567 :
1568 : /* Initialize state variables */
1569 1688 : cstate->eol_type = EOL_UNKNOWN;
1570 1688 : cstate->cur_relname = RelationGetRelationName(cstate->rel);
1571 1688 : cstate->cur_lineno = 0;
1572 1688 : cstate->cur_attname = NULL;
1573 1688 : cstate->cur_attval = NULL;
1574 1688 : cstate->relname_only = false;
1575 :
1576 : /*
1577 : * Allocate buffers for the input pipeline.
1578 : *
1579 : * attribute_buf and raw_buf are used in both text and binary modes, but
1580 : * input_buf and line_buf only in text mode.
1581 : */
1582 1688 : cstate->raw_buf = palloc(RAW_BUF_SIZE + 1);
1583 1688 : cstate->raw_buf_index = cstate->raw_buf_len = 0;
1584 1688 : cstate->raw_reached_eof = false;
1585 :
1586 1688 : if (!cstate->opts.binary)
1587 : {
1588 : /*
1589 : * If encoding conversion is needed, we need another buffer to hold
1590 : * the converted input data. Otherwise, we can just point input_buf
1591 : * to the same buffer as raw_buf.
1592 : */
1593 1672 : if (cstate->need_transcoding)
1594 : {
1595 0 : cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
1596 0 : cstate->input_buf_index = cstate->input_buf_len = 0;
1597 : }
1598 : else
1599 1672 : cstate->input_buf = cstate->raw_buf;
1600 1672 : cstate->input_reached_eof = false;
1601 :
1602 1672 : initStringInfo(&cstate->line_buf);
1603 : }
1604 :
1605 1688 : initStringInfo(&cstate->attribute_buf);
1606 :
1607 : /* Assign range table and rteperminfos, we'll need them in CopyFrom. */
1608 1688 : if (pstate)
1609 : {
1610 1618 : cstate->range_table = pstate->p_rtable;
1611 1618 : cstate->rteperminfos = pstate->p_rteperminfos;
1612 : }
1613 :
1614 1688 : num_defaults = 0;
1615 1688 : volatile_defexprs = false;
1616 :
1617 : /*
1618 : * Pick up the required catalog information for each attribute in the
1619 : * relation, including the input function, the element type (to pass to
1620 : * the input function), and info about defaults and constraints. (Which
1621 : * input function we use depends on text/binary format choice.)
1622 : */
1623 1688 : in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
1624 1688 : typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
1625 1688 : defmap = (int *) palloc(num_phys_attrs * sizeof(int));
1626 1688 : defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
1627 :
1628 6602 : for (int attnum = 1; attnum <= num_phys_attrs; attnum++)
1629 : {
1630 4928 : Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
1631 :
1632 : /* We don't need info for dropped attributes; their array entries stay unset */
1633 4928 : if (att->attisdropped)
1634 124 : continue;
1635 :
1636 : /* Fetch the input function and typioparam info */
1637 4804 : if (cstate->opts.binary)
1638 62 : getTypeBinaryInputInfo(att->atttypid,
1639 62 : &in_func_oid, &typioparams[attnum - 1]);
1640 : else
1641 4742 : getTypeInputInfo(att->atttypid,
1642 4742 : &in_func_oid, &typioparams[attnum - 1]);
1643 4802 : fmgr_info(in_func_oid, &in_functions[attnum - 1]);
1644 :
1645 : /* Get default info if available */
1646 4802 : defexprs[attnum - 1] = NULL;
1647 :
1648 : /*
1649 : * We only need the default values for columns that do not appear in
1650 : * the column list, unless the DEFAULT option was given. We never need
1651 : * default values for generated columns.
1652 : */
1653 4802 : if ((cstate->opts.default_print != NULL ||
1654 4676 : !list_member_int(cstate->attnumlist, attnum)) &&
1655 654 : !att->attgenerated)
1656 : {
1657 634 : Expr *defexpr = (Expr *) build_column_default(cstate->rel,
1658 : attnum);
1659 :
1660 634 : if (defexpr != NULL)
1661 : {
1662 : /* Run the expression through planner */
1663 264 : defexpr = expression_planner(defexpr);
1664 :
1665 : /* Initialize executable expression in copycontext */
1666 252 : defexprs[attnum - 1] = ExecInitExpr(defexpr, NULL);
1667 :
1668 : /* The column is not in the COPY column list, so */
1669 : /* record its index in defmap and count it in num_defaults */
1670 252 : if (!list_member_int(cstate->attnumlist, attnum))
1671 : {
1672 172 : defmap[num_defaults] = attnum - 1;
1673 172 : num_defaults++;
1674 : }
1675 :
1676 : /*
1677 : * If a default expression looks at the table being loaded,
1678 : * then it could give the wrong answer when using
1679 : * multi-insert. Since database access can be dynamic this is
1680 : * hard to test for exactly, so we use the much wider test of
1681 : * whether the default expression is volatile. We allow for
1682 : * the special case of when the default expression is the
1683 : * nextval() of a sequence which in this specific case is
1684 : * known to be safe for use with the multi-insert
1685 : * optimization. Hence we use this special case function
1686 : * checker rather than the standard check for
1687 : * contain_volatile_functions(). Note also that we already
1688 : * ran the expression through expression_planner().
1689 : */
1690 252 : if (!volatile_defexprs)
1691 252 : volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
1692 : }
1693 : }
1694 : }
1695 :
1696 1674 : cstate->defaults = (bool *) palloc0(tupDesc->natts * sizeof(bool)); /* one flag per column, all false */
1697 :
1698 : /* initialize progress */
1699 1674 : pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
1700 1674 : cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
1701 1674 : cstate->bytes_processed = 0;
1702 :
1703 : /* Save the per-attribute arrays and counters built above in cstate. */
1704 1674 : cstate->in_functions = in_functions;
1705 1674 : cstate->typioparams = typioparams;
1706 1674 : cstate->defmap = defmap;
1707 1674 : cstate->defexprs = defexprs;
1708 1674 : cstate->volatile_defexprs = volatile_defexprs;
1709 1674 : cstate->num_defaults = num_defaults;
1710 1674 : cstate->is_program = is_program;
1711 :
1712 1674 : if (data_source_cb)
1713 : {
1714 340 : progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
1715 340 : cstate->copy_src = COPY_CALLBACK;
1716 340 : cstate->data_source_cb = data_source_cb;
1717 : }
1718 1334 : else if (pipe)
1719 : {
1720 920 : progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
1721 : Assert(!is_program); /* the grammar does not allow this */
1722 920 : if (whereToSendOutput == DestRemote)
1723 920 : ReceiveCopyBegin(cstate);
1724 : else
1725 0 : cstate->copy_file = stdin;
1726 : }
1727 : else
1728 : {
1729 414 : cstate->filename = pstrdup(filename);
1730 :
1731 414 : if (cstate->is_program)
1732 : {
1733 0 : progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
1734 0 : cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
1735 0 : if (cstate->copy_file == NULL)
1736 0 : ereport(ERROR,
1737 : (errcode_for_file_access(),
1738 : errmsg("could not execute command \"%s\": %m",
1739 : cstate->filename)));
1740 : }
1741 : else
1742 : {
1743 : struct stat st;
1744 :
1745 414 : progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
1746 414 : cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
1747 414 : if (cstate->copy_file == NULL)
1748 : {
1749 : /* copy errno because ereport subfunctions might change it */
1750 0 : int save_errno = errno;
1751 :
1752 0 : ereport(ERROR,
1753 : (errcode_for_file_access(),
1754 : errmsg("could not open file \"%s\" for reading: %m",
1755 : cstate->filename),
1756 : (save_errno == ENOENT || save_errno == EACCES) ?
1757 : errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
1758 : "You may want a client-side facility such as psql's \\copy.") : 0));
1759 : }
1760 :
1761 414 : if (fstat(fileno(cstate->copy_file), &st))
1762 0 : ereport(ERROR,
1763 : (errcode_for_file_access(),
1764 : errmsg("could not stat file \"%s\": %m",
1765 : cstate->filename)));
1766 :
1767 414 : if (S_ISDIR(st.st_mode))
1768 0 : ereport(ERROR,
1769 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1770 : errmsg("\"%s\" is a directory", cstate->filename)));
1771 :
1772 414 : progress_vals[2] = st.st_size; /* reported as PROGRESS_COPY_BYTES_TOTAL */
1773 : }
1774 : }
1775 :
1776 1674 : pgstat_progress_update_multi_param(3, progress_cols, progress_vals);
1777 :
1778 1674 : if (cstate->opts.binary)
1779 : {
1780 : /* Read and verify binary header */
1781 14 : ReceiveCopyBinaryHeader(cstate);
1782 : }
1783 :
1784 : /* create workspace for CopyReadAttributes results */
1785 1674 : if (!cstate->opts.binary)
1786 : {
1787 1660 : AttrNumber attr_count = list_length(cstate->attnumlist);
1788 :
1789 1660 : cstate->max_fields = attr_count;
1790 1660 : cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
1791 : }
1792 :
1793 1674 : MemoryContextSwitchTo(oldcontext);
1794 :
1795 1674 : return cstate;
1796 : }
1797 :
1798 : /*
1799 : * Clean up storage and release resources for COPY FROM.
1800 : */
1801 : void
1802 1116 : EndCopyFrom(CopyFromState cstate)
1803 : {
1804 : /* No COPY FROM related resources except memory. */
1805 1116 : if (cstate->is_program)
1806 : {
1807 0 : ClosePipeFromProgram(cstate);
1808 : }
1809 : else
1810 : {
1811 1116 : if (cstate->filename != NULL && FreeFile(cstate->copy_file))
1812 0 : ereport(ERROR,
1813 : (errcode_for_file_access(),
1814 : errmsg("could not close file \"%s\": %m",
1815 : cstate->filename)));
1816 : }
1817 :
1818 1116 : pgstat_progress_end_command();
1819 :
1820 1116 : MemoryContextDelete(cstate->copycontext);
1821 1116 : pfree(cstate);
1822 1116 : }
1823 :
1824 : /*
1825 : * Closes the pipe from an external program, checking the pclose() return code.
1826 : */
1827 : static void
1828 0 : ClosePipeFromProgram(CopyFromState cstate)
1829 : {
1830 : int pclose_rc;
1831 :
1832 : Assert(cstate->is_program);
1833 :
1834 0 : pclose_rc = ClosePipeStream(cstate->copy_file);
1835 0 : if (pclose_rc == -1)
1836 0 : ereport(ERROR,
1837 : (errcode_for_file_access(),
1838 : errmsg("could not close pipe to external command: %m")));
1839 0 : else if (pclose_rc != 0)
1840 : {
1841 : /*
1842 : * If we ended a COPY FROM PROGRAM before reaching EOF, then it's
1843 : * expectable for the called program to fail with SIGPIPE, and we
1844 : * should not report that as an error. Otherwise, SIGPIPE indicates a
1845 : * problem.
1846 : */
1847 0 : if (!cstate->raw_reached_eof &&
1848 0 : wait_result_is_signal(pclose_rc, SIGPIPE))
1849 0 : return;
1850 :
1851 0 : ereport(ERROR,
1852 : (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
1853 : errmsg("program \"%s\" failed",
1854 : cstate->filename),
1855 : errdetail_internal("%s", wait_result_to_str(pclose_rc))));
1856 : }
1857 : }
|