Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * copyfrom.c
4 : * COPY <table> FROM file/program/client
5 : *
6 : * This file contains routines needed to efficiently load tuples into a
7 : * table. That includes looking up the correct partition, firing triggers,
8 : * calling the table AM function to insert the data, and updating indexes.
9 : * Reading data from the input file or client and parsing it into Datums
10 : * is handled in copyfromparse.c.
11 : *
12 : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
13 : * Portions Copyright (c) 1994, Regents of the University of California
14 : *
15 : *
16 : * IDENTIFICATION
17 : * src/backend/commands/copyfrom.c
18 : *
19 : *-------------------------------------------------------------------------
20 : */
21 : #include "postgres.h"
22 :
23 : #include <ctype.h>
24 : #include <unistd.h>
25 : #include <sys/stat.h>
26 :
27 : #include "access/heapam.h"
28 : #include "access/tableam.h"
29 : #include "access/xact.h"
30 : #include "catalog/namespace.h"
31 : #include "commands/copy.h"
32 : #include "commands/copyfrom_internal.h"
33 : #include "commands/progress.h"
34 : #include "commands/trigger.h"
35 : #include "executor/execPartition.h"
36 : #include "executor/executor.h"
37 : #include "executor/nodeModifyTable.h"
38 : #include "executor/tuptable.h"
39 : #include "foreign/fdwapi.h"
40 : #include "mb/pg_wchar.h"
41 : #include "miscadmin.h"
42 : #include "nodes/miscnodes.h"
43 : #include "optimizer/optimizer.h"
44 : #include "pgstat.h"
45 : #include "rewrite/rewriteHandler.h"
46 : #include "storage/fd.h"
47 : #include "tcop/tcopprot.h"
48 : #include "utils/lsyscache.h"
49 : #include "utils/memutils.h"
50 : #include "utils/portal.h"
51 : #include "utils/rel.h"
52 : #include "utils/snapmgr.h"
53 :
54 : /*
55 : * No more than this many tuples per CopyMultiInsertBuffer
56 : *
57 : * Caution: Don't make this too big, as we could end up with this many
58 : * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's
59 : * multiInsertBuffers list. Increasing this can cause quadratic growth in
60 : * memory requirements during copies into partitioned tables with a large
61 : * number of partitions.
62 : */
63 : #define MAX_BUFFERED_TUPLES 1000
64 :
65 : /*
66 : * Flush buffers if there are >= this many bytes, as counted by the input
67 : * size, of tuples stored.
68 : */
69 : #define MAX_BUFFERED_BYTES 65535
70 :
71 : /* Trim the list of buffers back down to this number after flushing */
72 : #define MAX_PARTITION_BUFFERS 32
73 :
74 : /* Stores multi-insert data related to a single relation in CopyFrom. */
75 : typedef struct CopyMultiInsertBuffer
76 : {
77 : TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
78 : ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */
79 : BulkInsertState bistate; /* BulkInsertState for this rel if plain
80 : * table; NULL if foreign table */
81 : int nused; /* number of 'slots' containing tuples */
82 : uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy
83 : * stream */
84 : } CopyMultiInsertBuffer;
85 :
86 : /*
87 : * Stores one or many CopyMultiInsertBuffers and details about the size and
88 : * number of tuples which are stored in them. This allows multiple buffers to
89 : * exist at once when COPYing into a partitioned table.
90 : */
91 : typedef struct CopyMultiInsertInfo
92 : {
93 : List *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */
94 : int bufferedTuples; /* number of tuples buffered over all buffers */
95 : int bufferedBytes; /* number of bytes from all buffered tuples */
96 : CopyFromState cstate; /* Copy state for this CopyMultiInsertInfo */
97 : EState *estate; /* Executor state used for COPY */
98 : CommandId mycid; /* Command Id used for COPY */
99 : int ti_options; /* table insert options */
100 : } CopyMultiInsertInfo;
101 :
102 :
103 : /* non-export function prototypes */
104 : static void ClosePipeFromProgram(CopyFromState cstate);
105 :
106 : /*
107 : * error context callback for COPY FROM
108 : *
109 : * The argument for the error context must be CopyFromState.
110 : */
111 : void
112 280 : CopyFromErrorCallback(void *arg)
113 : {
114 280 : CopyFromState cstate = (CopyFromState) arg;
115 :
116 280 : if (cstate->relname_only)
117 : {
118 44 : errcontext("COPY %s",
119 : cstate->cur_relname);
120 44 : return;
121 : }
122 236 : if (cstate->opts.binary)
123 : {
124 : /* can't usefully display the data */
125 2 : if (cstate->cur_attname)
126 2 : errcontext("COPY %s, line %llu, column %s",
127 : cstate->cur_relname,
128 2 : (unsigned long long) cstate->cur_lineno,
129 : cstate->cur_attname);
130 : else
131 0 : errcontext("COPY %s, line %llu",
132 : cstate->cur_relname,
133 0 : (unsigned long long) cstate->cur_lineno);
134 : }
135 : else
136 : {
137 234 : if (cstate->cur_attname && cstate->cur_attval)
138 32 : {
139 : /* error is relevant to a particular column */
140 : char *attval;
141 :
142 32 : attval = CopyLimitPrintoutLength(cstate->cur_attval);
143 32 : errcontext("COPY %s, line %llu, column %s: \"%s\"",
144 : cstate->cur_relname,
145 32 : (unsigned long long) cstate->cur_lineno,
146 : cstate->cur_attname,
147 : attval);
148 32 : pfree(attval);
149 : }
150 202 : else if (cstate->cur_attname)
151 : {
152 : /* error is relevant to a particular column, value is NULL */
153 6 : errcontext("COPY %s, line %llu, column %s: null input",
154 : cstate->cur_relname,
155 6 : (unsigned long long) cstate->cur_lineno,
156 : cstate->cur_attname);
157 : }
158 : else
159 : {
160 : /*
161 : * Error is relevant to a particular line.
162 : *
163 : * If line_buf still contains the correct line, print it.
164 : */
165 196 : if (cstate->line_buf_valid)
166 : {
167 : char *lineval;
168 :
169 184 : lineval = CopyLimitPrintoutLength(cstate->line_buf.data);
170 184 : errcontext("COPY %s, line %llu: \"%s\"",
171 : cstate->cur_relname,
172 184 : (unsigned long long) cstate->cur_lineno, lineval);
173 184 : pfree(lineval);
174 : }
175 : else
176 : {
177 12 : errcontext("COPY %s, line %llu",
178 : cstate->cur_relname,
179 12 : (unsigned long long) cstate->cur_lineno);
180 : }
181 : }
182 : }
183 : }
184 :
185 : /*
186 : * Make sure we don't print an unreasonable amount of COPY data in a message.
187 : *
188 : * Returns a pstrdup'd copy of the input.
189 : */
190 : char *
191 252 : CopyLimitPrintoutLength(const char *str)
192 : {
193 : #define MAX_COPY_DATA_DISPLAY 100
194 :
195 252 : int slen = strlen(str);
196 : int len;
197 : char *res;
198 :
199 : /* Fast path if definitely okay */
200 252 : if (slen <= MAX_COPY_DATA_DISPLAY)
201 252 : return pstrdup(str);
202 :
203 : /* Apply encoding-dependent truncation */
204 0 : len = pg_mbcliplen(str, slen, MAX_COPY_DATA_DISPLAY);
205 :
206 : /*
207 : * Truncate, and add "..." to show we truncated the input.
208 : */
209 0 : res = (char *) palloc(len + 4);
210 0 : memcpy(res, str, len);
211 0 : strcpy(res + len, "...");
212 :
213 0 : return res;
214 : }
215 :
216 : /*
217 : * Allocate memory and initialize a new CopyMultiInsertBuffer for this
218 : * ResultRelInfo.
219 : */
220 : static CopyMultiInsertBuffer *
221 1386 : CopyMultiInsertBufferInit(ResultRelInfo *rri)
222 : {
223 : CopyMultiInsertBuffer *buffer;
224 :
225 1386 : buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
226 1386 : memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
227 1386 : buffer->resultRelInfo = rri;
228 1386 : buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL;
229 1386 : buffer->nused = 0;
230 :
231 1386 : return buffer;
232 : }
233 :
234 : /*
235 : * Make a new buffer for this ResultRelInfo.
236 : */
237 : static inline void
238 1386 : CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
239 : ResultRelInfo *rri)
240 : {
241 : CopyMultiInsertBuffer *buffer;
242 :
243 1386 : buffer = CopyMultiInsertBufferInit(rri);
244 :
245 : /* Setup back-link so we can easily find this buffer again */
246 1386 : rri->ri_CopyMultiInsertBuffer = buffer;
247 : /* Record that we're tracking this buffer */
248 1386 : miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
249 1386 : }
250 :
251 : /*
252 : * Initialize an already allocated CopyMultiInsertInfo.
253 : *
254 : * If rri is a non-partitioned table then a CopyMultiInsertBuffer is set up
255 : * for that table.
256 : */
257 : static void
258 1374 : CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
259 : CopyFromState cstate, EState *estate, CommandId mycid,
260 : int ti_options)
261 : {
262 1374 : miinfo->multiInsertBuffers = NIL;
263 1374 : miinfo->bufferedTuples = 0;
264 1374 : miinfo->bufferedBytes = 0;
265 1374 : miinfo->cstate = cstate;
266 1374 : miinfo->estate = estate;
267 1374 : miinfo->mycid = mycid;
268 1374 : miinfo->ti_options = ti_options;
269 :
270 : /*
271 : * Only setup the buffer when not dealing with a partitioned table.
272 : * Buffers for partitioned tables will just be setup when we need to send
273 : * tuples their way for the first time.
274 : */
275 1374 : if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
276 1300 : CopyMultiInsertInfoSetupBuffer(miinfo, rri);
277 1374 : }
278 :
279 : /*
280 : * Returns true if the buffers are full
281 : */
282 : static inline bool
283 1187944 : CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
284 : {
285 1187944 : if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
286 1186950 : miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
287 1100 : return true;
288 1186844 : return false;
289 : }
290 :
291 : /*
292 : * Returns true if we have no buffered tuples
293 : */
294 : static inline bool
295 1266 : CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
296 : {
297 1266 : return miinfo->bufferedTuples == 0;
298 : }
299 :
300 : /*
301 : * Write the tuples stored in 'buffer' out to the table.
302 : */
303 : static inline void
304 2334 : CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
305 : CopyMultiInsertBuffer *buffer,
306 : int64 *processed)
307 : {
308 2334 : CopyFromState cstate = miinfo->cstate;
309 2334 : EState *estate = miinfo->estate;
310 2334 : int nused = buffer->nused;
311 2334 : ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
312 2334 : TupleTableSlot **slots = buffer->slots;
313 : int i;
314 :
315 2334 : if (resultRelInfo->ri_FdwRoutine)
316 : {
317 14 : int batch_size = resultRelInfo->ri_BatchSize;
318 14 : int sent = 0;
319 :
320 : Assert(buffer->bistate == NULL);
321 :
322 : /* Ensure that the FDW supports batching and it's enabled */
323 : Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert);
324 : Assert(batch_size > 1);
325 :
326 : /*
327 : * We suppress error context information other than the relation name,
328 : * if one of the operations below fails.
329 : */
330 : Assert(!cstate->relname_only);
331 14 : cstate->relname_only = true;
332 :
333 38 : while (sent < nused)
334 : {
335 26 : int size = (batch_size < nused - sent) ? batch_size : (nused - sent);
336 26 : int inserted = size;
337 : TupleTableSlot **rslots;
338 :
339 : /* insert into foreign table: let the FDW do it */
340 : rslots =
341 26 : resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert(estate,
342 : resultRelInfo,
343 26 : &slots[sent],
344 : NULL,
345 : &inserted);
346 :
347 24 : sent += size;
348 :
349 : /* No need to do anything if there are no inserted rows */
350 24 : if (inserted <= 0)
351 4 : continue;
352 :
353 : /* Triggers on foreign tables should not have transition tables */
354 : Assert(resultRelInfo->ri_TrigDesc == NULL ||
355 : resultRelInfo->ri_TrigDesc->trig_insert_new_table == false);
356 :
357 : /* Run AFTER ROW INSERT triggers */
358 20 : if (resultRelInfo->ri_TrigDesc != NULL &&
359 0 : resultRelInfo->ri_TrigDesc->trig_insert_after_row)
360 : {
361 0 : Oid relid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
362 :
363 0 : for (i = 0; i < inserted; i++)
364 : {
365 0 : TupleTableSlot *slot = rslots[i];
366 :
367 : /*
368 : * AFTER ROW Triggers might reference the tableoid column,
369 : * so (re-)initialize tts_tableOid before evaluating them.
370 : */
371 0 : slot->tts_tableOid = relid;
372 :
373 0 : ExecARInsertTriggers(estate, resultRelInfo,
374 : slot, NIL,
375 : cstate->transition_capture);
376 : }
377 : }
378 :
379 : /* Update the row counter and progress of the COPY command */
380 20 : *processed += inserted;
381 20 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
382 : *processed);
383 : }
384 :
385 48 : for (i = 0; i < nused; i++)
386 36 : ExecClearTuple(slots[i]);
387 :
388 : /* reset relname_only */
389 12 : cstate->relname_only = false;
390 : }
391 : else
392 : {
393 2320 : CommandId mycid = miinfo->mycid;
394 2320 : int ti_options = miinfo->ti_options;
395 2320 : bool line_buf_valid = cstate->line_buf_valid;
396 2320 : uint64 save_cur_lineno = cstate->cur_lineno;
397 : MemoryContext oldcontext;
398 :
399 : Assert(buffer->bistate != NULL);
400 :
401 : /*
402 : * Print error context information correctly, if one of the operations
403 : * below fails.
404 : */
405 2320 : cstate->line_buf_valid = false;
406 :
407 : /*
408 : * table_multi_insert may leak memory, so switch to short-lived memory
409 : * context before calling it.
410 : */
411 2320 : oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
412 2320 : table_multi_insert(resultRelInfo->ri_RelationDesc,
413 : slots,
414 : nused,
415 : mycid,
416 : ti_options,
417 : buffer->bistate);
418 2320 : MemoryContextSwitchTo(oldcontext);
419 :
420 1190112 : for (i = 0; i < nused; i++)
421 : {
422 : /*
423 : * If there are any indexes, update them for all the inserted
424 : * tuples, and run AFTER ROW INSERT triggers.
425 : */
426 1187804 : if (resultRelInfo->ri_NumIndices > 0)
427 : {
428 : List *recheckIndexes;
429 :
430 201834 : cstate->cur_lineno = buffer->linenos[i];
431 : recheckIndexes =
432 201834 : ExecInsertIndexTuples(resultRelInfo,
433 : buffer->slots[i], estate, false,
434 : false, NULL, NIL, false);
435 201822 : ExecARInsertTriggers(estate, resultRelInfo,
436 201822 : slots[i], recheckIndexes,
437 : cstate->transition_capture);
438 201822 : list_free(recheckIndexes);
439 : }
440 :
441 : /*
442 : * There's no indexes, but see if we need to run AFTER ROW INSERT
443 : * triggers anyway.
444 : */
445 985970 : else if (resultRelInfo->ri_TrigDesc != NULL &&
446 78 : (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
447 60 : resultRelInfo->ri_TrigDesc->trig_insert_new_table))
448 : {
449 36 : cstate->cur_lineno = buffer->linenos[i];
450 36 : ExecARInsertTriggers(estate, resultRelInfo,
451 36 : slots[i], NIL,
452 : cstate->transition_capture);
453 : }
454 :
455 1187792 : ExecClearTuple(slots[i]);
456 : }
457 :
458 : /* Update the row counter and progress of the COPY command */
459 2308 : *processed += nused;
460 2308 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
461 : *processed);
462 :
463 : /* reset cur_lineno and line_buf_valid to what they were */
464 2308 : cstate->line_buf_valid = line_buf_valid;
465 2308 : cstate->cur_lineno = save_cur_lineno;
466 : }
467 :
468 : /* Mark that all slots are free */
469 2320 : buffer->nused = 0;
470 2320 : }
471 :
472 : /*
473 : * Drop used slots and free member for this buffer.
474 : *
475 : * The buffer must be flushed before cleanup.
476 : */
477 : static inline void
478 1236 : CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
479 : CopyMultiInsertBuffer *buffer)
480 : {
481 1236 : ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
482 : int i;
483 :
484 : /* Ensure buffer was flushed */
485 : Assert(buffer->nused == 0);
486 :
487 : /* Remove back-link to ourself */
488 1236 : resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
489 :
490 1236 : if (resultRelInfo->ri_FdwRoutine == NULL)
491 : {
492 : Assert(buffer->bistate != NULL);
493 1224 : FreeBulkInsertState(buffer->bistate);
494 : }
495 : else
496 : Assert(buffer->bistate == NULL);
497 :
498 : /* Since we only create slots on demand, just drop the non-null ones. */
499 227932 : for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
500 226696 : ExecDropSingleTupleTableSlot(buffer->slots[i]);
501 :
502 1236 : if (resultRelInfo->ri_FdwRoutine == NULL)
503 1224 : table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
504 : miinfo->ti_options);
505 :
506 1236 : pfree(buffer);
507 1236 : }
508 :
509 : /*
510 : * Write out all stored tuples in all buffers out to the tables.
511 : *
512 : * Once flushed we also trim the tracked buffers list down to size by removing
513 : * the buffers created earliest first.
514 : *
515 : * Callers should pass 'curr_rri' as the ResultRelInfo that's currently being
516 : * used. When cleaning up old buffers we'll never remove the one for
517 : * 'curr_rri'.
518 : */
519 : static inline void
520 2108 : CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri,
521 : int64 *processed)
522 : {
523 : ListCell *lc;
524 :
525 4428 : foreach(lc, miinfo->multiInsertBuffers)
526 : {
527 2334 : CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc);
528 :
529 2334 : CopyMultiInsertBufferFlush(miinfo, buffer, processed);
530 : }
531 :
532 2094 : miinfo->bufferedTuples = 0;
533 2094 : miinfo->bufferedBytes = 0;
534 :
535 : /*
536 : * Trim the list of tracked buffers down if it exceeds the limit. Here we
537 : * remove buffers starting with the ones we created first. It seems less
538 : * likely that these older ones will be needed than the ones that were
539 : * just created.
540 : */
541 2094 : while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS)
542 : {
543 : CopyMultiInsertBuffer *buffer;
544 :
545 0 : buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
546 :
547 : /*
548 : * We never want to remove the buffer that's currently being used, so
549 : * if we happen to find that then move it to the end of the list.
550 : */
551 0 : if (buffer->resultRelInfo == curr_rri)
552 : {
553 0 : miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
554 0 : miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
555 0 : buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
556 : }
557 :
558 0 : CopyMultiInsertBufferCleanup(miinfo, buffer);
559 0 : miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
560 : }
561 2094 : }
562 :
563 : /*
564 : * Cleanup allocated buffers and free memory
565 : */
566 : static inline void
567 1224 : CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo)
568 : {
569 : ListCell *lc;
570 :
571 2460 : foreach(lc, miinfo->multiInsertBuffers)
572 1236 : CopyMultiInsertBufferCleanup(miinfo, lfirst(lc));
573 :
574 1224 : list_free(miinfo->multiInsertBuffers);
575 1224 : }
576 :
577 : /*
578 : * Get the next TupleTableSlot that the next tuple should be stored in.
579 : *
580 : * Callers must ensure that the buffer is not full.
581 : *
582 : * Note: 'miinfo' is unused but has been included for consistency with the
583 : * other functions in this area.
584 : */
585 : static inline TupleTableSlot *
586 1189292 : CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
587 : ResultRelInfo *rri)
588 : {
589 1189292 : CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
590 1189292 : int nused = buffer->nused;
591 :
592 : Assert(buffer != NULL);
593 : Assert(nused < MAX_BUFFERED_TUPLES);
594 :
595 1189292 : if (buffer->slots[nused] == NULL)
596 226960 : buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
597 1189292 : return buffer->slots[nused];
598 : }
599 :
600 : /*
601 : * Record the previously reserved TupleTableSlot that was reserved by
602 : * CopyMultiInsertInfoNextFreeSlot as being consumed.
603 : */
604 : static inline void
605 1187944 : CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
606 : TupleTableSlot *slot, int tuplen, uint64 lineno)
607 : {
608 1187944 : CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
609 :
610 : Assert(buffer != NULL);
611 : Assert(slot == buffer->slots[buffer->nused]);
612 :
613 : /* Store the line number so we can properly report any errors later */
614 1187944 : buffer->linenos[buffer->nused] = lineno;
615 :
616 : /* Record this slot as being used */
617 1187944 : buffer->nused++;
618 :
619 : /* Update how many tuples are stored and their size */
620 1187944 : miinfo->bufferedTuples++;
621 1187944 : miinfo->bufferedBytes += tuplen;
622 1187944 : }
623 :
624 : /*
625 : * Copy FROM file to relation.
626 : */
627 : uint64
628 1552 : CopyFrom(CopyFromState cstate)
629 : {
630 : ResultRelInfo *resultRelInfo;
631 : ResultRelInfo *target_resultRelInfo;
632 1552 : ResultRelInfo *prevResultRelInfo = NULL;
633 1552 : EState *estate = CreateExecutorState(); /* for ExecConstraints() */
634 : ModifyTableState *mtstate;
635 : ExprContext *econtext;
636 1552 : TupleTableSlot *singleslot = NULL;
637 1552 : MemoryContext oldcontext = CurrentMemoryContext;
638 :
639 1552 : PartitionTupleRouting *proute = NULL;
640 : ErrorContextCallback errcallback;
641 1552 : CommandId mycid = GetCurrentCommandId(true);
642 1552 : int ti_options = 0; /* start with default options for insert */
643 1552 : BulkInsertState bistate = NULL;
644 : CopyInsertMethod insertMethod;
645 1552 : CopyMultiInsertInfo multiInsertInfo = {0}; /* pacify compiler */
646 1552 : int64 processed = 0;
647 1552 : int64 excluded = 0;
648 1552 : int64 skipped = 0;
649 : bool has_before_insert_row_trig;
650 : bool has_instead_insert_row_trig;
651 1552 : bool leafpart_use_multi_insert = false;
652 :
653 : Assert(cstate->rel);
654 : Assert(list_length(cstate->range_table) == 1);
655 :
656 1552 : if (cstate->opts.on_error != COPY_ON_ERROR_STOP)
657 : Assert(cstate->escontext);
658 :
659 : /*
660 : * The target must be a plain, foreign, or partitioned relation, or have
661 : * an INSTEAD OF INSERT row trigger. (Currently, such triggers are only
662 : * allowed on views, so we only hint about them in the view case.)
663 : */
664 1552 : if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
665 160 : cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
666 122 : cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
667 18 : !(cstate->rel->trigdesc &&
668 12 : cstate->rel->trigdesc->trig_insert_instead_row))
669 : {
670 6 : if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
671 6 : ereport(ERROR,
672 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
673 : errmsg("cannot copy to view \"%s\"",
674 : RelationGetRelationName(cstate->rel)),
675 : errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
676 0 : else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
677 0 : ereport(ERROR,
678 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
679 : errmsg("cannot copy to materialized view \"%s\"",
680 : RelationGetRelationName(cstate->rel))));
681 0 : else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
682 0 : ereport(ERROR,
683 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
684 : errmsg("cannot copy to sequence \"%s\"",
685 : RelationGetRelationName(cstate->rel))));
686 : else
687 0 : ereport(ERROR,
688 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
689 : errmsg("cannot copy to non-table relation \"%s\"",
690 : RelationGetRelationName(cstate->rel))));
691 : }
692 :
693 : /*
694 : * If the target file is new-in-transaction, we assume that checking FSM
695 : * for free space is a waste of time. This could possibly be wrong, but
696 : * it's unlikely.
697 : */
698 1546 : if (RELKIND_HAS_STORAGE(cstate->rel->rd_rel->relkind) &&
699 1392 : (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
700 1380 : cstate->rel->rd_firstRelfilelocatorSubid != InvalidSubTransactionId))
701 88 : ti_options |= TABLE_INSERT_SKIP_FSM;
702 :
703 : /*
704 : * Optimize if new relation storage was created in this subxact or one of
705 : * its committed children and we won't see those rows later as part of an
706 : * earlier scan or command. The subxact test ensures that if this subxact
707 : * aborts then the frozen rows won't be visible after xact cleanup. Note
708 : * that the stronger test of exactly which subtransaction created it is
709 : * crucial for correctness of this optimization. The test for an earlier
710 : * scan or command tolerates false negatives. FREEZE causes other sessions
711 : * to see rows they would not see under MVCC, and a false negative merely
712 : * spreads that anomaly to the current session.
713 : */
714 1546 : if (cstate->opts.freeze)
715 : {
716 : /*
717 : * We currently disallow COPY FREEZE on partitioned tables. The
718 : * reason for this is that we've simply not yet opened the partitions
719 : * to determine if the optimization can be applied to them. We could
720 : * go and open them all here, but doing so may be quite a costly
721 : * overhead for small copies. In any case, we may just end up routing
722 : * tuples to a small number of partitions. It seems better just to
723 : * raise an ERROR for partitioned tables.
724 : */
725 66 : if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
726 : {
727 6 : ereport(ERROR,
728 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
729 : errmsg("cannot perform COPY FREEZE on a partitioned table")));
730 : }
731 :
732 : /*
733 : * Tolerate one registration for the benefit of FirstXactSnapshot.
734 : * Scan-bearing queries generally create at least two registrations,
735 : * though relying on that is fragile, as is ignoring ActiveSnapshot.
736 : * Clear CatalogSnapshot to avoid counting its registration. We'll
737 : * still detect ongoing catalog scans, each of which separately
738 : * registers the snapshot it uses.
739 : */
740 60 : InvalidateCatalogSnapshot();
741 60 : if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
742 0 : ereport(ERROR,
743 : (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
744 : errmsg("cannot perform COPY FREEZE because of prior transaction activity")));
745 :
746 120 : if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
747 60 : cstate->rel->rd_newRelfilelocatorSubid != GetCurrentSubTransactionId())
748 18 : ereport(ERROR,
749 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
750 : errmsg("cannot perform COPY FREEZE because the table was not created or truncated in the current subtransaction")));
751 :
752 42 : ti_options |= TABLE_INSERT_FROZEN;
753 : }
754 :
755 : /*
756 : * We need a ResultRelInfo so we can use the regular executor's
757 : * index-entry-making machinery. (There used to be a huge amount of code
758 : * here that basically duplicated execUtils.c ...)
759 : */
760 1522 : ExecInitRangeTable(estate, cstate->range_table, cstate->rteperminfos);
761 1522 : resultRelInfo = target_resultRelInfo = makeNode(ResultRelInfo);
762 1522 : ExecInitResultRelation(estate, resultRelInfo, 1);
763 :
764 : /* Verify the named relation is a valid target for INSERT */
765 1522 : CheckValidResultRel(resultRelInfo, CMD_INSERT, NIL);
766 :
767 1520 : ExecOpenIndices(resultRelInfo, false);
768 :
769 : /*
770 : * Set up a ModifyTableState so we can let FDW(s) init themselves for
771 : * foreign-table result relation(s).
772 : */
773 1520 : mtstate = makeNode(ModifyTableState);
774 1520 : mtstate->ps.plan = NULL;
775 1520 : mtstate->ps.state = estate;
776 1520 : mtstate->operation = CMD_INSERT;
777 1520 : mtstate->mt_nrels = 1;
778 1520 : mtstate->resultRelInfo = resultRelInfo;
779 1520 : mtstate->rootResultRelInfo = resultRelInfo;
780 :
781 1520 : if (resultRelInfo->ri_FdwRoutine != NULL &&
782 36 : resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
783 36 : resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
784 : resultRelInfo);
785 :
786 : /*
787 : * Also, if the named relation is a foreign table, determine if the FDW
788 : * supports batch insert and determine the batch size (a FDW may support
789 : * batching, but it may be disabled for the server/table).
790 : *
791 : * If the FDW does not support batching, we set the batch size to 1.
792 : */
793 1520 : if (resultRelInfo->ri_FdwRoutine != NULL &&
794 36 : resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize &&
795 36 : resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert)
796 36 : resultRelInfo->ri_BatchSize =
797 36 : resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize(resultRelInfo);
798 : else
799 1484 : resultRelInfo->ri_BatchSize = 1;
800 :
801 : Assert(resultRelInfo->ri_BatchSize >= 1);
802 :
803 : /* Prepare to catch AFTER triggers. */
804 1520 : AfterTriggerBeginQuery();
805 :
806 : /*
807 : * If there are any triggers with transition tables on the named relation,
808 : * we need to be prepared to capture transition tuples.
809 : *
810 : * Because partition tuple routing would like to know about whether
811 : * transition capture is active, we also set it in mtstate, which is
812 : * passed to ExecFindPartition() below.
813 : */
814 1520 : cstate->transition_capture = mtstate->mt_transition_capture =
815 1520 : MakeTransitionCaptureState(cstate->rel->trigdesc,
816 1520 : RelationGetRelid(cstate->rel),
817 : CMD_INSERT);
818 :
819 : /*
820 : * If the named relation is a partitioned table, initialize state for
821 : * CopyFrom tuple routing.
822 : */
823 1520 : if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
824 98 : proute = ExecSetupPartitionTupleRouting(estate, cstate->rel);
825 :
826 1520 : if (cstate->whereClause)
827 18 : cstate->qualexpr = ExecInitQual(castNode(List, cstate->whereClause),
828 : &mtstate->ps);
829 :
830 : /*
831 : * It's generally more efficient to prepare a bunch of tuples for
832 : * insertion, and insert them in one
833 : * table_multi_insert()/ExecForeignBatchInsert() call, than call
834 : * table_tuple_insert()/ExecForeignInsert() separately for every tuple.
835 : * However, there are a number of reasons why we might not be able to do
836 : * this. These are explained below.
837 : */
838 1520 : if (resultRelInfo->ri_TrigDesc != NULL &&
839 176 : (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
840 84 : resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
841 : {
842 : /*
843 : * Can't support multi-inserts when there are any BEFORE/INSTEAD OF
844 : * triggers on the table. Such triggers might query the table we're
845 : * inserting into and act differently if the tuples that have already
846 : * been processed and prepared for insertion are not there.
847 : */
848 104 : insertMethod = CIM_SINGLE;
849 : }
850 1416 : else if (resultRelInfo->ri_FdwRoutine != NULL &&
851 28 : resultRelInfo->ri_BatchSize == 1)
852 : {
853 : /*
854 : * Can't support multi-inserts to a foreign table if the FDW does not
855 : * support batching, or it's disabled for the server or foreign table.
856 : */
857 18 : insertMethod = CIM_SINGLE;
858 : }
859 1398 : else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL &&
860 26 : resultRelInfo->ri_TrigDesc->trig_insert_new_table)
861 : {
862 : /*
863 : * For partitioned tables we can't support multi-inserts when there
864 : * are any statement level insert triggers. It might be possible to
865 : * allow partitioned tables with such triggers in the future, but for
866 : * now, CopyMultiInsertInfoFlush expects that any after row insert and
867 : * statement level insert triggers are on the same relation.
868 : */
869 18 : insertMethod = CIM_SINGLE;
870 : }
871 1380 : else if (cstate->volatile_defexprs)
872 : {
873 : /*
874 : * Can't support multi-inserts if there are any volatile default
875 : * expressions in the table. Similarly to the trigger case above,
876 : * such expressions may query the table we're inserting into.
877 : *
878 : * Note: It does not matter if any partitions have any volatile
879 : * default expressions as we use the defaults from the target of the
880 : * COPY command.
881 : */
882 6 : insertMethod = CIM_SINGLE;
883 : }
884 1374 : else if (contain_volatile_functions(cstate->whereClause))
885 : {
886 : /*
887 : * Can't support multi-inserts if there are any volatile function
888 : * expressions in WHERE clause. Similarly to the trigger case above,
889 : * such expressions may query the table we're inserting into.
890 : *
891 : * Note: the whereClause was already preprocessed in DoCopy(), so it's
892 : * okay to use contain_volatile_functions() directly.
893 : */
894 0 : insertMethod = CIM_SINGLE;
895 : }
896 : else
897 : {
898 : /*
899 : * For partitioned tables, we may still be able to perform bulk
900 : * inserts. However, the possibility of this depends on which types
901 : * of triggers exist on the partition. We must disable bulk inserts
902 : * if the partition is a foreign table that can't use batching or it
903 : * has any before row insert or insert instead triggers (same as we
904 : * checked above for the parent table). Since the partition's
905 : * resultRelInfos are initialized only when we actually need to insert
906 : * the first tuple into them, we must have the intermediate insert
907 : * method of CIM_MULTI_CONDITIONAL to flag that we must later
908 : * determine if we can use bulk-inserts for the partition being
909 : * inserted into.
910 : */
911 1374 : if (proute)
912 74 : insertMethod = CIM_MULTI_CONDITIONAL;
913 : else
914 1300 : insertMethod = CIM_MULTI;
915 :
916 1374 : CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
917 : estate, mycid, ti_options);
918 : }
919 :
920 : /*
921 : * If not using batch mode (which allocates slots as needed) set up a
922 : * tuple slot too. When inserting into a partitioned table, we also need
923 : * one, even if we might batch insert, to read the tuple in the root
924 : * partition's form.
925 : */
926 1520 : if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
927 : {
928 220 : singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
929 : &estate->es_tupleTable);
930 220 : bistate = GetBulkInsertState();
931 : }
932 :
933 1696 : has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
934 176 : resultRelInfo->ri_TrigDesc->trig_insert_before_row);
935 :
936 1696 : has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
937 176 : resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
938 :
939 : /*
940 : * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
941 : * should do this for COPY, since it's not really an "INSERT" statement as
942 : * such. However, executing these triggers maintains consistency with the
943 : * EACH ROW triggers that we already fire on COPY.
944 : */
945 1520 : ExecBSInsertTriggers(estate, resultRelInfo);
946 :
947 1520 : econtext = GetPerTupleExprContext(estate);
948 :
949 : /* Set up callback to identify error line number */
950 1520 : errcallback.callback = CopyFromErrorCallback;
951 1520 : errcallback.arg = (void *) cstate;
952 1520 : errcallback.previous = error_context_stack;
953 1520 : error_context_stack = &errcallback;
954 :
955 : for (;;)
956 1248392 : {
957 : TupleTableSlot *myslot;
958 : bool skip_tuple;
959 :
960 1249912 : CHECK_FOR_INTERRUPTS();
961 :
962 : /*
963 : * Reset the per-tuple exprcontext. We do this after every tuple, to
964 : * clean-up after expression evaluations etc.
965 : */
966 1249912 : ResetPerTupleExprContext(estate);
967 :
968 : /* select slot to (initially) load row into */
969 1249912 : if (insertMethod == CIM_SINGLE || proute)
970 : {
971 274872 : myslot = singleslot;
972 274872 : Assert(myslot != NULL);
973 : }
974 : else
975 : {
976 : Assert(resultRelInfo == target_resultRelInfo);
977 : Assert(insertMethod == CIM_MULTI);
978 :
979 975040 : myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
980 : resultRelInfo);
981 : }
982 :
983 : /*
984 : * Switch to per-tuple context before calling NextCopyFrom, which does
985 : * evaluate default expressions etc. and requires per-tuple context.
986 : */
987 1249912 : MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
988 :
989 1249912 : ExecClearTuple(myslot);
990 :
991 : /* Directly store the values/nulls array in the slot */
992 1249912 : if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
993 1358 : break;
994 :
995 1248426 : if (cstate->opts.on_error != COPY_ON_ERROR_STOP &&
996 66 : cstate->escontext->error_occurred)
997 : {
998 : /*
999 : * Soft error occurred, skip this tuple and deal with error
1000 : * information according to ON_ERROR.
1001 : */
1002 42 : if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE)
1003 :
1004 : /*
1005 : * Just make ErrorSaveContext ready for the next NextCopyFrom.
1006 : * Since we don't set details_wanted and error_data is not to
1007 : * be filled, just resetting error_occurred is enough.
1008 : */
1009 42 : cstate->escontext->error_occurred = false;
1010 :
1011 : /* Report that this tuple was skipped by the ON_ERROR clause */
1012 42 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_SKIPPED,
1013 : ++skipped);
1014 :
1015 42 : continue;
1016 : }
1017 :
1018 1248384 : ExecStoreVirtualTuple(myslot);
1019 :
1020 : /*
1021 : * Constraints and where clause might reference the tableoid column,
1022 : * so (re-)initialize tts_tableOid before evaluating them.
1023 : */
1024 1248384 : myslot->tts_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc);
1025 :
1026 : /* Triggers and stuff need to be invoked in query context. */
1027 1248384 : MemoryContextSwitchTo(oldcontext);
1028 :
1029 1248384 : if (cstate->whereClause)
1030 : {
1031 66 : econtext->ecxt_scantuple = myslot;
1032 : /* Skip items that don't match COPY's WHERE clause */
1033 66 : if (!ExecQual(cstate->qualexpr, econtext))
1034 : {
1035 : /*
1036 : * Report that this tuple was filtered out by the WHERE
1037 : * clause.
1038 : */
1039 36 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_EXCLUDED,
1040 : ++excluded);
1041 36 : continue;
1042 : }
1043 : }
1044 :
1045 : /* Determine the partition to insert the tuple into */
1046 1248348 : if (proute)
1047 : {
1048 : TupleConversionMap *map;
1049 :
1050 : /*
1051 : * Attempt to find a partition suitable for this tuple.
1052 : * ExecFindPartition() will raise an error if none can be found or
1053 : * if the found partition is not suitable for INSERTs.
1054 : */
1055 274390 : resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo,
1056 : proute, myslot, estate);
1057 :
1058 274388 : if (prevResultRelInfo != resultRelInfo)
1059 : {
1060 : /* Determine which triggers exist on this partition */
1061 161566 : has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
1062 54 : resultRelInfo->ri_TrigDesc->trig_insert_before_row);
1063 :
1064 161566 : has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
1065 54 : resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
1066 :
1067 : /*
1068 : * Disable multi-inserts when the partition has BEFORE/INSTEAD
1069 : * OF triggers, or if the partition is a foreign table that
1070 : * can't use batching.
1071 : */
1072 262970 : leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
1073 101458 : !has_before_insert_row_trig &&
1074 364404 : !has_instead_insert_row_trig &&
1075 101434 : (resultRelInfo->ri_FdwRoutine == NULL ||
1076 12 : resultRelInfo->ri_BatchSize > 1);
1077 :
1078 : /* Set the multi-insert buffer to use for this partition. */
1079 161512 : if (leafpart_use_multi_insert)
1080 : {
1081 101430 : if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
1082 86 : CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
1083 : resultRelInfo);
1084 : }
1085 60082 : else if (insertMethod == CIM_MULTI_CONDITIONAL &&
1086 28 : !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
1087 : {
1088 : /*
1089 : * Flush pending inserts if this partition can't use
1090 : * batching, so rows are visible to triggers etc.
1091 : */
1092 0 : CopyMultiInsertInfoFlush(&multiInsertInfo,
1093 : resultRelInfo,
1094 : &processed);
1095 : }
1096 :
1097 161512 : if (bistate != NULL)
1098 161512 : ReleaseBulkInsertStatePin(bistate);
1099 161512 : prevResultRelInfo = resultRelInfo;
1100 : }
1101 :
1102 : /*
1103 : * If we're capturing transition tuples, we might need to convert
1104 : * from the partition rowtype to root rowtype. But if there are no
1105 : * BEFORE triggers on the partition that could change the tuple,
1106 : * we can just remember the original unconverted tuple to avoid a
1107 : * needless round trip conversion.
1108 : */
1109 274388 : if (cstate->transition_capture != NULL)
1110 54 : cstate->transition_capture->tcs_original_insert_tuple =
1111 54 : !has_before_insert_row_trig ? myslot : NULL;
1112 :
1113 : /*
1114 : * We might need to convert from the root rowtype to the partition
1115 : * rowtype.
1116 : */
1117 274388 : map = ExecGetRootToChildMap(resultRelInfo, estate);
1118 274388 : if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert)
1119 : {
1120 : /* non batch insert */
1121 60136 : if (map != NULL)
1122 : {
1123 : TupleTableSlot *new_slot;
1124 :
1125 110 : new_slot = resultRelInfo->ri_PartitionTupleSlot;
1126 110 : myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot);
1127 : }
1128 : }
1129 : else
1130 : {
1131 : /*
1132 : * Prepare to queue up tuple for later batch insert into
1133 : * current partition.
1134 : */
1135 : TupleTableSlot *batchslot;
1136 :
1137 : /* no other path available for partitioned table */
1138 : Assert(insertMethod == CIM_MULTI_CONDITIONAL);
1139 :
1140 214252 : batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
1141 : resultRelInfo);
1142 :
1143 214252 : if (map != NULL)
1144 12200 : myslot = execute_attr_map_slot(map->attrMap, myslot,
1145 : batchslot);
1146 : else
1147 : {
1148 : /*
1149 : * This looks more expensive than it is (Believe me, I
1150 : * optimized it away. Twice.). The input is in virtual
1151 : * form, and we'll materialize the slot below - for most
1152 : * slot types the copy performs the work materialization
1153 : * would later require anyway.
1154 : */
1155 202052 : ExecCopySlot(batchslot, myslot);
1156 202052 : myslot = batchslot;
1157 : }
1158 : }
1159 :
1160 : /* ensure that triggers etc see the right relation */
1161 274388 : myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
1162 : }
1163 :
1164 1248346 : skip_tuple = false;
1165 :
1166 : /* BEFORE ROW INSERT Triggers */
1167 1248346 : if (has_before_insert_row_trig)
1168 : {
1169 274 : if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot))
1170 16 : skip_tuple = true; /* "do nothing" */
1171 : }
1172 :
1173 1248346 : if (!skip_tuple)
1174 : {
1175 : /*
1176 : * If there is an INSTEAD OF INSERT ROW trigger, let it handle the
1177 : * tuple. Otherwise, proceed with inserting the tuple into the
1178 : * table or foreign table.
1179 : */
1180 1248330 : if (has_instead_insert_row_trig)
1181 : {
1182 12 : ExecIRInsertTriggers(estate, resultRelInfo, myslot);
1183 : }
1184 : else
1185 : {
1186 : /* Compute stored generated columns */
1187 1248318 : if (resultRelInfo->ri_RelationDesc->rd_att->constr &&
1188 461860 : resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored)
1189 36 : ExecComputeStoredGenerated(resultRelInfo, estate, myslot,
1190 : CMD_INSERT);
1191 :
1192 : /*
1193 : * If the target is a plain table, check the constraints of
1194 : * the tuple.
1195 : */
1196 1248318 : if (resultRelInfo->ri_FdwRoutine == NULL &&
1197 1248230 : resultRelInfo->ri_RelationDesc->rd_att->constr)
1198 461824 : ExecConstraints(resultRelInfo, myslot, estate);
1199 :
1200 : /*
1201 : * Also check the tuple against the partition constraint, if
1202 : * there is one; except that if we got here via tuple-routing,
1203 : * we don't need to if there's no BR trigger defined on the
1204 : * partition.
1205 : */
1206 1248288 : if (resultRelInfo->ri_RelationDesc->rd_rel->relispartition &&
1207 274376 : (proute == NULL || has_before_insert_row_trig))
1208 2308 : ExecPartitionCheck(resultRelInfo, myslot, estate, true);
1209 :
1210 : /* Store the slot in the multi-insert buffer, when enabled. */
1211 1248288 : if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
1212 : {
1213 : /*
1214 : * The slot previously might point into the per-tuple
1215 : * context. For batching it needs to be longer lived.
1216 : */
1217 1187944 : ExecMaterializeSlot(myslot);
1218 :
1219 : /* Add this tuple to the tuple buffer */
1220 1187944 : CopyMultiInsertInfoStore(&multiInsertInfo,
1221 : resultRelInfo, myslot,
1222 : cstate->line_buf.len,
1223 : cstate->cur_lineno);
1224 :
1225 : /*
1226 : * If enough inserts have queued up, then flush all
1227 : * buffers out to their tables.
1228 : */
1229 1187944 : if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
1230 1100 : CopyMultiInsertInfoFlush(&multiInsertInfo,
1231 : resultRelInfo,
1232 : &processed);
1233 :
1234 : /*
1235 : * We delay updating the row counter and progress of the
1236 : * COPY command until after writing the tuples stored in
1237 : * the buffer out to the table, as in single insert mode.
1238 : * See CopyMultiInsertBufferFlush().
1239 : */
1240 1187944 : continue; /* next tuple please */
1241 : }
1242 : else
1243 : {
1244 60344 : List *recheckIndexes = NIL;
1245 :
1246 : /* OK, store the tuple */
1247 60344 : if (resultRelInfo->ri_FdwRoutine != NULL)
1248 : {
1249 50 : myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
1250 : resultRelInfo,
1251 : myslot,
1252 : NULL);
1253 :
1254 48 : if (myslot == NULL) /* "do nothing" */
1255 4 : continue; /* next tuple please */
1256 :
1257 : /*
1258 : * AFTER ROW Triggers might reference the tableoid
1259 : * column, so (re-)initialize tts_tableOid before
1260 : * evaluating them.
1261 : */
1262 44 : myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
1263 : }
1264 : else
1265 : {
1266 : /* OK, store the tuple and create index entries for it */
1267 60294 : table_tuple_insert(resultRelInfo->ri_RelationDesc,
1268 : myslot, mycid, ti_options, bistate);
1269 :
1270 60294 : if (resultRelInfo->ri_NumIndices > 0)
1271 0 : recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
1272 : myslot,
1273 : estate,
1274 : false,
1275 : false,
1276 : NULL,
1277 : NIL,
1278 : false);
1279 : }
1280 :
1281 : /* AFTER ROW INSERT Triggers */
1282 60338 : ExecARInsertTriggers(estate, resultRelInfo, myslot,
1283 : recheckIndexes, cstate->transition_capture);
1284 :
1285 60338 : list_free(recheckIndexes);
1286 : }
1287 : }
1288 :
1289 : /*
1290 : * We count only tuples not suppressed by a BEFORE INSERT trigger
1291 : * or FDW; this is the same definition used by nodeModifyTable.c
1292 : * for counting tuples inserted by an INSERT command. Update
1293 : * progress of the COPY command as well.
1294 : */
1295 60350 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
1296 : ++processed);
1297 : }
1298 : }
1299 :
1300 : /* Flush any remaining buffered tuples */
1301 1358 : if (insertMethod != CIM_SINGLE)
1302 : {
1303 1238 : if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
1304 1008 : CopyMultiInsertInfoFlush(&multiInsertInfo, NULL, &processed);
1305 : }
1306 :
1307 : /* Done, clean up */
1308 1344 : error_context_stack = errcallback.previous;
1309 :
1310 1344 : if (cstate->opts.on_error != COPY_ON_ERROR_STOP &&
1311 12 : cstate->num_errors > 0)
1312 12 : ereport(NOTICE,
1313 : errmsg_plural("%llu row was skipped due to data type incompatibility",
1314 : "%llu rows were skipped due to data type incompatibility",
1315 : (unsigned long long) cstate->num_errors,
1316 : (unsigned long long) cstate->num_errors));
1317 :
1318 1344 : if (bistate != NULL)
1319 192 : FreeBulkInsertState(bistate);
1320 :
1321 1344 : MemoryContextSwitchTo(oldcontext);
1322 :
1323 : /* Execute AFTER STATEMENT insertion triggers */
1324 1344 : ExecASInsertTriggers(estate, target_resultRelInfo, cstate->transition_capture);
1325 :
1326 : /* Handle queued AFTER triggers */
1327 1344 : AfterTriggerEndQuery(estate);
1328 :
1329 1344 : ExecResetTupleTable(estate->es_tupleTable, false);
1330 :
1331 : /* Allow the FDW to shut down */
1332 1344 : if (target_resultRelInfo->ri_FdwRoutine != NULL &&
1333 32 : target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
1334 32 : target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
1335 : target_resultRelInfo);
1336 :
1337 : /* Tear down the multi-insert buffer data */
1338 1344 : if (insertMethod != CIM_SINGLE)
1339 1224 : CopyMultiInsertInfoCleanup(&multiInsertInfo);
1340 :
1341 : /* Close all the partitioned tables, leaf partitions, and their indices */
1342 1344 : if (proute)
1343 96 : ExecCleanupTupleRouting(mtstate, proute);
1344 :
1345 : /* Close the result relations, including any trigger target relations */
1346 1344 : ExecCloseResultRelations(estate);
1347 1344 : ExecCloseRangeTableRelations(estate);
1348 :
1349 1344 : FreeExecutorState(estate);
1350 :
1351 1344 : return processed;
1352 : }
1353 :
1354 : /*
1355 : * Setup to read tuples from a file for COPY FROM.
1356 : *
1357 : * 'rel': Used as a template for the tuples
1358 : * 'whereClause': WHERE clause from the COPY FROM command
1359 : * 'filename': Name of server-local file to read, NULL for STDIN
1360 : * 'is_program': true if 'filename' is program to execute
1361 : * 'data_source_cb': callback that provides the input data
1362 : * 'attnamelist': List of char *, columns to include. NIL selects all cols.
1363 : * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
1364 : *
1365 : * Returns a CopyFromState, to be passed to NextCopyFrom and related functions.
1366 : */
1367 : CopyFromState
1368 1796 : BeginCopyFrom(ParseState *pstate,
1369 : Relation rel,
1370 : Node *whereClause,
1371 : const char *filename,
1372 : bool is_program,
1373 : copy_data_source_cb data_source_cb,
1374 : List *attnamelist,
1375 : List *options)
1376 : {
1377 : CopyFromState cstate;
1378 1796 : bool pipe = (filename == NULL);
1379 : TupleDesc tupDesc;
1380 : AttrNumber num_phys_attrs,
1381 : num_defaults;
1382 : FmgrInfo *in_functions;
1383 : Oid *typioparams;
1384 : Oid in_func_oid;
1385 : int *defmap;
1386 : ExprState **defexprs;
1387 : MemoryContext oldcontext;
1388 : bool volatile_defexprs;
1389 1796 : const int progress_cols[] = {
1390 : PROGRESS_COPY_COMMAND,
1391 : PROGRESS_COPY_TYPE,
1392 : PROGRESS_COPY_BYTES_TOTAL
1393 : };
1394 1796 : int64 progress_vals[] = {
1395 : PROGRESS_COPY_COMMAND_FROM,
1396 : 0,
1397 : 0
1398 : };
1399 :
1400 : /* Allocate workspace and zero all fields */
1401 1796 : cstate = (CopyFromStateData *) palloc0(sizeof(CopyFromStateData));
1402 :
1403 : /*
1404 : * We allocate everything used by a cstate in a new memory context. This
1405 : * avoids memory leaks during repeated use of COPY in a query.
1406 : */
1407 1796 : cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
1408 : "COPY",
1409 : ALLOCSET_DEFAULT_SIZES);
1410 :
1411 1796 : oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1412 :
1413 : /* Extract options from the statement node tree */
1414 1796 : ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options);
1415 :
1416 : /* Process the target relation */
1417 1638 : cstate->rel = rel;
1418 :
1419 1638 : tupDesc = RelationGetDescr(cstate->rel);
1420 :
1421 : /* process common options or initialization */
1422 :
1423 : /* Generate or convert list of attributes to process */
1424 1638 : cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
1425 :
1426 1638 : num_phys_attrs = tupDesc->natts;
1427 :
1428 : /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
1429 1638 : cstate->opts.force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1430 1638 : if (cstate->opts.force_notnull_all)
1431 12 : MemSet(cstate->opts.force_notnull_flags, true, num_phys_attrs * sizeof(bool));
1432 1626 : else if (cstate->opts.force_notnull)
1433 : {
1434 : List *attnums;
1435 : ListCell *cur;
1436 :
1437 26 : attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_notnull);
1438 :
1439 52 : foreach(cur, attnums)
1440 : {
1441 32 : int attnum = lfirst_int(cur);
1442 32 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1443 :
1444 32 : if (!list_member_int(cstate->attnumlist, attnum))
1445 6 : ereport(ERROR,
1446 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1447 : errmsg("FORCE_NOT_NULL column \"%s\" not referenced by COPY",
1448 : NameStr(attr->attname))));
1449 26 : cstate->opts.force_notnull_flags[attnum - 1] = true;
1450 : }
1451 : }
1452 :
1453 : /* Set up soft error handler for ON_ERROR */
1454 1632 : if (cstate->opts.on_error != COPY_ON_ERROR_STOP)
1455 : {
1456 30 : cstate->escontext = makeNode(ErrorSaveContext);
1457 30 : cstate->escontext->type = T_ErrorSaveContext;
1458 30 : cstate->escontext->error_occurred = false;
1459 :
1460 : /*
1461 : * Currently we only support COPY_ON_ERROR_IGNORE. We'll add other
1462 : * options later
1463 : */
1464 30 : if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE)
1465 30 : cstate->escontext->details_wanted = false;
1466 : }
1467 : else
1468 1602 : cstate->escontext = NULL;
1469 :
1470 : /* Convert FORCE_NULL name list to per-column flags, check validity */
1471 1632 : cstate->opts.force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1472 1632 : if (cstate->opts.force_null_all)
1473 12 : MemSet(cstate->opts.force_null_flags, true, num_phys_attrs * sizeof(bool));
1474 1620 : else if (cstate->opts.force_null)
1475 : {
1476 : List *attnums;
1477 : ListCell *cur;
1478 :
1479 26 : attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_null);
1480 :
1481 52 : foreach(cur, attnums)
1482 : {
1483 32 : int attnum = lfirst_int(cur);
1484 32 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1485 :
1486 32 : if (!list_member_int(cstate->attnumlist, attnum))
1487 6 : ereport(ERROR,
1488 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1489 : errmsg("FORCE_NULL column \"%s\" not referenced by COPY",
1490 : NameStr(attr->attname))));
1491 26 : cstate->opts.force_null_flags[attnum - 1] = true;
1492 : }
1493 : }
1494 :
1495 : /* Convert convert_selectively name list to per-column flags */
1496 1626 : if (cstate->opts.convert_selectively)
1497 : {
1498 : List *attnums;
1499 : ListCell *cur;
1500 :
1501 4 : cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1502 :
1503 4 : attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.convert_select);
1504 :
1505 8 : foreach(cur, attnums)
1506 : {
1507 4 : int attnum = lfirst_int(cur);
1508 4 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1509 :
1510 4 : if (!list_member_int(cstate->attnumlist, attnum))
1511 0 : ereport(ERROR,
1512 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1513 : errmsg_internal("selected column \"%s\" not referenced by COPY",
1514 : NameStr(attr->attname))));
1515 4 : cstate->convert_select_flags[attnum - 1] = true;
1516 : }
1517 : }
1518 :
1519 : /* Use client encoding when ENCODING option is not specified. */
1520 1626 : if (cstate->opts.file_encoding < 0)
1521 1620 : cstate->file_encoding = pg_get_client_encoding();
1522 : else
1523 6 : cstate->file_encoding = cstate->opts.file_encoding;
1524 :
1525 : /*
1526 : * Look up encoding conversion function.
1527 : */
1528 1626 : if (cstate->file_encoding == GetDatabaseEncoding() ||
1529 6 : cstate->file_encoding == PG_SQL_ASCII ||
1530 0 : GetDatabaseEncoding() == PG_SQL_ASCII)
1531 : {
1532 1626 : cstate->need_transcoding = false;
1533 : }
1534 : else
1535 : {
1536 0 : cstate->need_transcoding = true;
1537 0 : cstate->conversion_proc = FindDefaultConversionProc(cstate->file_encoding,
1538 : GetDatabaseEncoding());
1539 0 : if (!OidIsValid(cstate->conversion_proc))
1540 0 : ereport(ERROR,
1541 : (errcode(ERRCODE_UNDEFINED_FUNCTION),
1542 : errmsg("default conversion function for encoding \"%s\" to \"%s\" does not exist",
1543 : pg_encoding_to_char(cstate->file_encoding),
1544 : pg_encoding_to_char(GetDatabaseEncoding()))));
1545 : }
1546 :
1547 1626 : cstate->copy_src = COPY_FILE; /* default */
1548 :
1549 1626 : cstate->whereClause = whereClause;
1550 :
1551 : /* Initialize state variables */
1552 1626 : cstate->eol_type = EOL_UNKNOWN;
1553 1626 : cstate->cur_relname = RelationGetRelationName(cstate->rel);
1554 1626 : cstate->cur_lineno = 0;
1555 1626 : cstate->cur_attname = NULL;
1556 1626 : cstate->cur_attval = NULL;
1557 1626 : cstate->relname_only = false;
1558 :
1559 : /*
1560 : * Allocate buffers for the input pipeline.
1561 : *
1562 : * attribute_buf and raw_buf are used in both text and binary modes, but
1563 : * input_buf and line_buf only in text mode.
1564 : */
1565 1626 : cstate->raw_buf = palloc(RAW_BUF_SIZE + 1);
1566 1626 : cstate->raw_buf_index = cstate->raw_buf_len = 0;
1567 1626 : cstate->raw_reached_eof = false;
1568 :
1569 1626 : if (!cstate->opts.binary)
1570 : {
1571 : /*
1572 : * If encoding conversion is needed, we need another buffer to hold
1573 : * the converted input data. Otherwise, we can just point input_buf
1574 : * to the same buffer as raw_buf.
1575 : */
1576 1610 : if (cstate->need_transcoding)
1577 : {
1578 0 : cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
1579 0 : cstate->input_buf_index = cstate->input_buf_len = 0;
1580 : }
1581 : else
1582 1610 : cstate->input_buf = cstate->raw_buf;
1583 1610 : cstate->input_reached_eof = false;
1584 :
1585 1610 : initStringInfo(&cstate->line_buf);
1586 : }
1587 :
1588 1626 : initStringInfo(&cstate->attribute_buf);
1589 :
1590 : /* Assign range table and rteperminfos, we'll need them in CopyFrom. */
1591 1626 : if (pstate)
1592 : {
1593 1566 : cstate->range_table = pstate->p_rtable;
1594 1566 : cstate->rteperminfos = pstate->p_rteperminfos;
1595 : }
1596 :
1597 1626 : num_defaults = 0;
1598 1626 : volatile_defexprs = false;
1599 :
1600 : /*
1601 : * Pick up the required catalog information for each attribute in the
1602 : * relation, including the input function, the element type (to pass to
1603 : * the input function), and info about defaults and constraints. (Which
1604 : * input function we use depends on text/binary format choice.)
1605 : */
1606 1626 : in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
1607 1626 : typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
1608 1626 : defmap = (int *) palloc(num_phys_attrs * sizeof(int));
1609 1626 : defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
1610 :
1611 6322 : for (int attnum = 1; attnum <= num_phys_attrs; attnum++)
1612 : {
1613 4710 : Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
1614 :
1615 : /* We don't need info for dropped attributes */
1616 4710 : if (att->attisdropped)
1617 124 : continue;
1618 :
1619 : /* Fetch the input function and typioparam info */
1620 4586 : if (cstate->opts.binary)
1621 62 : getTypeBinaryInputInfo(att->atttypid,
1622 62 : &in_func_oid, &typioparams[attnum - 1]);
1623 : else
1624 4524 : getTypeInputInfo(att->atttypid,
1625 4524 : &in_func_oid, &typioparams[attnum - 1]);
1626 4584 : fmgr_info(in_func_oid, &in_functions[attnum - 1]);
1627 :
1628 : /* Get default info if available */
1629 4584 : defexprs[attnum - 1] = NULL;
1630 :
1631 : /*
1632 : * We only need the default values for columns that do not appear in
1633 : * the column list, unless the DEFAULT option was given. We never need
1634 : * default values for generated columns.
1635 : */
1636 4584 : if ((cstate->opts.default_print != NULL ||
1637 4458 : !list_member_int(cstate->attnumlist, attnum)) &&
1638 600 : !att->attgenerated)
1639 : {
1640 580 : Expr *defexpr = (Expr *) build_column_default(cstate->rel,
1641 : attnum);
1642 :
1643 580 : if (defexpr != NULL)
1644 : {
1645 : /* Run the expression through planner */
1646 264 : defexpr = expression_planner(defexpr);
1647 :
1648 : /* Initialize executable expression in copycontext */
1649 252 : defexprs[attnum - 1] = ExecInitExpr(defexpr, NULL);
1650 :
1651 : /* if NOT copied from input */
1652 : /* use default value if one exists */
1653 252 : if (!list_member_int(cstate->attnumlist, attnum))
1654 : {
1655 172 : defmap[num_defaults] = attnum - 1;
1656 172 : num_defaults++;
1657 : }
1658 :
1659 : /*
1660 : * If a default expression looks at the table being loaded,
1661 : * then it could give the wrong answer when using
1662 : * multi-insert. Since database access can be dynamic this is
1663 : * hard to test for exactly, so we use the much wider test of
1664 : * whether the default expression is volatile. We allow for
1665 : * the special case of when the default expression is the
1666 : * nextval() of a sequence which in this specific case is
1667 : * known to be safe for use with the multi-insert
1668 : * optimization. Hence we use this special case function
1669 : * checker rather than the standard check for
1670 : * contain_volatile_functions(). Note also that we already
1671 : * ran the expression through expression_planner().
1672 : */
1673 252 : if (!volatile_defexprs)
1674 252 : volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
1675 : }
1676 : }
1677 : }
1678 :
1679 1612 : cstate->defaults = (bool *) palloc0(tupDesc->natts * sizeof(bool));
1680 :
1681 : /* initialize progress */
1682 1612 : pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
1683 1612 : cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
1684 1612 : cstate->bytes_processed = 0;
1685 :
1686 : /* We keep those variables in cstate. */
1687 1612 : cstate->in_functions = in_functions;
1688 1612 : cstate->typioparams = typioparams;
1689 1612 : cstate->defmap = defmap;
1690 1612 : cstate->defexprs = defexprs;
1691 1612 : cstate->volatile_defexprs = volatile_defexprs;
1692 1612 : cstate->num_defaults = num_defaults;
1693 1612 : cstate->is_program = is_program;
1694 :
1695 1612 : if (data_source_cb)
1696 : {
1697 334 : progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
1698 334 : cstate->copy_src = COPY_CALLBACK;
1699 334 : cstate->data_source_cb = data_source_cb;
1700 : }
1701 1278 : else if (pipe)
1702 : {
1703 890 : progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
1704 : Assert(!is_program); /* the grammar does not allow this */
1705 890 : if (whereToSendOutput == DestRemote)
1706 890 : ReceiveCopyBegin(cstate);
1707 : else
1708 0 : cstate->copy_file = stdin;
1709 : }
1710 : else
1711 : {
1712 388 : cstate->filename = pstrdup(filename);
1713 :
1714 388 : if (cstate->is_program)
1715 : {
1716 0 : progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
1717 0 : cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
1718 0 : if (cstate->copy_file == NULL)
1719 0 : ereport(ERROR,
1720 : (errcode_for_file_access(),
1721 : errmsg("could not execute command \"%s\": %m",
1722 : cstate->filename)));
1723 : }
1724 : else
1725 : {
1726 : struct stat st;
1727 :
1728 388 : progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
1729 388 : cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
1730 388 : if (cstate->copy_file == NULL)
1731 : {
1732 : /* copy errno because ereport subfunctions might change it */
1733 0 : int save_errno = errno;
1734 :
1735 0 : ereport(ERROR,
1736 : (errcode_for_file_access(),
1737 : errmsg("could not open file \"%s\" for reading: %m",
1738 : cstate->filename),
1739 : (save_errno == ENOENT || save_errno == EACCES) ?
1740 : errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
1741 : "You may want a client-side facility such as psql's \\copy.") : 0));
1742 : }
1743 :
1744 388 : if (fstat(fileno(cstate->copy_file), &st))
1745 0 : ereport(ERROR,
1746 : (errcode_for_file_access(),
1747 : errmsg("could not stat file \"%s\": %m",
1748 : cstate->filename)));
1749 :
1750 388 : if (S_ISDIR(st.st_mode))
1751 0 : ereport(ERROR,
1752 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1753 : errmsg("\"%s\" is a directory", cstate->filename)));
1754 :
1755 388 : progress_vals[2] = st.st_size;
1756 : }
1757 : }
1758 :
1759 1612 : pgstat_progress_update_multi_param(3, progress_cols, progress_vals);
1760 :
1761 1612 : if (cstate->opts.binary)
1762 : {
1763 : /* Read and verify binary header */
1764 14 : ReceiveCopyBinaryHeader(cstate);
1765 : }
1766 :
1767 : /* create workspace for CopyReadAttributes results */
1768 1612 : if (!cstate->opts.binary)
1769 : {
1770 1598 : AttrNumber attr_count = list_length(cstate->attnumlist);
1771 :
1772 1598 : cstate->max_fields = attr_count;
1773 1598 : cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
1774 : }
1775 :
1776 1612 : MemoryContextSwitchTo(oldcontext);
1777 :
1778 1612 : return cstate;
1779 : }
1780 :
1781 : /*
1782 : * Clean up storage and release resources for COPY FROM.
1783 : */
1784 : void
1785 1080 : EndCopyFrom(CopyFromState cstate)
1786 : {
1787 : /* No COPY FROM related resources except memory. */
1788 1080 : if (cstate->is_program)
1789 : {
1790 0 : ClosePipeFromProgram(cstate);
1791 : }
1792 : else
1793 : {
1794 1080 : if (cstate->filename != NULL && FreeFile(cstate->copy_file))
1795 0 : ereport(ERROR,
1796 : (errcode_for_file_access(),
1797 : errmsg("could not close file \"%s\": %m",
1798 : cstate->filename)));
1799 : }
1800 :
1801 1080 : pgstat_progress_end_command();
1802 :
1803 1080 : MemoryContextDelete(cstate->copycontext);
1804 1080 : pfree(cstate);
1805 1080 : }
1806 :
1807 : /*
1808 : * Closes the pipe from an external program, checking the pclose() return code.
1809 : */
1810 : static void
1811 0 : ClosePipeFromProgram(CopyFromState cstate)
1812 : {
1813 : int pclose_rc;
1814 :
1815 : Assert(cstate->is_program);
1816 :
1817 0 : pclose_rc = ClosePipeStream(cstate->copy_file);
1818 0 : if (pclose_rc == -1)
1819 0 : ereport(ERROR,
1820 : (errcode_for_file_access(),
1821 : errmsg("could not close pipe to external command: %m")));
1822 0 : else if (pclose_rc != 0)
1823 : {
1824 : /*
1825 : * If we ended a COPY FROM PROGRAM before reaching EOF, then it's
1826 : * expectable for the called program to fail with SIGPIPE, and we
1827 : * should not report that as an error. Otherwise, SIGPIPE indicates a
1828 : * problem.
1829 : */
1830 0 : if (!cstate->raw_reached_eof &&
1831 0 : wait_result_is_signal(pclose_rc, SIGPIPE))
1832 0 : return;
1833 :
1834 0 : ereport(ERROR,
1835 : (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
1836 : errmsg("program \"%s\" failed",
1837 : cstate->filename),
1838 : errdetail_internal("%s", wait_result_to_str(pclose_rc))));
1839 : }
1840 : }
|