Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * copyto.c
4 : * COPY <table> TO file/program/client
5 : *
6 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/commands/copyto.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : #include "postgres.h"
16 :
17 : #include <ctype.h>
18 : #include <unistd.h>
19 : #include <sys/stat.h>
20 :
21 : #include "access/table.h"
22 : #include "access/tableam.h"
23 : #include "catalog/pg_inherits.h"
24 : #include "commands/copyapi.h"
25 : #include "commands/progress.h"
26 : #include "executor/execdesc.h"
27 : #include "executor/executor.h"
28 : #include "executor/tuptable.h"
29 : #include "libpq/libpq.h"
30 : #include "libpq/pqformat.h"
31 : #include "mb/pg_wchar.h"
32 : #include "miscadmin.h"
33 : #include "pgstat.h"
34 : #include "storage/fd.h"
35 : #include "tcop/tcopprot.h"
36 : #include "utils/lsyscache.h"
37 : #include "utils/memutils.h"
38 : #include "utils/rel.h"
39 : #include "utils/snapmgr.h"
40 :
41 : /*
42 : * Identifies where COPY TO output is delivered. CopySendEndOfRow()
43 : * switches on this value when flushing each accumulated row.
44 : */
45 : typedef enum CopyDest
46 : {
47 : COPY_FILE, /* written with fwrite() to copy_file (file or piped program) */
48 : COPY_FRONTEND, /* sent to the frontend as CopyData messages */
49 : COPY_CALLBACK, /* handed to the data_dest_cb callback */
50 : } CopyDest;
51 :
52 : /*
53 : * This struct contains all the state variables used throughout a COPY TO
54 : * operation.
55 : *
56 : * Multi-byte encodings: all supported client-side encodings encode multi-byte
57 : * characters by having the first byte's high bit set. Subsequent bytes of the
58 : * character can have the high bit not set. When scanning data in such an
59 : * encoding to look for a match to a single-byte (ie ASCII) character, we must
60 : * use the full pg_encoding_mblen() machinery to skip over multibyte
61 : * characters, else we might find a false match to a trailing byte. In
62 : * supported server encodings, there is no possibility of a false match, and
63 : * it's faster to make useless comparisons to trailing bytes than it is to
64 : * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true
65 : * when we have to do it the hard way.
66 : */
67 : typedef struct CopyToStateData
68 : {
69 : /* format-specific routines (selected by CopyToGetRoutine) */
70 : const CopyToRoutine *routine;
71 :
72 : /* low-level state data */
73 : CopyDest copy_dest; /* type of copy destination */
74 : FILE *copy_file; /* used if copy_dest == COPY_FILE */
75 : StringInfo fe_msgbuf; /* row buffer, used for all dests during COPY TO */
76 :
77 : int file_encoding; /* file or remote side's character encoding */
78 : bool need_transcoding; /* file encoding diff from server? */
79 : bool encoding_embeds_ascii; /* ASCII can be non-first byte? */
80 :
81 : /* parameters from the COPY command */
82 : Relation rel; /* relation to copy from, or NULL for a query */
83 : QueryDesc *queryDesc; /* executable query to copy from */
84 : List *attnumlist; /* integer list of attnums to copy */
85 : char *filename; /* filename, or NULL for STDOUT */
86 : bool is_program; /* is 'filename' a program to popen? */
87 : copy_data_dest_cb data_dest_cb; /* function for writing data */
88 :
89 : CopyFormatOptions opts;
90 : Node *whereClause; /* WHERE condition (or NULL) */
91 : List *partitions; /* OID list of partitions to copy data from */
92 :
93 : /*
94 : * Working state
95 : */
96 : MemoryContext copycontext; /* per-copy execution context */
97 :
98 : FmgrInfo *out_functions; /* output function lookup info, indexed by attnum - 1 */
99 : MemoryContext rowcontext; /* per-row evaluation context */
100 : uint64 bytes_processed; /* number of bytes processed so far */
101 : } CopyToStateData;
102 :
103 : /*
 * DestReceiver for COPY (query) TO.
 *
 * 'pub' has to remain the first member: BeginCopyTo casts the DestReceiver
 * pointer returned by CreateDestReceiver(DestCopyOut) to DR_copy * in order
 * to fill in 'cstate'.
 */
104 : typedef struct
105 : {
106 : DestReceiver pub; /* publicly-known function pointers */
107 : CopyToState cstate; /* CopyToStateData for the command */
108 : uint64 processed; /* # of tuples processed */
109 : } DR_copy;
110 :
111 : /*
 * 11-byte signature emitted by CopyToBinaryStart() at the very beginning of
 * binary-format output.
 *
 * NOTE: there's a copy of this in copyfromparse.c
 */
112 : static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
113 :
114 :
115 : /* prototypes for file-local (static) helper functions */
116 : static void EndCopy(CopyToState cstate);
117 : static void ClosePipeToProgram(CopyToState cstate);
118 : static void CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot);
119 : static void CopyAttributeOutText(CopyToState cstate, const char *string);
120 : static void CopyAttributeOutCSV(CopyToState cstate, const char *string,
121 : bool use_quote);
122 : static void CopyRelationTo(CopyToState cstate, Relation rel, Relation root_rel,
123 : uint64 *processed);
124 :
125 : /* built-in format-specific routines (callbacks stored in the CopyToRoutine tables) */
126 : static void CopyToTextLikeStart(CopyToState cstate, TupleDesc tupDesc);
127 : static void CopyToTextLikeOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo);
128 : static void CopyToTextOneRow(CopyToState cstate, TupleTableSlot *slot);
129 : static void CopyToCSVOneRow(CopyToState cstate, TupleTableSlot *slot);
130 : static void CopyToTextLikeOneRow(CopyToState cstate, TupleTableSlot *slot,
131 : bool is_csv);
132 : static void CopyToTextLikeEnd(CopyToState cstate);
133 : static void CopyToBinaryStart(CopyToState cstate, TupleDesc tupDesc);
134 : static void CopyToBinaryOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo);
135 : static void CopyToBinaryOneRow(CopyToState cstate, TupleTableSlot *slot);
136 : static void CopyToBinaryEnd(CopyToState cstate);
137 :
138 : /* Low-level communications functions (all output goes through cstate->fe_msgbuf) */
139 : static void SendCopyBegin(CopyToState cstate);
140 : static void SendCopyEnd(CopyToState cstate);
141 : static void CopySendData(CopyToState cstate, const void *databuf, int datasize);
142 : static void CopySendString(CopyToState cstate, const char *str);
143 : static void CopySendChar(CopyToState cstate, char c);
144 : static void CopySendEndOfRow(CopyToState cstate);
145 : static void CopySendTextLikeEndOfRow(CopyToState cstate);
146 : static void CopySendInt32(CopyToState cstate, int32 val);
147 : static void CopySendInt16(CopyToState cstate, int16 val);
148 :
149 : /*
150 : * COPY TO routines for built-in formats.
151 : *
152 : * CSV and text formats share the same TextLike routines except for the
153 : * one-row callback.
154 : */
155 :
156 : /*
 * text format: start, outfunc and end callbacks are the shared "TextLike"
 * ones; only the per-row callback is text-specific.
 */
157 : static const CopyToRoutine CopyToRoutineText = {
158 : .CopyToStart = CopyToTextLikeStart,
159 : .CopyToOutFunc = CopyToTextLikeOutFunc,
160 : .CopyToOneRow = CopyToTextOneRow,
161 : .CopyToEnd = CopyToTextLikeEnd,
162 : };
163 :
164 : /*
 * CSV format: identical to the text table except for the per-row callback,
 * which is CopyToCSVOneRow.
 */
165 : static const CopyToRoutine CopyToRoutineCSV = {
166 : .CopyToStart = CopyToTextLikeStart,
167 : .CopyToOutFunc = CopyToTextLikeOutFunc,
168 : .CopyToOneRow = CopyToCSVOneRow,
169 : .CopyToEnd = CopyToTextLikeEnd,
170 : };
171 :
172 : /*
 * binary format: uses its own callback for every step (header, send-function
 * lookup, per-row output, trailer).
 */
173 : static const CopyToRoutine CopyToRoutineBinary = {
174 : .CopyToStart = CopyToBinaryStart,
175 : .CopyToOutFunc = CopyToBinaryOutFunc,
176 : .CopyToOneRow = CopyToBinaryOneRow,
177 : .CopyToEnd = CopyToBinaryEnd,
178 : };
179 :
180 : /* Return a COPY TO routine for the given options */
181 : static const CopyToRoutine *
182 9358 : CopyToGetRoutine(const CopyFormatOptions *opts)
183 : {
184 9358 : if (opts->csv_mode)
185 126 : return &CopyToRoutineCSV;
186 9232 : else if (opts->binary)
187 16 : return &CopyToRoutineBinary;
188 :
189 : /* default is text */
190 9216 : return &CopyToRoutineText;
191 : }
192 :
193 : /* Implementation of the start callback for text and CSV formats */
194 : static void
195 9208 : CopyToTextLikeStart(CopyToState cstate, TupleDesc tupDesc)
196 : {
197 : /*
198 : * For non-binary copy, we need to convert null_print to file encoding,
199 : * because it will be sent directly with CopySendString.
200 : */
201 9208 : if (cstate->need_transcoding)
202 2 : cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
203 : cstate->opts.null_print_len,
204 : cstate->file_encoding);
205 :
206 : /* if a header has been requested send the line */
207 9208 : if (cstate->opts.header_line == COPY_HEADER_TRUE)
208 : {
209 : ListCell *cur;
210 36 : bool hdr_delim = false;
211 :
212 96 : foreach(cur, cstate->attnumlist)
213 : {
214 60 : int attnum = lfirst_int(cur);
215 : char *colname;
216 :
217 60 : if (hdr_delim)
218 24 : CopySendChar(cstate, cstate->opts.delim[0]);
219 60 : hdr_delim = true;
220 :
221 60 : colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
222 :
223 60 : if (cstate->opts.csv_mode)
224 24 : CopyAttributeOutCSV(cstate, colname, false);
225 : else
226 36 : CopyAttributeOutText(cstate, colname);
227 : }
228 :
229 36 : CopySendTextLikeEndOfRow(cstate);
230 : }
231 9208 : }
232 :
233 : /*
234 : * Implementation of the outfunc callback for text and CSV formats. Assign
235 : * the output function data to the given *finfo.
236 : */
237 : static void
238 32052 : CopyToTextLikeOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo)
239 : {
240 : Oid func_oid;
241 : bool is_varlena;
242 :
243 : /* Set output function for an attribute */
244 32052 : getTypeOutputInfo(atttypid, &func_oid, &is_varlena);
245 32052 : fmgr_info(func_oid, finfo);
246 32052 : }
247 :
248 : /* Implementation of the per-row callback for text format */
249 : static void
250 3660816 : CopyToTextOneRow(CopyToState cstate, TupleTableSlot *slot)
251 : {
252 3660816 : CopyToTextLikeOneRow(cstate, slot, false);
253 3660816 : }
254 :
255 : /* Implementation of the per-row callback for CSV format */
256 : static void
257 330 : CopyToCSVOneRow(CopyToState cstate, TupleTableSlot *slot)
258 : {
259 330 : CopyToTextLikeOneRow(cstate, slot, true);
260 330 : }
261 :
262 : /*
263 : * Workhorse for CopyToTextOneRow() and CopyToCSVOneRow().
264 : *
265 : * We use pg_attribute_always_inline to reduce function call overhead
266 : * and to help compilers to optimize away the 'is_csv' condition.
267 : */
268 : static pg_attribute_always_inline void
269 3661146 : CopyToTextLikeOneRow(CopyToState cstate,
270 : TupleTableSlot *slot,
271 : bool is_csv)
272 : {
273 3661146 : bool need_delim = false;
274 3661146 : FmgrInfo *out_functions = cstate->out_functions;
275 :
276 21478256 : foreach_int(attnum, cstate->attnumlist)
277 : {
278 14155964 : Datum value = slot->tts_values[attnum - 1];
279 14155964 : bool isnull = slot->tts_isnull[attnum - 1];
280 :
281 14155964 : if (need_delim)
282 10494948 : CopySendChar(cstate, cstate->opts.delim[0]);
283 14155964 : need_delim = true;
284 :
285 14155964 : if (isnull)
286 : {
287 1190390 : CopySendString(cstate, cstate->opts.null_print_client);
288 : }
289 : else
290 : {
291 : char *string;
292 :
293 12965574 : string = OutputFunctionCall(&out_functions[attnum - 1],
294 : value);
295 :
296 12965574 : if (is_csv)
297 594 : CopyAttributeOutCSV(cstate, string,
298 594 : cstate->opts.force_quote_flags[attnum - 1]);
299 : else
300 12964980 : CopyAttributeOutText(cstate, string);
301 : }
302 : }
303 :
304 3661146 : CopySendTextLikeEndOfRow(cstate);
305 3661146 : }
306 :
307 : /* Implementation of the end callback for text and CSV formats */
308 : static void
309 9208 : CopyToTextLikeEnd(CopyToState cstate)
310 : {
311 : /* Nothing to do here */
312 9208 : }
313 :
314 : /*
315 : * Implementation of the start callback for binary format. Send a header
316 : * for a binary copy.
317 : */
318 : static void
319 14 : CopyToBinaryStart(CopyToState cstate, TupleDesc tupDesc)
320 : {
321 : int32 tmp;
322 :
323 : /* Signature */
324 14 : CopySendData(cstate, BinarySignature, 11);
325 : /* Flags field */
326 14 : tmp = 0;
327 14 : CopySendInt32(cstate, tmp);
328 : /* No header extension */
329 14 : tmp = 0;
330 14 : CopySendInt32(cstate, tmp);
331 14 : }
332 :
333 : /*
334 : * Implementation of the outfunc callback for binary format. Assign
335 : * the binary output function to the given *finfo.
336 : */
337 : static void
338 62 : CopyToBinaryOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo)
339 : {
340 : Oid func_oid;
341 : bool is_varlena;
342 :
343 : /* Set output function for an attribute */
344 62 : getTypeBinaryOutputInfo(atttypid, &func_oid, &is_varlena);
345 60 : fmgr_info(func_oid, finfo);
346 60 : }
347 :
348 : /* Implementation of the per-row callback for binary format */
349 : static void
350 32 : CopyToBinaryOneRow(CopyToState cstate, TupleTableSlot *slot)
351 : {
352 32 : FmgrInfo *out_functions = cstate->out_functions;
353 :
354 : /* Binary per-tuple header */
355 32 : CopySendInt16(cstate, list_length(cstate->attnumlist));
356 :
357 224 : foreach_int(attnum, cstate->attnumlist)
358 : {
359 160 : Datum value = slot->tts_values[attnum - 1];
360 160 : bool isnull = slot->tts_isnull[attnum - 1];
361 :
362 160 : if (isnull)
363 : {
364 30 : CopySendInt32(cstate, -1);
365 : }
366 : else
367 : {
368 : bytea *outputbytes;
369 :
370 130 : outputbytes = SendFunctionCall(&out_functions[attnum - 1],
371 : value);
372 130 : CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
373 130 : CopySendData(cstate, VARDATA(outputbytes),
374 130 : VARSIZE(outputbytes) - VARHDRSZ);
375 : }
376 : }
377 :
378 32 : CopySendEndOfRow(cstate);
379 32 : }
380 :
381 : /* Implementation of the end callback for binary format */
382 : static void
383 14 : CopyToBinaryEnd(CopyToState cstate)
384 : {
385 : /* Generate trailer for a binary copy */
386 14 : CopySendInt16(cstate, -1);
387 : /* Need to flush out the trailer */
388 14 : CopySendEndOfRow(cstate);
389 14 : }
390 :
391 : /*
392 : * Send copy start/stop messages for frontend copies. These have changed
393 : * in past protocol redesigns.
394 : */
395 : static void
396 9160 : SendCopyBegin(CopyToState cstate)
397 : {
398 : StringInfoData buf;
399 9160 : int natts = list_length(cstate->attnumlist);
400 9160 : int16 format = (cstate->opts.binary ? 1 : 0);
401 : int i;
402 :
403 9160 : pq_beginmessage(&buf, PqMsg_CopyOutResponse);
404 9160 : pq_sendbyte(&buf, format); /* overall format */
405 9160 : pq_sendint16(&buf, natts);
406 41042 : for (i = 0; i < natts; i++)
407 31882 : pq_sendint16(&buf, format); /* per-column formats */
408 9160 : pq_endmessage(&buf);
409 9160 : cstate->copy_dest = COPY_FRONTEND;
410 9160 : }
411 :
412 : static void
413 9158 : SendCopyEnd(CopyToState cstate)
414 : {
415 : /* Shouldn't have any unsent data */
416 : Assert(cstate->fe_msgbuf->len == 0);
417 : /* Send Copy Done message */
418 9158 : pq_putemptymessage(PqMsg_CopyDone);
419 9158 : }
420 :
421 : /*----------
422 : * CopySendData sends output data to the destination (file, frontend, or callback)
423 : * CopySendString does the same for null-terminated strings
424 : * CopySendChar does the same for single characters
425 : * CopySendEndOfRow does the appropriate thing at end of each data row
426 : * (data is not actually flushed except by CopySendEndOfRow)
427 : *
428 : * NB: no data conversion is applied by these functions
429 : *----------
430 : */
431 : static void
432 12766192 : CopySendData(CopyToState cstate, const void *databuf, int datasize)
433 : {
434 12766192 : appendBinaryStringInfo(cstate->fe_msgbuf, databuf, datasize);
435 12766192 : }
436 :
437 : static void
438 1190684 : CopySendString(CopyToState cstate, const char *str)
439 : {
440 1190684 : appendBinaryStringInfo(cstate->fe_msgbuf, str, strlen(str));
441 1190684 : }
442 :
443 : static void
444 14189442 : CopySendChar(CopyToState cstate, char c)
445 : {
446 14189442 : appendStringInfoCharMacro(cstate->fe_msgbuf, c);
447 14189442 : }
448 :
/*
 * Deliver the row accumulated in cstate->fe_msgbuf to the current
 * destination, add its length to the progress counter, and reset the buffer.
 *
 * COPY_FILE writes the buffer with fwrite() and raises ERROR on failure;
 * COPY_FRONTEND sends it as one CopyData message; COPY_CALLBACK passes it to
 * data_dest_cb.
 */
449 : static void
450 3661228 : CopySendEndOfRow(CopyToState cstate)
451 : {
452 3661228 : StringInfo fe_msgbuf = cstate->fe_msgbuf;
453 :
454 3661228 : switch (cstate->copy_dest)
455 : {
456 12282 : case COPY_FILE:
457 12282 : pgstat_report_wait_start(WAIT_EVENT_COPY_TO_WRITE);
/*
 * The whole buffer is written as a single item, so any return value
 * other than 1 means the write did not complete.
 */
458 12282 : if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
459 12282 : cstate->copy_file) != 1 ||
460 12282 : ferror(cstate->copy_file))
461 : {
462 0 : if (cstate->is_program)
463 : {
464 0 : if (errno == EPIPE)
465 : {
466 : /*
467 : * The pipe will be closed automatically on error at
468 : * the end of transaction, but we might get a better
469 : * error message from the subprocess' exit code than
470 : * just "Broken Pipe"
471 : */
472 0 : ClosePipeToProgram(cstate);
473 :
474 : /*
475 : * If ClosePipeToProgram() didn't throw an error, the
476 : * program terminated normally, but closed the pipe
477 : * first. Restore errno, and throw an error.
478 : */
479 0 : errno = EPIPE;
480 : }
/* %m below reports whatever errno holds at this point */
481 0 : ereport(ERROR,
482 : (errcode_for_file_access(),
483 : errmsg("could not write to COPY program: %m")));
484 : }
485 : else
486 0 : ereport(ERROR,
487 : (errcode_for_file_access(),
488 : errmsg("could not write to COPY file: %m")));
489 : }
490 12282 : pgstat_report_wait_end();
491 12282 : break;
492 3648940 : case COPY_FRONTEND:
493 : /* Dump the accumulated row as one CopyData message */
494 3648940 : (void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len);
495 3648940 : break;
496 6 : case COPY_CALLBACK:
/* the callback receives the raw row bytes and their length */
497 6 : cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len);
498 6 : break;
499 : }
500 :
501 : /* Update the progress */
502 3661228 : cstate->bytes_processed += fe_msgbuf->len;
503 3661228 : pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, cstate->bytes_processed);
504 :
/* empty the buffer so the next row starts from scratch */
505 3661228 : resetStringInfo(fe_msgbuf);
506 3661228 : }
507 :
508 : /*
509 : * Wrapper function of CopySendEndOfRow for text and CSV formats. Sends the
510 : * line termination and do common appropriate things for the end of row.
511 : */
512 : static inline void
513 3661182 : CopySendTextLikeEndOfRow(CopyToState cstate)
514 : {
515 3661182 : switch (cstate->copy_dest)
516 : {
517 12258 : case COPY_FILE:
518 : /* Default line termination depends on platform */
519 : #ifndef WIN32
520 12258 : CopySendChar(cstate, '\n');
521 : #else
522 : CopySendString(cstate, "\r\n");
523 : #endif
524 12258 : break;
525 3648918 : case COPY_FRONTEND:
526 : /* The FE/BE protocol uses \n as newline for all platforms */
527 3648918 : CopySendChar(cstate, '\n');
528 3648918 : break;
529 6 : default:
530 6 : break;
531 : }
532 :
533 : /* Now take the actions related to the end of a row */
534 3661182 : CopySendEndOfRow(cstate);
535 3661182 : }
536 :
537 : /*
538 : * These functions do apply some data conversion
539 : */
540 :
541 : /*
542 : * CopySendInt32 sends an int32 in network byte order
543 : */
544 : static inline void
545 188 : CopySendInt32(CopyToState cstate, int32 val)
546 : {
547 : uint32 buf;
548 :
549 188 : buf = pg_hton32((uint32) val);
550 188 : CopySendData(cstate, &buf, sizeof(buf));
551 188 : }
552 :
553 : /*
554 : * CopySendInt16 sends an int16 in network byte order
555 : */
556 : static inline void
557 46 : CopySendInt16(CopyToState cstate, int16 val)
558 : {
559 : uint16 buf;
560 :
561 46 : buf = pg_hton16((uint16) val);
562 46 : CopySendData(cstate, &buf, sizeof(buf));
563 46 : }
564 :
565 : /*
566 : * Closes the pipe to an external program, checking the pclose() return code.
567 : */
568 : static void
569 0 : ClosePipeToProgram(CopyToState cstate)
570 : {
571 : int pclose_rc;
572 :
573 : Assert(cstate->is_program);
574 :
575 0 : pclose_rc = ClosePipeStream(cstate->copy_file);
576 0 : if (pclose_rc == -1)
577 0 : ereport(ERROR,
578 : (errcode_for_file_access(),
579 : errmsg("could not close pipe to external command: %m")));
580 0 : else if (pclose_rc != 0)
581 : {
582 0 : ereport(ERROR,
583 : (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
584 : errmsg("program \"%s\" failed",
585 : cstate->filename),
586 : errdetail_internal("%s", wait_result_to_str(pclose_rc))));
587 : }
588 0 : }
589 :
590 : /*
591 : * Release resources allocated in a cstate for COPY TO.
592 : */
593 : static void
594 9222 : EndCopy(CopyToState cstate)
595 : {
596 9222 : if (cstate->is_program)
597 : {
598 0 : ClosePipeToProgram(cstate);
599 : }
600 : else
601 : {
602 9222 : if (cstate->filename != NULL && FreeFile(cstate->copy_file))
603 0 : ereport(ERROR,
604 : (errcode_for_file_access(),
605 : errmsg("could not close file \"%s\": %m",
606 : cstate->filename)));
607 : }
608 :
609 9222 : pgstat_progress_end_command();
610 :
611 9222 : MemoryContextDelete(cstate->copycontext);
612 :
613 9222 : if (cstate->partitions)
614 36 : list_free(cstate->partitions);
615 :
616 9222 : pfree(cstate);
617 9222 : }
618 :
619 : /*
620 : * Setup CopyToState to read tuples from a table or a query for COPY TO.
621 : *
622 : * 'rel': Relation to be copied
623 : * 'raw_query': Query whose results are to be copied
624 : * 'queryRelId': OID of base relation to convert to a query (for RLS)
625 : * 'filename': Name of server-local file to write, NULL for STDOUT
626 : * 'is_program': true if 'filename' is program to execute
627 : * 'data_dest_cb': Callback that processes the output data
628 : * 'attnamelist': List of char *, columns to include. NIL selects all cols.
629 : * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
630 : *
631 : * Returns a CopyToState, to be passed to DoCopyTo() and related functions.
632 : */
633 : CopyToState
634 9438 : BeginCopyTo(ParseState *pstate,
635 : Relation rel,
636 : RawStmt *raw_query,
637 : Oid queryRelId,
638 : const char *filename,
639 : bool is_program,
640 : copy_data_dest_cb data_dest_cb,
641 : List *attnamelist,
642 : List *options)
643 : {
644 : CopyToState cstate;
645 9438 : bool pipe = (filename == NULL && data_dest_cb == NULL);
646 : TupleDesc tupDesc;
647 : int num_phys_attrs;
648 : MemoryContext oldcontext;
649 9438 : const int progress_cols[] = {
650 : PROGRESS_COPY_COMMAND,
651 : PROGRESS_COPY_TYPE
652 : };
653 9438 : int64 progress_vals[] = {
654 : PROGRESS_COPY_COMMAND_TO,
655 : 0
656 : };
657 9438 : List *children = NIL;
658 :
659 9438 : if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
660 : {
661 62 : if (rel->rd_rel->relkind == RELKIND_VIEW)
662 12 : ereport(ERROR,
663 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
664 : errmsg("cannot copy from view \"%s\"",
665 : RelationGetRelationName(rel)),
666 : errhint("Try the COPY (SELECT ...) TO variant.")));
667 50 : else if (rel->rd_rel->relkind == RELKIND_MATVIEW)
668 : {
669 12 : if (!RelationIsPopulated(rel))
670 6 : ereport(ERROR,
671 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
672 : errmsg("cannot copy from unpopulated materialized view \"%s\"",
673 : RelationGetRelationName(rel)),
674 : errhint("Use the REFRESH MATERIALIZED VIEW command."));
675 : }
676 38 : else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
677 0 : ereport(ERROR,
678 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
679 : errmsg("cannot copy from foreign table \"%s\"",
680 : RelationGetRelationName(rel)),
681 : errhint("Try the COPY (SELECT ...) TO variant.")));
682 38 : else if (rel->rd_rel->relkind == RELKIND_SEQUENCE)
683 0 : ereport(ERROR,
684 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
685 : errmsg("cannot copy from sequence \"%s\"",
686 : RelationGetRelationName(rel))));
687 38 : else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
688 : {
689 : /*
690 : * Collect OIDs of relation containing data, so that later
691 : * DoCopyTo can copy the data from them.
692 : */
693 38 : children = find_all_inheritors(RelationGetRelid(rel), AccessShareLock, NULL);
694 :
695 194 : foreach_oid(child, children)
696 : {
697 122 : char relkind = get_rel_relkind(child);
698 :
699 122 : if (relkind == RELKIND_FOREIGN_TABLE)
700 : {
701 2 : char *relation_name = get_rel_name(child);
702 :
703 2 : ereport(ERROR,
704 : errcode(ERRCODE_WRONG_OBJECT_TYPE),
705 : errmsg("cannot copy from foreign table \"%s\"", relation_name),
706 : errdetail("Partition \"%s\" is a foreign table in partitioned table \"%s\"",
707 : relation_name, RelationGetRelationName(rel)),
708 : errhint("Try the COPY (SELECT ...) TO variant."));
709 : }
710 :
711 : /* Exclude tables with no data */
712 120 : if (RELKIND_HAS_PARTITIONS(relkind))
713 56 : children = foreach_delete_current(children, child);
714 : }
715 : }
716 : else
717 0 : ereport(ERROR,
718 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
719 : errmsg("cannot copy from non-table relation \"%s\"",
720 : RelationGetRelationName(rel))));
721 : }
722 :
723 :
724 : /* Allocate workspace and zero all fields */
725 9418 : cstate = palloc0_object(CopyToStateData);
726 :
727 : /*
728 : * We allocate everything used by a cstate in a new memory context. This
729 : * avoids memory leaks during repeated use of COPY in a query.
730 : */
731 9418 : cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
732 : "COPY",
733 : ALLOCSET_DEFAULT_SIZES);
734 :
735 9418 : oldcontext = MemoryContextSwitchTo(cstate->copycontext);
736 :
737 : /* Extract options from the statement node tree */
738 9418 : ProcessCopyOptions(pstate, &cstate->opts, false /* is_from */ , options);
739 :
740 : /* Set format routine */
741 9358 : cstate->routine = CopyToGetRoutine(&cstate->opts);
742 :
743 : /* Process the source/target relation or query */
744 9358 : if (rel)
745 : {
746 : Assert(!raw_query);
747 :
748 8704 : cstate->rel = rel;
749 :
750 8704 : tupDesc = RelationGetDescr(cstate->rel);
751 8704 : cstate->partitions = children;
752 : }
753 : else
754 : {
755 : List *rewritten;
756 : Query *query;
757 : PlannedStmt *plan;
758 : DestReceiver *dest;
759 :
760 654 : cstate->rel = NULL;
761 654 : cstate->partitions = NIL;
762 :
763 : /*
764 : * Run parse analysis and rewrite. Note this also acquires sufficient
765 : * locks on the source table(s).
766 : */
767 654 : rewritten = pg_analyze_and_rewrite_fixedparams(raw_query,
768 : pstate->p_sourcetext, NULL, 0,
769 : NULL);
770 :
771 : /* check that we got back something we can work with */
772 642 : if (rewritten == NIL)
773 : {
774 18 : ereport(ERROR,
775 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
776 : errmsg("DO INSTEAD NOTHING rules are not supported for COPY")));
777 : }
778 624 : else if (list_length(rewritten) > 1)
779 : {
780 : ListCell *lc;
781 :
782 : /* examine queries to determine which error message to issue */
783 102 : foreach(lc, rewritten)
784 : {
785 84 : Query *q = lfirst_node(Query, lc);
786 :
787 84 : if (q->querySource == QSRC_QUAL_INSTEAD_RULE)
788 18 : ereport(ERROR,
789 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
790 : errmsg("conditional DO INSTEAD rules are not supported for COPY")));
791 66 : if (q->querySource == QSRC_NON_INSTEAD_RULE)
792 18 : ereport(ERROR,
793 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
794 : errmsg("DO ALSO rules are not supported for COPY")));
795 : }
796 :
797 18 : ereport(ERROR,
798 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
799 : errmsg("multi-statement DO INSTEAD rules are not supported for COPY")));
800 : }
801 :
802 570 : query = linitial_node(Query, rewritten);
803 :
804 : /* The grammar allows SELECT INTO, but we don't support that */
805 570 : if (query->utilityStmt != NULL &&
806 18 : IsA(query->utilityStmt, CreateTableAsStmt))
807 12 : ereport(ERROR,
808 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
809 : errmsg("COPY (SELECT INTO) is not supported")));
810 :
811 : /* The only other utility command we could see is NOTIFY */
812 558 : if (query->utilityStmt != NULL)
813 6 : ereport(ERROR,
814 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
815 : errmsg("COPY query must not be a utility command")));
816 :
817 : /*
818 : * Similarly the grammar doesn't enforce the presence of a RETURNING
819 : * clause, but this is required here.
820 : */
821 552 : if (query->commandType != CMD_SELECT &&
822 110 : query->returningList == NIL)
823 : {
824 : Assert(query->commandType == CMD_INSERT ||
825 : query->commandType == CMD_UPDATE ||
826 : query->commandType == CMD_DELETE ||
827 : query->commandType == CMD_MERGE);
828 :
829 24 : ereport(ERROR,
830 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
831 : errmsg("COPY query must have a RETURNING clause")));
832 : }
833 :
834 : /* plan the query */
835 528 : plan = pg_plan_query(query, pstate->p_sourcetext,
836 : CURSOR_OPT_PARALLEL_OK, NULL, NULL);
837 :
838 : /*
839 : * With row-level security and a user using "COPY relation TO", we
840 : * have to convert the "COPY relation TO" to a query-based COPY (eg:
841 : * "COPY (SELECT * FROM ONLY relation) TO"), to allow the rewriter to
842 : * add in any RLS clauses.
843 : *
844 : * When this happens, we are passed in the relid of the originally
845 : * found relation (which we have locked). As the planner will look up
846 : * the relation again, we double-check here to make sure it found the
847 : * same one that we have locked.
848 : */
849 526 : if (queryRelId != InvalidOid)
850 : {
851 : /*
852 : * Note that with RLS involved there may be multiple relations,
853 : * and while the one we need is almost certainly first, we don't
854 : * make any guarantees of that in the planner, so check the whole
855 : * list and make sure we find the original relation.
856 : */
857 78 : if (!list_member_oid(plan->relationOids, queryRelId))
858 0 : ereport(ERROR,
859 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
860 : errmsg("relation referenced by COPY statement has changed")));
861 : }
862 :
863 : /*
864 : * Use a snapshot with an updated command ID to ensure this query sees
865 : * results of any previously executed queries.
866 : */
867 526 : PushCopiedSnapshot(GetActiveSnapshot());
868 526 : UpdateActiveSnapshotCommandId();
869 :
870 : /* Create dest receiver for COPY OUT */
871 526 : dest = CreateDestReceiver(DestCopyOut);
872 526 : ((DR_copy *) dest)->cstate = cstate;
873 :
874 : /* Create a QueryDesc requesting no output */
875 526 : cstate->queryDesc = CreateQueryDesc(plan, pstate->p_sourcetext,
876 : GetActiveSnapshot(),
877 : InvalidSnapshot,
878 : dest, NULL, NULL, 0);
879 :
880 : /*
881 : * Call ExecutorStart to prepare the plan for execution.
882 : *
883 : * ExecutorStart computes a result tupdesc for us
884 : */
885 526 : ExecutorStart(cstate->queryDesc, 0);
886 :
887 520 : tupDesc = cstate->queryDesc->tupDesc;
888 : }
889 :
890 : /* Generate or convert list of attributes to process */
891 9224 : cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
892 :
893 9224 : num_phys_attrs = tupDesc->natts;
894 :
895 : /* Convert FORCE_QUOTE name list to per-column flags, check validity */
896 9224 : cstate->opts.force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
897 9224 : if (cstate->opts.force_quote_all)
898 : {
899 18 : MemSet(cstate->opts.force_quote_flags, true, num_phys_attrs * sizeof(bool));
900 : }
901 9206 : else if (cstate->opts.force_quote)
902 : {
903 : List *attnums;
904 : ListCell *cur;
905 :
906 24 : attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_quote);
907 :
908 48 : foreach(cur, attnums)
909 : {
910 24 : int attnum = lfirst_int(cur);
911 24 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
912 :
913 24 : if (!list_member_int(cstate->attnumlist, attnum))
914 0 : ereport(ERROR,
915 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
916 : /*- translator: %s is the name of a COPY option, e.g. FORCE_NOT_NULL */
917 : errmsg("%s column \"%s\" not referenced by COPY",
918 : "FORCE_QUOTE", NameStr(attr->attname))));
919 24 : cstate->opts.force_quote_flags[attnum - 1] = true;
920 : }
921 : }
922 :
923 : /* Use client encoding when ENCODING option is not specified. */
924 9224 : if (cstate->opts.file_encoding < 0)
925 9206 : cstate->file_encoding = pg_get_client_encoding();
926 : else
927 18 : cstate->file_encoding = cstate->opts.file_encoding;
928 :
929 : /*
930 : * Set up encoding conversion info if the file and server encodings differ
931 : * (see also pg_server_to_any).
932 : */
933 9224 : if (cstate->file_encoding == GetDatabaseEncoding() ||
934 8 : cstate->file_encoding == PG_SQL_ASCII)
935 9222 : cstate->need_transcoding = false;
936 : else
937 2 : cstate->need_transcoding = true;
938 :
939 : /* See Multibyte encoding comment above */
940 9224 : cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding);
941 :
942 9224 : cstate->copy_dest = COPY_FILE; /* default */
943 :
944 9224 : if (data_dest_cb)
945 : {
946 2 : progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
947 2 : cstate->copy_dest = COPY_CALLBACK;
948 2 : cstate->data_dest_cb = data_dest_cb;
949 : }
950 9222 : else if (pipe)
951 : {
952 9160 : progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
953 :
954 : Assert(!is_program); /* the grammar does not allow this */
955 9160 : if (whereToSendOutput != DestRemote)
956 0 : cstate->copy_file = stdout;
957 : }
958 : else
959 : {
960 62 : cstate->filename = pstrdup(filename);
961 62 : cstate->is_program = is_program;
962 :
963 62 : if (is_program)
964 : {
965 0 : progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
966 0 : cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W);
967 0 : if (cstate->copy_file == NULL)
968 0 : ereport(ERROR,
969 : (errcode_for_file_access(),
970 : errmsg("could not execute command \"%s\": %m",
971 : cstate->filename)));
972 : }
973 : else
974 : {
975 : mode_t oumask; /* Pre-existing umask value */
976 : struct stat st;
977 :
978 62 : progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
979 :
980 : /*
981 : * Prevent write to relative path ... too easy to shoot oneself in
982 : * the foot by overwriting a database file ...
983 : */
984 62 : if (!is_absolute_path(filename))
985 0 : ereport(ERROR,
986 : (errcode(ERRCODE_INVALID_NAME),
987 : errmsg("relative path not allowed for COPY to file")));
988 :
989 62 : oumask = umask(S_IWGRP | S_IWOTH);
990 62 : PG_TRY();
991 : {
992 62 : cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
993 : }
994 0 : PG_FINALLY();
995 : {
996 62 : umask(oumask);
997 : }
998 62 : PG_END_TRY();
999 62 : if (cstate->copy_file == NULL)
1000 : {
1001 : /* copy errno because ereport subfunctions might change it */
1002 0 : int save_errno = errno;
1003 :
1004 0 : ereport(ERROR,
1005 : (errcode_for_file_access(),
1006 : errmsg("could not open file \"%s\" for writing: %m",
1007 : cstate->filename),
1008 : (save_errno == ENOENT || save_errno == EACCES) ?
1009 : errhint("COPY TO instructs the PostgreSQL server process to write a file. "
1010 : "You may want a client-side facility such as psql's \\copy.") : 0));
1011 : }
1012 :
1013 62 : if (fstat(fileno(cstate->copy_file), &st))
1014 0 : ereport(ERROR,
1015 : (errcode_for_file_access(),
1016 : errmsg("could not stat file \"%s\": %m",
1017 : cstate->filename)));
1018 :
1019 62 : if (S_ISDIR(st.st_mode))
1020 0 : ereport(ERROR,
1021 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1022 : errmsg("\"%s\" is a directory", cstate->filename)));
1023 : }
1024 : }
1025 :
1026 : /* initialize progress */
1027 9224 : pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
1028 9224 : cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
1029 9224 : pgstat_progress_update_multi_param(2, progress_cols, progress_vals);
1030 :
1031 9224 : cstate->bytes_processed = 0;
1032 :
1033 9224 : MemoryContextSwitchTo(oldcontext);
1034 :
1035 9224 : return cstate;
1036 : }
1037 :
1038 : /*
1039 : * Clean up storage and release resources for COPY TO.
1040 : */
1041 : void
1042 9222 : EndCopyTo(CopyToState cstate)
1043 : {
1044 9222 : if (cstate->queryDesc != NULL)
1045 : {
1046 : /* Close down the query and free resources. */
1047 520 : ExecutorFinish(cstate->queryDesc);
1048 520 : ExecutorEnd(cstate->queryDesc);
1049 520 : FreeQueryDesc(cstate->queryDesc);
1050 520 : PopActiveSnapshot();
1051 : }
1052 :
1053 : /* Clean up storage */
1054 9222 : EndCopy(cstate);
1055 9222 : }
1056 :
1057 : /*
1058 : * Copy from relation or query TO file.
1059 : *
1060 : * Returns the number of rows processed.
1061 : */
1062 : uint64
1063 9224 : DoCopyTo(CopyToState cstate)
1064 : {
1065 9224 : bool pipe = (cstate->filename == NULL && cstate->data_dest_cb == NULL);
1066 9224 : bool fe_copy = (pipe && whereToSendOutput == DestRemote);
1067 : TupleDesc tupDesc;
1068 : int num_phys_attrs;
1069 : ListCell *cur;
1070 9224 : uint64 processed = 0;
1071 :
1072 9224 : if (fe_copy)
1073 9160 : SendCopyBegin(cstate);
1074 :
1075 9224 : if (cstate->rel)
1076 8704 : tupDesc = RelationGetDescr(cstate->rel);
1077 : else
1078 520 : tupDesc = cstate->queryDesc->tupDesc;
1079 9224 : num_phys_attrs = tupDesc->natts;
1080 9224 : cstate->opts.null_print_client = cstate->opts.null_print; /* default */
1081 :
1082 : /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
1083 9224 : cstate->fe_msgbuf = makeStringInfo();
1084 :
1085 : /* Get info about the columns we need to process. */
1086 9224 : cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
1087 41336 : foreach(cur, cstate->attnumlist)
1088 : {
1089 32114 : int attnum = lfirst_int(cur);
1090 32114 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1091 :
1092 32114 : cstate->routine->CopyToOutFunc(cstate, attr->atttypid,
1093 32114 : &cstate->out_functions[attnum - 1]);
1094 : }
1095 :
1096 : /*
1097 : * Create a temporary memory context that we can reset once per row to
1098 : * recover palloc'd memory. This avoids any problems with leaks inside
1099 : * datatype output routines, and should be faster than retail pfree's
1100 : * anyway. (We don't need a whole econtext as CopyFrom does.)
1101 : */
1102 9222 : cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
1103 : "COPY TO",
1104 : ALLOCSET_DEFAULT_SIZES);
1105 :
1106 9222 : cstate->routine->CopyToStart(cstate, tupDesc);
1107 :
1108 9222 : if (cstate->rel)
1109 : {
1110 : /*
1111 : * If COPY TO source table is a partitioned table, then open each
1112 : * partition and process each individual partition.
1113 : */
1114 8702 : if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1115 : {
1116 136 : foreach_oid(child, cstate->partitions)
1117 : {
1118 : Relation scan_rel;
1119 :
1120 : /* We already got the lock in BeginCopyTo */
1121 64 : scan_rel = table_open(child, NoLock);
1122 64 : CopyRelationTo(cstate, scan_rel, cstate->rel, &processed);
1123 64 : table_close(scan_rel, NoLock);
1124 : }
1125 : }
1126 : else
1127 8666 : CopyRelationTo(cstate, cstate->rel, NULL, &processed);
1128 : }
1129 : else
1130 : {
1131 : /* run the plan --- the dest receiver will send tuples */
1132 520 : ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0);
1133 520 : processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
1134 : }
1135 :
1136 9222 : cstate->routine->CopyToEnd(cstate);
1137 :
1138 9222 : MemoryContextDelete(cstate->rowcontext);
1139 :
1140 9222 : if (fe_copy)
1141 9158 : SendCopyEnd(cstate);
1142 :
1143 9222 : return processed;
1144 : }
1145 :
1146 : /*
1147 : * Scans a single table and exports its rows to the COPY destination.
1148 : *
1149 : * root_rel can be set to the root table of rel if rel is a partition
1150 : * table so that we can send tuples in root_rel's rowtype, which might
1151 : * differ from individual partitions.
1152 : */
1153 : static void
1154 8730 : CopyRelationTo(CopyToState cstate, Relation rel, Relation root_rel, uint64 *processed)
1155 : {
1156 : TupleTableSlot *slot;
1157 : TableScanDesc scandesc;
1158 8730 : AttrMap *map = NULL;
1159 8730 : TupleTableSlot *root_slot = NULL;
1160 :
1161 8730 : scandesc = table_beginscan(rel, GetActiveSnapshot(), 0, NULL);
1162 8730 : slot = table_slot_create(rel, NULL);
1163 :
1164 : /*
1165 : * If we are exporting partition data here, we check if converting tuples
1166 : * to the root table's rowtype, because a partition might have column
1167 : * order different than its root table.
1168 : */
1169 8730 : if (root_rel != NULL)
1170 : {
1171 64 : root_slot = table_slot_create(root_rel, NULL);
1172 64 : map = build_attrmap_by_name_if_req(RelationGetDescr(root_rel),
1173 : RelationGetDescr(rel),
1174 : false);
1175 : }
1176 :
1177 3662780 : while (table_scan_getnextslot(scandesc, ForwardScanDirection, slot))
1178 : {
1179 : TupleTableSlot *copyslot;
1180 :
1181 3654050 : CHECK_FOR_INTERRUPTS();
1182 :
1183 3654050 : if (map != NULL)
1184 28 : copyslot = execute_attr_map_slot(map, slot, root_slot);
1185 : else
1186 : {
1187 : /* Deconstruct the tuple */
1188 3654022 : slot_getallattrs(slot);
1189 3654022 : copyslot = slot;
1190 : }
1191 :
1192 : /* Format and send the data */
1193 3654050 : CopyOneRowTo(cstate, copyslot);
1194 :
1195 : /*
1196 : * Increment the number of processed tuples, and report the progress.
1197 : */
1198 3654050 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
1199 3654050 : ++(*processed));
1200 : }
1201 :
1202 8730 : ExecDropSingleTupleTableSlot(slot);
1203 :
1204 8730 : if (root_slot != NULL)
1205 64 : ExecDropSingleTupleTableSlot(root_slot);
1206 :
1207 8730 : if (map != NULL)
1208 12 : free_attrmap(map);
1209 :
1210 8730 : table_endscan(scandesc);
1211 8730 : }
1212 :
1213 : /*
1214 : * Emit one row during DoCopyTo().
1215 : */
1216 : static inline void
1217 3661178 : CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot)
1218 : {
1219 : MemoryContext oldcontext;
1220 :
1221 3661178 : MemoryContextReset(cstate->rowcontext);
1222 3661178 : oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
1223 :
1224 : /* Make sure the tuple is fully deconstructed */
1225 3661178 : slot_getallattrs(slot);
1226 :
1227 3661178 : cstate->routine->CopyToOneRow(cstate, slot);
1228 :
1229 3661178 : MemoryContextSwitchTo(oldcontext);
1230 3661178 : }
1231 :
1232 : /*
1233 : * Send text representation of one attribute, with conversion and escaping
1234 : */
1235 : #define DUMPSOFAR() \
1236 : do { \
1237 : if (ptr > start) \
1238 : CopySendData(cstate, start, ptr - start); \
1239 : } while (0)
1240 :
1241 : static void
1242 12965016 : CopyAttributeOutText(CopyToState cstate, const char *string)
1243 : {
1244 : const char *ptr;
1245 : const char *start;
1246 : char c;
1247 12965016 : char delimc = cstate->opts.delim[0];
1248 :
1249 12965016 : if (cstate->need_transcoding)
1250 0 : ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
1251 : else
1252 12965016 : ptr = string;
1253 :
1254 : /*
1255 : * We have to grovel through the string searching for control characters
1256 : * and instances of the delimiter character. In most cases, though, these
1257 : * are infrequent. To avoid overhead from calling CopySendData once per
1258 : * character, we dump out all characters between escaped characters in a
1259 : * single call. The loop invariant is that the data from "start" to "ptr"
1260 : * can be sent literally, but hasn't yet been.
1261 : *
1262 : * We can skip pg_encoding_mblen() overhead when encoding is safe, because
1263 : * in valid backend encodings, extra bytes of a multibyte character never
1264 : * look like ASCII. This loop is sufficiently performance-critical that
1265 : * it's worth making two copies of it to get the IS_HIGHBIT_SET() test out
1266 : * of the normal safe-encoding path.
1267 : */
1268 12965016 : if (cstate->encoding_embeds_ascii)
1269 : {
1270 0 : start = ptr;
1271 0 : while ((c = *ptr) != '\0')
1272 : {
1273 0 : if ((unsigned char) c < (unsigned char) 0x20)
1274 : {
1275 : /*
1276 : * \r and \n must be escaped, the others are traditional. We
1277 : * prefer to dump these using the C-like notation, rather than
1278 : * a backslash and the literal character, because it makes the
1279 : * dump file a bit more proof against Microsoftish data
1280 : * mangling.
1281 : */
1282 0 : switch (c)
1283 : {
1284 0 : case '\b':
1285 0 : c = 'b';
1286 0 : break;
1287 0 : case '\f':
1288 0 : c = 'f';
1289 0 : break;
1290 0 : case '\n':
1291 0 : c = 'n';
1292 0 : break;
1293 0 : case '\r':
1294 0 : c = 'r';
1295 0 : break;
1296 0 : case '\t':
1297 0 : c = 't';
1298 0 : break;
1299 0 : case '\v':
1300 0 : c = 'v';
1301 0 : break;
1302 0 : default:
1303 : /* If it's the delimiter, must backslash it */
1304 0 : if (c == delimc)
1305 0 : break;
1306 : /* All ASCII control chars are length 1 */
1307 0 : ptr++;
1308 0 : continue; /* fall to end of loop */
1309 : }
1310 : /* if we get here, we need to convert the control char */
1311 0 : DUMPSOFAR();
1312 0 : CopySendChar(cstate, '\\');
1313 0 : CopySendChar(cstate, c);
1314 0 : start = ++ptr; /* do not include char in next run */
1315 : }
1316 0 : else if (c == '\\' || c == delimc)
1317 : {
1318 0 : DUMPSOFAR();
1319 0 : CopySendChar(cstate, '\\');
1320 0 : start = ptr++; /* we include char in next run */
1321 : }
1322 0 : else if (IS_HIGHBIT_SET(c))
1323 0 : ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
1324 : else
1325 0 : ptr++;
1326 : }
1327 : }
1328 : else
1329 : {
1330 12965016 : start = ptr;
1331 140564662 : while ((c = *ptr) != '\0')
1332 : {
1333 127599646 : if ((unsigned char) c < (unsigned char) 0x20)
1334 : {
1335 : /*
1336 : * \r and \n must be escaped, the others are traditional. We
1337 : * prefer to dump these using the C-like notation, rather than
1338 : * a backslash and the literal character, because it makes the
1339 : * dump file a bit more proof against Microsoftish data
1340 : * mangling.
1341 : */
1342 13950 : switch (c)
1343 : {
1344 0 : case '\b':
1345 0 : c = 'b';
1346 0 : break;
1347 0 : case '\f':
1348 0 : c = 'f';
1349 0 : break;
1350 11804 : case '\n':
1351 11804 : c = 'n';
1352 11804 : break;
1353 0 : case '\r':
1354 0 : c = 'r';
1355 0 : break;
1356 2146 : case '\t':
1357 2146 : c = 't';
1358 2146 : break;
1359 0 : case '\v':
1360 0 : c = 'v';
1361 0 : break;
1362 0 : default:
1363 : /* If it's the delimiter, must backslash it */
1364 0 : if (c == delimc)
1365 0 : break;
1366 : /* All ASCII control chars are length 1 */
1367 0 : ptr++;
1368 0 : continue; /* fall to end of loop */
1369 : }
1370 : /* if we get here, we need to convert the control char */
1371 13950 : DUMPSOFAR();
1372 13950 : CopySendChar(cstate, '\\');
1373 13950 : CopySendChar(cstate, c);
1374 13950 : start = ++ptr; /* do not include char in next run */
1375 : }
1376 127585696 : else if (c == '\\' || c == delimc)
1377 : {
1378 4590 : DUMPSOFAR();
1379 4590 : CopySendChar(cstate, '\\');
1380 4590 : start = ptr++; /* we include char in next run */
1381 : }
1382 : else
1383 127581106 : ptr++;
1384 : }
1385 : }
1386 :
1387 12965016 : DUMPSOFAR();
1388 12965016 : }
1389 :
1390 : /*
1391 : * Send text representation of one attribute, with conversion and
1392 : * CSV-style escaping
1393 : */
1394 : static void
1395 618 : CopyAttributeOutCSV(CopyToState cstate, const char *string,
1396 : bool use_quote)
1397 : {
1398 : const char *ptr;
1399 : const char *start;
1400 : char c;
1401 618 : char delimc = cstate->opts.delim[0];
1402 618 : char quotec = cstate->opts.quote[0];
1403 618 : char escapec = cstate->opts.escape[0];
1404 618 : bool single_attr = (list_length(cstate->attnumlist) == 1);
1405 :
1406 : /* force quoting if it matches null_print (before conversion!) */
1407 618 : if (!use_quote && strcmp(string, cstate->opts.null_print) == 0)
1408 54 : use_quote = true;
1409 :
1410 618 : if (cstate->need_transcoding)
1411 0 : ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
1412 : else
1413 618 : ptr = string;
1414 :
1415 : /*
1416 : * Make a preliminary pass to discover if it needs quoting
1417 : */
1418 618 : if (!use_quote)
1419 : {
1420 : /*
1421 : * Quote '\.' if it appears alone on a line, so that it will not be
1422 : * interpreted as an end-of-data marker. (PG 18 and up will not
1423 : * interpret '\.' in CSV that way, except in embedded-in-SQL data; but
1424 : * we want the data to be loadable by older versions too. Also, this
1425 : * avoids breaking clients that are still using PQgetline().)
1426 : */
1427 432 : if (single_attr && strcmp(ptr, "\\.") == 0)
1428 6 : use_quote = true;
1429 : else
1430 : {
1431 426 : const char *tptr = ptr;
1432 :
1433 2208 : while ((c = *tptr) != '\0')
1434 : {
1435 1914 : if (c == delimc || c == quotec || c == '\n' || c == '\r')
1436 : {
1437 132 : use_quote = true;
1438 132 : break;
1439 : }
1440 1782 : if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
1441 0 : tptr += pg_encoding_mblen(cstate->file_encoding, tptr);
1442 : else
1443 1782 : tptr++;
1444 : }
1445 : }
1446 : }
1447 :
1448 618 : if (use_quote)
1449 : {
1450 324 : CopySendChar(cstate, quotec);
1451 :
1452 : /*
1453 : * We adopt the same optimization strategy as in CopyAttributeOutText
1454 : */
1455 324 : start = ptr;
1456 2538 : while ((c = *ptr) != '\0')
1457 : {
1458 2214 : if (c == quotec || c == escapec)
1459 : {
1460 156 : DUMPSOFAR();
1461 156 : CopySendChar(cstate, escapec);
1462 156 : start = ptr; /* we include char in next run */
1463 : }
1464 2214 : if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
1465 0 : ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
1466 : else
1467 2214 : ptr++;
1468 : }
1469 324 : DUMPSOFAR();
1470 :
1471 324 : CopySendChar(cstate, quotec);
1472 : }
1473 : else
1474 : {
1475 : /* If it doesn't need quoting, we can just dump it as-is */
1476 294 : CopySendString(cstate, ptr);
1477 : }
1478 618 : }
1479 :
1480 : /*
1481 : * copy_dest_startup --- executor startup
1482 : */
1483 : static void
1484 520 : copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
1485 : {
1486 : /* no-op */
1487 520 : }
1488 :
1489 : /*
1490 : * copy_dest_receive --- receive one tuple
1491 : */
1492 : static bool
1493 7128 : copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
1494 : {
1495 7128 : DR_copy *myState = (DR_copy *) self;
1496 7128 : CopyToState cstate = myState->cstate;
1497 :
1498 : /* Send the data */
1499 7128 : CopyOneRowTo(cstate, slot);
1500 :
1501 : /* Increment the number of processed tuples, and report the progress */
1502 7128 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
1503 7128 : ++myState->processed);
1504 :
1505 7128 : return true;
1506 : }
1507 :
1508 : /*
1509 : * copy_dest_shutdown --- executor end
1510 : */
1511 : static void
1512 520 : copy_dest_shutdown(DestReceiver *self)
1513 : {
1514 : /* no-op */
1515 520 : }
1516 :
1517 : /*
1518 : * copy_dest_destroy --- release DestReceiver object
1519 : */
1520 : static void
1521 0 : copy_dest_destroy(DestReceiver *self)
1522 : {
1523 0 : pfree(self);
1524 0 : }
1525 :
1526 : /*
1527 : * CreateCopyDestReceiver -- create a suitable DestReceiver object
1528 : */
1529 : DestReceiver *
1530 526 : CreateCopyDestReceiver(void)
1531 : {
1532 526 : DR_copy *self = palloc_object(DR_copy);
1533 :
1534 526 : self->pub.receiveSlot = copy_dest_receive;
1535 526 : self->pub.rStartup = copy_dest_startup;
1536 526 : self->pub.rShutdown = copy_dest_shutdown;
1537 526 : self->pub.rDestroy = copy_dest_destroy;
1538 526 : self->pub.mydest = DestCopyOut;
1539 :
1540 526 : self->cstate = NULL; /* will be set later */
1541 526 : self->processed = 0;
1542 :
1543 526 : return (DestReceiver *) self;
1544 : }
|