Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * copyto.c
4 : * COPY <table> TO file/program/client
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/commands/copyto.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : #include "postgres.h"
16 :
17 : #include <ctype.h>
18 : #include <unistd.h>
19 : #include <sys/stat.h>
20 :
21 : #include "access/table.h"
22 : #include "access/tableam.h"
23 : #include "catalog/pg_inherits.h"
24 : #include "commands/copyapi.h"
25 : #include "commands/progress.h"
26 : #include "executor/execdesc.h"
27 : #include "executor/executor.h"
28 : #include "executor/tuptable.h"
29 : #include "libpq/libpq.h"
30 : #include "libpq/pqformat.h"
31 : #include "mb/pg_wchar.h"
32 : #include "miscadmin.h"
33 : #include "pgstat.h"
34 : #include "storage/fd.h"
35 : #include "tcop/tcopprot.h"
36 : #include "utils/lsyscache.h"
37 : #include "utils/memutils.h"
38 : #include "utils/rel.h"
39 : #include "utils/snapmgr.h"
40 :
/*
 * Represents the different dest cases we need to worry about at
 * the bottom level.
 *
 * CopySendEndOfRow() switches on this value to decide how a completed row
 * buffer is delivered.
 */
typedef enum CopyDest
{
	COPY_FILE,					/* to file (or a piped program) */
	COPY_FRONTEND,				/* to frontend */
	COPY_CALLBACK,				/* to callback function */
} CopyDest;
51 :
/*
 * This struct contains all the state variables used throughout a COPY TO
 * operation.
 *
 * Multi-byte encodings: all supported client-side encodings encode multi-byte
 * characters by having the first byte's high bit set. Subsequent bytes of the
 * character can have the high bit not set. When scanning data in such an
 * encoding to look for a match to a single-byte (ie ASCII) character, we must
 * use the full pg_encoding_mblen() machinery to skip over multibyte
 * characters, else we might find a false match to a trailing byte. In
 * supported server encodings, there is no possibility of a false match, and
 * it's faster to make useless comparisons to trailing bytes than it is to
 * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true
 * when we have to do it the hard way.
 */
typedef struct CopyToStateData
{
	/* format-specific routines */
	const CopyToRoutine *routine;

	/* low-level state data */
	CopyDest	copy_dest;		/* type of copy destination */
	FILE	   *copy_file;		/* used if copy_dest == COPY_FILE */
	StringInfo	fe_msgbuf;		/* row buffer; used for all dests during COPY
								 * TO */

	int			file_encoding;	/* file or remote side's character encoding */
	bool		need_transcoding;	/* file encoding diff from server? */
	bool		encoding_embeds_ascii;	/* ASCII can be non-first byte? */

	/* parameters from the COPY command */
	Relation	rel;			/* relation to copy from (NULL for a query) */
	QueryDesc  *queryDesc;		/* executable query to copy from */
	List	   *attnumlist;		/* integer list of attnums to copy */
	char	   *filename;		/* filename, or NULL for STDOUT */
	bool		is_program;		/* is 'filename' a program to popen? */
	copy_data_dest_cb data_dest_cb; /* function for writing data */

	CopyFormatOptions opts;
	Node	   *whereClause;	/* WHERE condition (or NULL) */
	List	   *partitions;		/* OID list of partitions to copy data from */

	/*
	 * Working state
	 */
	MemoryContext copycontext;	/* per-copy execution context */

	FmgrInfo   *out_functions;	/* lookup info for output functions */
	MemoryContext rowcontext;	/* per-row evaluation context */
	uint64		bytes_processed;	/* number of bytes processed so far */
} CopyToStateData;
102 :
/*
 * DestReceiver for COPY (query) TO.
 *
 * The generic DestReceiver comes first so a DestReceiver pointer can be cast
 * to DR_copy; the remaining fields carry the COPY state for the receiver.
 */
typedef struct
{
	DestReceiver pub;			/* publicly-known function pointers */
	CopyToState cstate;			/* CopyToStateData for the command */
	uint64		processed;		/* # of tuples processed */
} DR_copy;
110 :
/*
 * Signature bytes written at the start of a binary-format COPY.
 *
 * The array is sized to exactly the 11 characters spelled out in the
 * literal, so no additional terminating NUL is stored.
 *
 * NOTE: there's a copy of this in copyfromparse.c
 */
static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
113 :
114 :
/* non-export function prototypes */
static void EndCopy(CopyToState cstate);
static void ClosePipeToProgram(CopyToState cstate);
static void CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot);
static void CopyAttributeOutText(CopyToState cstate, const char *string);
static void CopyAttributeOutCSV(CopyToState cstate, const char *string,
								bool use_quote);
static void CopyRelationTo(CopyToState cstate, Relation rel, Relation root_rel,
						   uint64 *processed);

/*
 * built-in format-specific routines (referenced by the CopyToRoutine tables
 * defined below)
 */
static void CopyToTextLikeStart(CopyToState cstate, TupleDesc tupDesc);
static void CopyToTextLikeOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo);
static void CopyToTextOneRow(CopyToState cstate, TupleTableSlot *slot);
static void CopyToCSVOneRow(CopyToState cstate, TupleTableSlot *slot);
static void CopyToTextLikeOneRow(CopyToState cstate, TupleTableSlot *slot,
								 bool is_csv);
static void CopyToTextLikeEnd(CopyToState cstate);
static void CopyToBinaryStart(CopyToState cstate, TupleDesc tupDesc);
static void CopyToBinaryOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo);
static void CopyToBinaryOneRow(CopyToState cstate, TupleTableSlot *slot);
static void CopyToBinaryEnd(CopyToState cstate);

/*
 * Low-level communications functions: these append to, or flush, the
 * per-row output buffer
 */
static void SendCopyBegin(CopyToState cstate);
static void SendCopyEnd(CopyToState cstate);
static void CopySendData(CopyToState cstate, const void *databuf, int datasize);
static void CopySendString(CopyToState cstate, const char *str);
static void CopySendChar(CopyToState cstate, char c);
static void CopySendEndOfRow(CopyToState cstate);
static void CopySendTextLikeEndOfRow(CopyToState cstate);
static void CopySendInt32(CopyToState cstate, int32 val);
static void CopySendInt16(CopyToState cstate, int16 val);
148 :
/*
 * COPY TO routines for built-in formats.
 *
 * Each table bundles the four callbacks of one output format: start,
 * per-attribute output-function setup, per-row output, and end.
 *
 * CSV and text formats share the same TextLike routines except for the
 * one-row callback.
 */

/* text format */
static const CopyToRoutine CopyToRoutineText = {
	.CopyToStart = CopyToTextLikeStart,
	.CopyToOutFunc = CopyToTextLikeOutFunc,
	.CopyToOneRow = CopyToTextOneRow,
	.CopyToEnd = CopyToTextLikeEnd,
};

/* CSV format */
static const CopyToRoutine CopyToRoutineCSV = {
	.CopyToStart = CopyToTextLikeStart,
	.CopyToOutFunc = CopyToTextLikeOutFunc,
	.CopyToOneRow = CopyToCSVOneRow,
	.CopyToEnd = CopyToTextLikeEnd,
};

/* binary format */
static const CopyToRoutine CopyToRoutineBinary = {
	.CopyToStart = CopyToBinaryStart,
	.CopyToOutFunc = CopyToBinaryOutFunc,
	.CopyToOneRow = CopyToBinaryOneRow,
	.CopyToEnd = CopyToBinaryEnd,
};
179 :
180 : /* Return a COPY TO routine for the given options */
181 : static const CopyToRoutine *
182 9264 : CopyToGetRoutine(const CopyFormatOptions *opts)
183 : {
184 9264 : if (opts->csv_mode)
185 126 : return &CopyToRoutineCSV;
186 9138 : else if (opts->binary)
187 16 : return &CopyToRoutineBinary;
188 :
189 : /* default is text */
190 9122 : return &CopyToRoutineText;
191 : }
192 :
193 : /* Implementation of the start callback for text and CSV formats */
194 : static void
195 9114 : CopyToTextLikeStart(CopyToState cstate, TupleDesc tupDesc)
196 : {
197 : /*
198 : * For non-binary copy, we need to convert null_print to file encoding,
199 : * because it will be sent directly with CopySendString.
200 : */
201 9114 : if (cstate->need_transcoding)
202 2 : cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
203 : cstate->opts.null_print_len,
204 : cstate->file_encoding);
205 :
206 : /* if a header has been requested send the line */
207 9114 : if (cstate->opts.header_line == COPY_HEADER_TRUE)
208 : {
209 : ListCell *cur;
210 36 : bool hdr_delim = false;
211 :
212 96 : foreach(cur, cstate->attnumlist)
213 : {
214 60 : int attnum = lfirst_int(cur);
215 : char *colname;
216 :
217 60 : if (hdr_delim)
218 24 : CopySendChar(cstate, cstate->opts.delim[0]);
219 60 : hdr_delim = true;
220 :
221 60 : colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
222 :
223 60 : if (cstate->opts.csv_mode)
224 24 : CopyAttributeOutCSV(cstate, colname, false);
225 : else
226 36 : CopyAttributeOutText(cstate, colname);
227 : }
228 :
229 36 : CopySendTextLikeEndOfRow(cstate);
230 : }
231 9114 : }
232 :
233 : /*
234 : * Implementation of the outfunc callback for text and CSV formats. Assign
235 : * the output function data to the given *finfo.
236 : */
237 : static void
238 31824 : CopyToTextLikeOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo)
239 : {
240 : Oid func_oid;
241 : bool is_varlena;
242 :
243 : /* Set output function for an attribute */
244 31824 : getTypeOutputInfo(atttypid, &func_oid, &is_varlena);
245 31824 : fmgr_info(func_oid, finfo);
246 31824 : }
247 :
248 : /* Implementation of the per-row callback for text format */
249 : static void
250 3653376 : CopyToTextOneRow(CopyToState cstate, TupleTableSlot *slot)
251 : {
252 3653376 : CopyToTextLikeOneRow(cstate, slot, false);
253 3653376 : }
254 :
255 : /* Implementation of the per-row callback for CSV format */
256 : static void
257 330 : CopyToCSVOneRow(CopyToState cstate, TupleTableSlot *slot)
258 : {
259 330 : CopyToTextLikeOneRow(cstate, slot, true);
260 330 : }
261 :
262 : /*
263 : * Workhorse for CopyToTextOneRow() and CopyToCSVOneRow().
264 : *
265 : * We use pg_attribute_always_inline to reduce function call overhead
266 : * and to help compilers to optimize away the 'is_csv' condition.
267 : */
268 : static pg_attribute_always_inline void
269 3653706 : CopyToTextLikeOneRow(CopyToState cstate,
270 : TupleTableSlot *slot,
271 : bool is_csv)
272 : {
273 3653706 : bool need_delim = false;
274 3653706 : FmgrInfo *out_functions = cstate->out_functions;
275 :
276 21430484 : foreach_int(attnum, cstate->attnumlist)
277 : {
278 14123072 : Datum value = slot->tts_values[attnum - 1];
279 14123072 : bool isnull = slot->tts_isnull[attnum - 1];
280 :
281 14123072 : if (need_delim)
282 10469496 : CopySendChar(cstate, cstate->opts.delim[0]);
283 14123072 : need_delim = true;
284 :
285 14123072 : if (isnull)
286 : {
287 1187090 : CopySendString(cstate, cstate->opts.null_print_client);
288 : }
289 : else
290 : {
291 : char *string;
292 :
293 12935982 : string = OutputFunctionCall(&out_functions[attnum - 1],
294 : value);
295 :
296 12935982 : if (is_csv)
297 594 : CopyAttributeOutCSV(cstate, string,
298 594 : cstate->opts.force_quote_flags[attnum - 1]);
299 : else
300 12935388 : CopyAttributeOutText(cstate, string);
301 : }
302 : }
303 :
304 3653706 : CopySendTextLikeEndOfRow(cstate);
305 3653706 : }
306 :
307 : /* Implementation of the end callback for text and CSV formats */
308 : static void
309 9114 : CopyToTextLikeEnd(CopyToState cstate)
310 : {
311 : /* Nothing to do here */
312 9114 : }
313 :
314 : /*
315 : * Implementation of the start callback for binary format. Send a header
316 : * for a binary copy.
317 : */
318 : static void
319 14 : CopyToBinaryStart(CopyToState cstate, TupleDesc tupDesc)
320 : {
321 : int32 tmp;
322 :
323 : /* Signature */
324 14 : CopySendData(cstate, BinarySignature, 11);
325 : /* Flags field */
326 14 : tmp = 0;
327 14 : CopySendInt32(cstate, tmp);
328 : /* No header extension */
329 14 : tmp = 0;
330 14 : CopySendInt32(cstate, tmp);
331 14 : }
332 :
333 : /*
334 : * Implementation of the outfunc callback for binary format. Assign
335 : * the binary output function to the given *finfo.
336 : */
337 : static void
338 62 : CopyToBinaryOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo)
339 : {
340 : Oid func_oid;
341 : bool is_varlena;
342 :
343 : /* Set output function for an attribute */
344 62 : getTypeBinaryOutputInfo(atttypid, &func_oid, &is_varlena);
345 60 : fmgr_info(func_oid, finfo);
346 60 : }
347 :
348 : /* Implementation of the per-row callback for binary format */
349 : static void
350 32 : CopyToBinaryOneRow(CopyToState cstate, TupleTableSlot *slot)
351 : {
352 32 : FmgrInfo *out_functions = cstate->out_functions;
353 :
354 : /* Binary per-tuple header */
355 32 : CopySendInt16(cstate, list_length(cstate->attnumlist));
356 :
357 224 : foreach_int(attnum, cstate->attnumlist)
358 : {
359 160 : Datum value = slot->tts_values[attnum - 1];
360 160 : bool isnull = slot->tts_isnull[attnum - 1];
361 :
362 160 : if (isnull)
363 : {
364 30 : CopySendInt32(cstate, -1);
365 : }
366 : else
367 : {
368 : bytea *outputbytes;
369 :
370 130 : outputbytes = SendFunctionCall(&out_functions[attnum - 1],
371 : value);
372 130 : CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
373 130 : CopySendData(cstate, VARDATA(outputbytes),
374 130 : VARSIZE(outputbytes) - VARHDRSZ);
375 : }
376 : }
377 :
378 32 : CopySendEndOfRow(cstate);
379 32 : }
380 :
381 : /* Implementation of the end callback for binary format */
382 : static void
383 14 : CopyToBinaryEnd(CopyToState cstate)
384 : {
385 : /* Generate trailer for a binary copy */
386 14 : CopySendInt16(cstate, -1);
387 : /* Need to flush out the trailer */
388 14 : CopySendEndOfRow(cstate);
389 14 : }
390 :
391 : /*
392 : * Send copy start/stop messages for frontend copies. These have changed
393 : * in past protocol redesigns.
394 : */
395 : static void
396 9066 : SendCopyBegin(CopyToState cstate)
397 : {
398 : StringInfoData buf;
399 9066 : int natts = list_length(cstate->attnumlist);
400 9066 : int16 format = (cstate->opts.binary ? 1 : 0);
401 : int i;
402 :
403 9066 : pq_beginmessage(&buf, PqMsg_CopyOutResponse);
404 9066 : pq_sendbyte(&buf, format); /* overall format */
405 9066 : pq_sendint16(&buf, natts);
406 40720 : for (i = 0; i < natts; i++)
407 31654 : pq_sendint16(&buf, format); /* per-column formats */
408 9066 : pq_endmessage(&buf);
409 9066 : cstate->copy_dest = COPY_FRONTEND;
410 9066 : }
411 :
412 : static void
413 9064 : SendCopyEnd(CopyToState cstate)
414 : {
415 : /* Shouldn't have any unsent data */
416 : Assert(cstate->fe_msgbuf->len == 0);
417 : /* Send Copy Done message */
418 9064 : pq_putemptymessage(PqMsg_CopyDone);
419 9064 : }
420 :
421 : /*----------
422 : * CopySendData sends output data to the destination (file or frontend)
423 : * CopySendString does the same for null-terminated strings
424 : * CopySendChar does the same for single characters
425 : * CopySendEndOfRow does the appropriate thing at end of each data row
426 : * (data is not actually flushed except by CopySendEndOfRow)
427 : *
428 : * NB: no data conversion is applied by these functions
429 : *----------
430 : */
431 : static void
432 12737188 : CopySendData(CopyToState cstate, const void *databuf, int datasize)
433 : {
434 12737188 : appendBinaryStringInfo(cstate->fe_msgbuf, databuf, datasize);
435 12737188 : }
436 :
437 : static void
438 1187384 : CopySendString(CopyToState cstate, const char *str)
439 : {
440 1187384 : appendBinaryStringInfo(cstate->fe_msgbuf, str, strlen(str));
441 1187384 : }
442 :
443 : static void
444 14156488 : CopySendChar(CopyToState cstate, char c)
445 : {
446 14156488 : appendStringInfoCharMacro(cstate->fe_msgbuf, c);
447 14156488 : }
448 :
449 : static void
450 3653788 : CopySendEndOfRow(CopyToState cstate)
451 : {
452 3653788 : StringInfo fe_msgbuf = cstate->fe_msgbuf;
453 :
454 3653788 : switch (cstate->copy_dest)
455 : {
456 12282 : case COPY_FILE:
457 12282 : if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
458 12282 : cstate->copy_file) != 1 ||
459 12282 : ferror(cstate->copy_file))
460 : {
461 0 : if (cstate->is_program)
462 : {
463 0 : if (errno == EPIPE)
464 : {
465 : /*
466 : * The pipe will be closed automatically on error at
467 : * the end of transaction, but we might get a better
468 : * error message from the subprocess' exit code than
469 : * just "Broken Pipe"
470 : */
471 0 : ClosePipeToProgram(cstate);
472 :
473 : /*
474 : * If ClosePipeToProgram() didn't throw an error, the
475 : * program terminated normally, but closed the pipe
476 : * first. Restore errno, and throw an error.
477 : */
478 0 : errno = EPIPE;
479 : }
480 0 : ereport(ERROR,
481 : (errcode_for_file_access(),
482 : errmsg("could not write to COPY program: %m")));
483 : }
484 : else
485 0 : ereport(ERROR,
486 : (errcode_for_file_access(),
487 : errmsg("could not write to COPY file: %m")));
488 : }
489 12282 : break;
490 3641500 : case COPY_FRONTEND:
491 : /* Dump the accumulated row as one CopyData message */
492 3641500 : (void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len);
493 3641500 : break;
494 6 : case COPY_CALLBACK:
495 6 : cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len);
496 6 : break;
497 : }
498 :
499 : /* Update the progress */
500 3653788 : cstate->bytes_processed += fe_msgbuf->len;
501 3653788 : pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, cstate->bytes_processed);
502 :
503 3653788 : resetStringInfo(fe_msgbuf);
504 3653788 : }
505 :
506 : /*
507 : * Wrapper function of CopySendEndOfRow for text and CSV formats. Sends the
508 : * line termination and do common appropriate things for the end of row.
509 : */
510 : static inline void
511 3653742 : CopySendTextLikeEndOfRow(CopyToState cstate)
512 : {
513 3653742 : switch (cstate->copy_dest)
514 : {
515 12258 : case COPY_FILE:
516 : /* Default line termination depends on platform */
517 : #ifndef WIN32
518 12258 : CopySendChar(cstate, '\n');
519 : #else
520 : CopySendString(cstate, "\r\n");
521 : #endif
522 12258 : break;
523 3641478 : case COPY_FRONTEND:
524 : /* The FE/BE protocol uses \n as newline for all platforms */
525 3641478 : CopySendChar(cstate, '\n');
526 3641478 : break;
527 6 : default:
528 6 : break;
529 : }
530 :
531 : /* Now take the actions related to the end of a row */
532 3653742 : CopySendEndOfRow(cstate);
533 3653742 : }
534 :
535 : /*
536 : * These functions do apply some data conversion
537 : */
538 :
539 : /*
540 : * CopySendInt32 sends an int32 in network byte order
541 : */
542 : static inline void
543 188 : CopySendInt32(CopyToState cstate, int32 val)
544 : {
545 : uint32 buf;
546 :
547 188 : buf = pg_hton32((uint32) val);
548 188 : CopySendData(cstate, &buf, sizeof(buf));
549 188 : }
550 :
551 : /*
552 : * CopySendInt16 sends an int16 in network byte order
553 : */
554 : static inline void
555 46 : CopySendInt16(CopyToState cstate, int16 val)
556 : {
557 : uint16 buf;
558 :
559 46 : buf = pg_hton16((uint16) val);
560 46 : CopySendData(cstate, &buf, sizeof(buf));
561 46 : }
562 :
563 : /*
564 : * Closes the pipe to an external program, checking the pclose() return code.
565 : */
566 : static void
567 0 : ClosePipeToProgram(CopyToState cstate)
568 : {
569 : int pclose_rc;
570 :
571 : Assert(cstate->is_program);
572 :
573 0 : pclose_rc = ClosePipeStream(cstate->copy_file);
574 0 : if (pclose_rc == -1)
575 0 : ereport(ERROR,
576 : (errcode_for_file_access(),
577 : errmsg("could not close pipe to external command: %m")));
578 0 : else if (pclose_rc != 0)
579 : {
580 0 : ereport(ERROR,
581 : (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
582 : errmsg("program \"%s\" failed",
583 : cstate->filename),
584 : errdetail_internal("%s", wait_result_to_str(pclose_rc))));
585 : }
586 0 : }
587 :
588 : /*
589 : * Release resources allocated in a cstate for COPY TO/FROM.
590 : */
591 : static void
592 9128 : EndCopy(CopyToState cstate)
593 : {
594 9128 : if (cstate->is_program)
595 : {
596 0 : ClosePipeToProgram(cstate);
597 : }
598 : else
599 : {
600 9128 : if (cstate->filename != NULL && FreeFile(cstate->copy_file))
601 0 : ereport(ERROR,
602 : (errcode_for_file_access(),
603 : errmsg("could not close file \"%s\": %m",
604 : cstate->filename)));
605 : }
606 :
607 9128 : pgstat_progress_end_command();
608 :
609 9128 : MemoryContextDelete(cstate->copycontext);
610 :
611 9128 : if (cstate->partitions)
612 6 : list_free(cstate->partitions);
613 :
614 9128 : pfree(cstate);
615 9128 : }
616 :
617 : /*
618 : * Setup CopyToState to read tuples from a table or a query for COPY TO.
619 : *
620 : * 'rel': Relation to be copied
621 : * 'raw_query': Query whose results are to be copied
622 : * 'queryRelId': OID of base relation to convert to a query (for RLS)
623 : * 'filename': Name of server-local file to write, NULL for STDOUT
624 : * 'is_program': true if 'filename' is program to execute
625 : * 'data_dest_cb': Callback that processes the output data
626 : * 'attnamelist': List of char *, columns to include. NIL selects all cols.
627 : * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
628 : *
629 : * Returns a CopyToState, to be passed to DoCopyTo() and related functions.
630 : */
631 : CopyToState
632 9332 : BeginCopyTo(ParseState *pstate,
633 : Relation rel,
634 : RawStmt *raw_query,
635 : Oid queryRelId,
636 : const char *filename,
637 : bool is_program,
638 : copy_data_dest_cb data_dest_cb,
639 : List *attnamelist,
640 : List *options)
641 : {
642 : CopyToState cstate;
643 9332 : bool pipe = (filename == NULL && data_dest_cb == NULL);
644 : TupleDesc tupDesc;
645 : int num_phys_attrs;
646 : MemoryContext oldcontext;
647 9332 : const int progress_cols[] = {
648 : PROGRESS_COPY_COMMAND,
649 : PROGRESS_COPY_TYPE
650 : };
651 9332 : int64 progress_vals[] = {
652 : PROGRESS_COPY_COMMAND_TO,
653 : 0
654 : };
655 9332 : List *children = NIL;
656 :
657 9332 : if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
658 : {
659 32 : if (rel->rd_rel->relkind == RELKIND_VIEW)
660 12 : ereport(ERROR,
661 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
662 : errmsg("cannot copy from view \"%s\"",
663 : RelationGetRelationName(rel)),
664 : errhint("Try the COPY (SELECT ...) TO variant.")));
665 20 : else if (rel->rd_rel->relkind == RELKIND_MATVIEW)
666 : {
667 12 : if (!RelationIsPopulated(rel))
668 6 : ereport(ERROR,
669 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
670 : errmsg("cannot copy from unpopulated materialized view \"%s\"",
671 : RelationGetRelationName(rel)),
672 : errhint("Use the REFRESH MATERIALIZED VIEW command."));
673 : }
674 8 : else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
675 0 : ereport(ERROR,
676 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
677 : errmsg("cannot copy from foreign table \"%s\"",
678 : RelationGetRelationName(rel)),
679 : errhint("Try the COPY (SELECT ...) TO variant.")));
680 8 : else if (rel->rd_rel->relkind == RELKIND_SEQUENCE)
681 0 : ereport(ERROR,
682 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
683 : errmsg("cannot copy from sequence \"%s\"",
684 : RelationGetRelationName(rel))));
685 8 : else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
686 : {
687 : /*
688 : * Collect OIDs of relation containing data, so that later
689 : * DoCopyTo can copy the data from them.
690 : */
691 8 : children = find_all_inheritors(RelationGetRelid(rel), AccessShareLock, NULL);
692 :
693 46 : foreach_oid(child, children)
694 : {
695 34 : char relkind = get_rel_relkind(child);
696 :
697 34 : if (relkind == RELKIND_FOREIGN_TABLE)
698 : {
699 2 : char *relation_name = get_rel_name(child);
700 :
701 2 : ereport(ERROR,
702 : errcode(ERRCODE_WRONG_OBJECT_TYPE),
703 : errmsg("cannot copy from foreign table \"%s\"", relation_name),
704 : errdetail("Partition \"%s\" is a foreign table in partitioned table \"%s\"",
705 : relation_name, RelationGetRelationName(rel)),
706 : errhint("Try the COPY (SELECT ...) TO variant."));
707 : }
708 :
709 : /* Exclude tables with no data */
710 32 : if (RELKIND_HAS_PARTITIONS(relkind))
711 20 : children = foreach_delete_current(children, child);
712 : }
713 : }
714 : else
715 0 : ereport(ERROR,
716 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
717 : errmsg("cannot copy from non-table relation \"%s\"",
718 : RelationGetRelationName(rel))));
719 : }
720 :
721 :
722 : /* Allocate workspace and zero all fields */
723 9312 : cstate = (CopyToStateData *) palloc0(sizeof(CopyToStateData));
724 :
725 : /*
726 : * We allocate everything used by a cstate in a new memory context. This
727 : * avoids memory leaks during repeated use of COPY in a query.
728 : */
729 9312 : cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
730 : "COPY",
731 : ALLOCSET_DEFAULT_SIZES);
732 :
733 9312 : oldcontext = MemoryContextSwitchTo(cstate->copycontext);
734 :
735 : /* Extract options from the statement node tree */
736 9312 : ProcessCopyOptions(pstate, &cstate->opts, false /* is_from */ , options);
737 :
738 : /* Set format routine */
739 9264 : cstate->routine = CopyToGetRoutine(&cstate->opts);
740 :
741 : /* Process the source/target relation or query */
742 9264 : if (rel)
743 : {
744 : Assert(!raw_query);
745 :
746 8590 : cstate->rel = rel;
747 :
748 8590 : tupDesc = RelationGetDescr(cstate->rel);
749 8590 : cstate->partitions = children;
750 : }
751 : else
752 : {
753 : List *rewritten;
754 : Query *query;
755 : PlannedStmt *plan;
756 : DestReceiver *dest;
757 :
758 674 : cstate->rel = NULL;
759 674 : cstate->partitions = NIL;
760 :
761 : /*
762 : * Run parse analysis and rewrite. Note this also acquires sufficient
763 : * locks on the source table(s).
764 : */
765 674 : rewritten = pg_analyze_and_rewrite_fixedparams(raw_query,
766 : pstate->p_sourcetext, NULL, 0,
767 : NULL);
768 :
769 : /* check that we got back something we can work with */
770 662 : if (rewritten == NIL)
771 : {
772 18 : ereport(ERROR,
773 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
774 : errmsg("DO INSTEAD NOTHING rules are not supported for COPY")));
775 : }
776 644 : else if (list_length(rewritten) > 1)
777 : {
778 : ListCell *lc;
779 :
780 : /* examine queries to determine which error message to issue */
781 102 : foreach(lc, rewritten)
782 : {
783 84 : Query *q = lfirst_node(Query, lc);
784 :
785 84 : if (q->querySource == QSRC_QUAL_INSTEAD_RULE)
786 18 : ereport(ERROR,
787 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
788 : errmsg("conditional DO INSTEAD rules are not supported for COPY")));
789 66 : if (q->querySource == QSRC_NON_INSTEAD_RULE)
790 18 : ereport(ERROR,
791 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
792 : errmsg("DO ALSO rules are not supported for COPY")));
793 : }
794 :
795 18 : ereport(ERROR,
796 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
797 : errmsg("multi-statement DO INSTEAD rules are not supported for COPY")));
798 : }
799 :
800 590 : query = linitial_node(Query, rewritten);
801 :
802 : /* The grammar allows SELECT INTO, but we don't support that */
803 590 : if (query->utilityStmt != NULL &&
804 18 : IsA(query->utilityStmt, CreateTableAsStmt))
805 12 : ereport(ERROR,
806 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
807 : errmsg("COPY (SELECT INTO) is not supported")));
808 :
809 : /* The only other utility command we could see is NOTIFY */
810 578 : if (query->utilityStmt != NULL)
811 6 : ereport(ERROR,
812 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
813 : errmsg("COPY query must not be a utility command")));
814 :
815 : /*
816 : * Similarly the grammar doesn't enforce the presence of a RETURNING
817 : * clause, but this is required here.
818 : */
819 572 : if (query->commandType != CMD_SELECT &&
820 110 : query->returningList == NIL)
821 : {
822 : Assert(query->commandType == CMD_INSERT ||
823 : query->commandType == CMD_UPDATE ||
824 : query->commandType == CMD_DELETE ||
825 : query->commandType == CMD_MERGE);
826 :
827 24 : ereport(ERROR,
828 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
829 : errmsg("COPY query must have a RETURNING clause")));
830 : }
831 :
832 : /* plan the query */
833 548 : plan = pg_plan_query(query, pstate->p_sourcetext,
834 : CURSOR_OPT_PARALLEL_OK, NULL, NULL);
835 :
836 : /*
837 : * With row-level security and a user using "COPY relation TO", we
838 : * have to convert the "COPY relation TO" to a query-based COPY (eg:
839 : * "COPY (SELECT * FROM ONLY relation) TO"), to allow the rewriter to
840 : * add in any RLS clauses.
841 : *
842 : * When this happens, we are passed in the relid of the originally
843 : * found relation (which we have locked). As the planner will look up
844 : * the relation again, we double-check here to make sure it found the
845 : * same one that we have locked.
846 : */
847 546 : if (queryRelId != InvalidOid)
848 : {
849 : /*
850 : * Note that with RLS involved there may be multiple relations,
851 : * and while the one we need is almost certainly first, we don't
852 : * make any guarantees of that in the planner, so check the whole
853 : * list and make sure we find the original relation.
854 : */
855 78 : if (!list_member_oid(plan->relationOids, queryRelId))
856 0 : ereport(ERROR,
857 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
858 : errmsg("relation referenced by COPY statement has changed")));
859 : }
860 :
861 : /*
862 : * Use a snapshot with an updated command ID to ensure this query sees
863 : * results of any previously executed queries.
864 : */
865 546 : PushCopiedSnapshot(GetActiveSnapshot());
866 546 : UpdateActiveSnapshotCommandId();
867 :
868 : /* Create dest receiver for COPY OUT */
869 546 : dest = CreateDestReceiver(DestCopyOut);
870 546 : ((DR_copy *) dest)->cstate = cstate;
871 :
872 : /* Create a QueryDesc requesting no output */
873 546 : cstate->queryDesc = CreateQueryDesc(plan, pstate->p_sourcetext,
874 : GetActiveSnapshot(),
875 : InvalidSnapshot,
876 : dest, NULL, NULL, 0);
877 :
878 : /*
879 : * Call ExecutorStart to prepare the plan for execution.
880 : *
881 : * ExecutorStart computes a result tupdesc for us
882 : */
883 546 : ExecutorStart(cstate->queryDesc, 0);
884 :
885 540 : tupDesc = cstate->queryDesc->tupDesc;
886 : }
887 :
888 : /* Generate or convert list of attributes to process */
889 9130 : cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
890 :
891 9130 : num_phys_attrs = tupDesc->natts;
892 :
893 : /* Convert FORCE_QUOTE name list to per-column flags, check validity */
894 9130 : cstate->opts.force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
895 9130 : if (cstate->opts.force_quote_all)
896 : {
897 18 : MemSet(cstate->opts.force_quote_flags, true, num_phys_attrs * sizeof(bool));
898 : }
899 9112 : else if (cstate->opts.force_quote)
900 : {
901 : List *attnums;
902 : ListCell *cur;
903 :
904 24 : attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_quote);
905 :
906 48 : foreach(cur, attnums)
907 : {
908 24 : int attnum = lfirst_int(cur);
909 24 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
910 :
911 24 : if (!list_member_int(cstate->attnumlist, attnum))
912 0 : ereport(ERROR,
913 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
914 : /*- translator: %s is the name of a COPY option, e.g. FORCE_NOT_NULL */
915 : errmsg("%s column \"%s\" not referenced by COPY",
916 : "FORCE_QUOTE", NameStr(attr->attname))));
917 24 : cstate->opts.force_quote_flags[attnum - 1] = true;
918 : }
919 : }
920 :
921 : /* Use client encoding when ENCODING option is not specified. */
922 9130 : if (cstate->opts.file_encoding < 0)
923 9112 : cstate->file_encoding = pg_get_client_encoding();
924 : else
925 18 : cstate->file_encoding = cstate->opts.file_encoding;
926 :
927 : /*
928 : * Set up encoding conversion info if the file and server encodings differ
929 : * (see also pg_server_to_any).
930 : */
931 9130 : if (cstate->file_encoding == GetDatabaseEncoding() ||
932 8 : cstate->file_encoding == PG_SQL_ASCII)
933 9128 : cstate->need_transcoding = false;
934 : else
935 2 : cstate->need_transcoding = true;
936 :
937 : /* See Multibyte encoding comment above */
938 9130 : cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding);
939 :
940 9130 : cstate->copy_dest = COPY_FILE; /* default */
941 :
942 9130 : if (data_dest_cb)
943 : {
944 2 : progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
945 2 : cstate->copy_dest = COPY_CALLBACK;
946 2 : cstate->data_dest_cb = data_dest_cb;
947 : }
948 9128 : else if (pipe)
949 : {
950 9066 : progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
951 :
952 : Assert(!is_program); /* the grammar does not allow this */
953 9066 : if (whereToSendOutput != DestRemote)
954 0 : cstate->copy_file = stdout;
955 : }
956 : else
957 : {
958 62 : cstate->filename = pstrdup(filename);
959 62 : cstate->is_program = is_program;
960 :
961 62 : if (is_program)
962 : {
963 0 : progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
964 0 : cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W);
965 0 : if (cstate->copy_file == NULL)
966 0 : ereport(ERROR,
967 : (errcode_for_file_access(),
968 : errmsg("could not execute command \"%s\": %m",
969 : cstate->filename)));
970 : }
971 : else
972 : {
973 : mode_t oumask; /* Pre-existing umask value */
974 : struct stat st;
975 :
976 62 : progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
977 :
978 : /*
979 : * Prevent write to relative path ... too easy to shoot oneself in
980 : * the foot by overwriting a database file ...
981 : */
982 62 : if (!is_absolute_path(filename))
983 0 : ereport(ERROR,
984 : (errcode(ERRCODE_INVALID_NAME),
985 : errmsg("relative path not allowed for COPY to file")));
986 :
987 62 : oumask = umask(S_IWGRP | S_IWOTH);
988 62 : PG_TRY();
989 : {
990 62 : cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
991 : }
992 0 : PG_FINALLY();
993 : {
994 62 : umask(oumask);
995 : }
996 62 : PG_END_TRY();
997 62 : if (cstate->copy_file == NULL)
998 : {
999 : /* copy errno because ereport subfunctions might change it */
1000 0 : int save_errno = errno;
1001 :
1002 0 : ereport(ERROR,
1003 : (errcode_for_file_access(),
1004 : errmsg("could not open file \"%s\" for writing: %m",
1005 : cstate->filename),
1006 : (save_errno == ENOENT || save_errno == EACCES) ?
1007 : errhint("COPY TO instructs the PostgreSQL server process to write a file. "
1008 : "You may want a client-side facility such as psql's \\copy.") : 0));
1009 : }
1010 :
1011 62 : if (fstat(fileno(cstate->copy_file), &st))
1012 0 : ereport(ERROR,
1013 : (errcode_for_file_access(),
1014 : errmsg("could not stat file \"%s\": %m",
1015 : cstate->filename)));
1016 :
1017 62 : if (S_ISDIR(st.st_mode))
1018 0 : ereport(ERROR,
1019 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1020 : errmsg("\"%s\" is a directory", cstate->filename)));
1021 : }
1022 : }
1023 :
1024 : /* initialize progress */
1025 9130 : pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
1026 9130 : cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
1027 9130 : pgstat_progress_update_multi_param(2, progress_cols, progress_vals);
1028 :
1029 9130 : cstate->bytes_processed = 0;
1030 :
1031 9130 : MemoryContextSwitchTo(oldcontext);
1032 :
1033 9130 : return cstate;
1034 : }
1035 :
1036 : /*
1037 : * Clean up storage and release resources for COPY TO.
1038 : */
1039 : void
1040 9128 : EndCopyTo(CopyToState cstate)
1041 : {
1042 9128 : if (cstate->queryDesc != NULL)
1043 : {
1044 : /* Close down the query and free resources. */
1045 540 : ExecutorFinish(cstate->queryDesc);
1046 540 : ExecutorEnd(cstate->queryDesc);
1047 540 : FreeQueryDesc(cstate->queryDesc);
1048 540 : PopActiveSnapshot();
1049 : }
1050 :
1051 : /* Clean up storage */
1052 9128 : EndCopy(cstate);
1053 9128 : }
1054 :
1055 : /*
1056 : * Copy from relation or query TO file.
1057 : *
1058 : * Returns the number of rows processed.
1059 : */
1060 : uint64
1061 9130 : DoCopyTo(CopyToState cstate)
1062 : {
1063 9130 : bool pipe = (cstate->filename == NULL && cstate->data_dest_cb == NULL);
1064 9130 : bool fe_copy = (pipe && whereToSendOutput == DestRemote);
1065 : TupleDesc tupDesc;
1066 : int num_phys_attrs;
1067 : ListCell *cur;
1068 9130 : uint64 processed = 0;
1069 :
1070 9130 : if (fe_copy)
1071 9066 : SendCopyBegin(cstate);
1072 :
1073 9130 : if (cstate->rel)
1074 8590 : tupDesc = RelationGetDescr(cstate->rel);
1075 : else
1076 540 : tupDesc = cstate->queryDesc->tupDesc;
1077 9130 : num_phys_attrs = tupDesc->natts;
1078 9130 : cstate->opts.null_print_client = cstate->opts.null_print; /* default */
1079 :
1080 : /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
1081 9130 : cstate->fe_msgbuf = makeStringInfo();
1082 :
1083 : /* Get info about the columns we need to process. */
1084 9130 : cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
1085 41014 : foreach(cur, cstate->attnumlist)
1086 : {
1087 31886 : int attnum = lfirst_int(cur);
1088 31886 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1089 :
1090 31886 : cstate->routine->CopyToOutFunc(cstate, attr->atttypid,
1091 31886 : &cstate->out_functions[attnum - 1]);
1092 : }
1093 :
1094 : /*
1095 : * Create a temporary memory context that we can reset once per row to
1096 : * recover palloc'd memory. This avoids any problems with leaks inside
1097 : * datatype output routines, and should be faster than retail pfree's
1098 : * anyway. (We don't need a whole econtext as CopyFrom does.)
1099 : */
1100 9128 : cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
1101 : "COPY TO",
1102 : ALLOCSET_DEFAULT_SIZES);
1103 :
1104 9128 : cstate->routine->CopyToStart(cstate, tupDesc);
1105 :
1106 9128 : if (cstate->rel)
1107 : {
1108 : /*
1109 : * If COPY TO source table is a partitioned table, then open each
1110 : * partition and process each individual partition.
1111 : */
1112 8588 : if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1113 : {
1114 24 : foreach_oid(child, cstate->partitions)
1115 : {
1116 : Relation scan_rel;
1117 :
1118 : /* We already got the lock in BeginCopyTo */
1119 12 : scan_rel = table_open(child, NoLock);
1120 12 : CopyRelationTo(cstate, scan_rel, cstate->rel, &processed);
1121 12 : table_close(scan_rel, NoLock);
1122 : }
1123 : }
1124 : else
1125 8582 : CopyRelationTo(cstate, cstate->rel, NULL, &processed);
1126 : }
1127 : else
1128 : {
1129 : /* run the plan --- the dest receiver will send tuples */
1130 540 : ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0);
1131 540 : processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
1132 : }
1133 :
1134 9128 : cstate->routine->CopyToEnd(cstate);
1135 :
1136 9128 : MemoryContextDelete(cstate->rowcontext);
1137 :
1138 9128 : if (fe_copy)
1139 9064 : SendCopyEnd(cstate);
1140 :
1141 9128 : return processed;
1142 : }
1143 :
1144 : /*
1145 : * Scans a single table and exports its rows to the COPY destination.
1146 : *
1147 : * root_rel can be set to the root table of rel if rel is a partition
1148 : * table so that we can send tuples in root_rel's rowtype, which might
1149 : * differ from individual partitions.
1150 : */
1151 : static void
1152 8594 : CopyRelationTo(CopyToState cstate, Relation rel, Relation root_rel, uint64 *processed)
1153 : {
1154 : TupleTableSlot *slot;
1155 : TableScanDesc scandesc;
1156 8594 : AttrMap *map = NULL;
1157 8594 : TupleTableSlot *root_slot = NULL;
1158 :
1159 8594 : scandesc = table_beginscan(rel, GetActiveSnapshot(), 0, NULL);
1160 8594 : slot = table_slot_create(rel, NULL);
1161 :
1162 : /*
1163 : * If we are exporting partition data here, we check if converting tuples
1164 : * to the root table's rowtype, because a partition might have column
1165 : * order different than its root table.
1166 : */
1167 8594 : if (root_rel != NULL)
1168 : {
1169 12 : root_slot = table_slot_create(root_rel, NULL);
1170 12 : map = build_attrmap_by_name_if_req(RelationGetDescr(root_rel),
1171 : RelationGetDescr(rel),
1172 : false);
1173 : }
1174 :
1175 3655168 : while (table_scan_getnextslot(scandesc, ForwardScanDirection, slot))
1176 : {
1177 : TupleTableSlot *copyslot;
1178 :
1179 3646574 : CHECK_FOR_INTERRUPTS();
1180 :
1181 3646574 : if (map != NULL)
1182 24 : copyslot = execute_attr_map_slot(map, slot, root_slot);
1183 : else
1184 : {
1185 : /* Deconstruct the tuple */
1186 3646550 : slot_getallattrs(slot);
1187 3646550 : copyslot = slot;
1188 : }
1189 :
1190 : /* Format and send the data */
1191 3646574 : CopyOneRowTo(cstate, copyslot);
1192 :
1193 : /*
1194 : * Increment the number of processed tuples, and report the progress.
1195 : */
1196 3646574 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
1197 3646574 : ++(*processed));
1198 : }
1199 :
1200 8594 : ExecDropSingleTupleTableSlot(slot);
1201 :
1202 8594 : if (root_slot != NULL)
1203 12 : ExecDropSingleTupleTableSlot(root_slot);
1204 :
1205 8594 : if (map != NULL)
1206 6 : free_attrmap(map);
1207 :
1208 8594 : table_endscan(scandesc);
1209 8594 : }
1210 :
1211 : /*
1212 : * Emit one row during DoCopyTo().
1213 : */
1214 : static inline void
1215 3653738 : CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot)
1216 : {
1217 : MemoryContext oldcontext;
1218 :
1219 3653738 : MemoryContextReset(cstate->rowcontext);
1220 3653738 : oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
1221 :
1222 : /* Make sure the tuple is fully deconstructed */
1223 3653738 : slot_getallattrs(slot);
1224 :
1225 3653738 : cstate->routine->CopyToOneRow(cstate, slot);
1226 :
1227 3653738 : MemoryContextSwitchTo(oldcontext);
1228 3653738 : }
1229 :
1230 : /*
1231 : * Send text representation of one attribute, with conversion and escaping
1232 : */
1233 : #define DUMPSOFAR() \
1234 : do { \
1235 : if (ptr > start) \
1236 : CopySendData(cstate, start, ptr - start); \
1237 : } while (0)
1238 :
1239 : static void
1240 12935424 : CopyAttributeOutText(CopyToState cstate, const char *string)
1241 : {
1242 : const char *ptr;
1243 : const char *start;
1244 : char c;
1245 12935424 : char delimc = cstate->opts.delim[0];
1246 :
1247 12935424 : if (cstate->need_transcoding)
1248 0 : ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
1249 : else
1250 12935424 : ptr = string;
1251 :
1252 : /*
1253 : * We have to grovel through the string searching for control characters
1254 : * and instances of the delimiter character. In most cases, though, these
1255 : * are infrequent. To avoid overhead from calling CopySendData once per
1256 : * character, we dump out all characters between escaped characters in a
1257 : * single call. The loop invariant is that the data from "start" to "ptr"
1258 : * can be sent literally, but hasn't yet been.
1259 : *
1260 : * We can skip pg_encoding_mblen() overhead when encoding is safe, because
1261 : * in valid backend encodings, extra bytes of a multibyte character never
1262 : * look like ASCII. This loop is sufficiently performance-critical that
1263 : * it's worth making two copies of it to get the IS_HIGHBIT_SET() test out
1264 : * of the normal safe-encoding path.
1265 : */
1266 12935424 : if (cstate->encoding_embeds_ascii)
1267 : {
1268 0 : start = ptr;
1269 0 : while ((c = *ptr) != '\0')
1270 : {
1271 0 : if ((unsigned char) c < (unsigned char) 0x20)
1272 : {
1273 : /*
1274 : * \r and \n must be escaped, the others are traditional. We
1275 : * prefer to dump these using the C-like notation, rather than
1276 : * a backslash and the literal character, because it makes the
1277 : * dump file a bit more proof against Microsoftish data
1278 : * mangling.
1279 : */
1280 0 : switch (c)
1281 : {
1282 0 : case '\b':
1283 0 : c = 'b';
1284 0 : break;
1285 0 : case '\f':
1286 0 : c = 'f';
1287 0 : break;
1288 0 : case '\n':
1289 0 : c = 'n';
1290 0 : break;
1291 0 : case '\r':
1292 0 : c = 'r';
1293 0 : break;
1294 0 : case '\t':
1295 0 : c = 't';
1296 0 : break;
1297 0 : case '\v':
1298 0 : c = 'v';
1299 0 : break;
1300 0 : default:
1301 : /* If it's the delimiter, must backslash it */
1302 0 : if (c == delimc)
1303 0 : break;
1304 : /* All ASCII control chars are length 1 */
1305 0 : ptr++;
1306 0 : continue; /* fall to end of loop */
1307 : }
1308 : /* if we get here, we need to convert the control char */
1309 0 : DUMPSOFAR();
1310 0 : CopySendChar(cstate, '\\');
1311 0 : CopySendChar(cstate, c);
1312 0 : start = ++ptr; /* do not include char in next run */
1313 : }
1314 0 : else if (c == '\\' || c == delimc)
1315 : {
1316 0 : DUMPSOFAR();
1317 0 : CopySendChar(cstate, '\\');
1318 0 : start = ptr++; /* we include char in next run */
1319 : }
1320 0 : else if (IS_HIGHBIT_SET(c))
1321 0 : ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
1322 : else
1323 0 : ptr++;
1324 : }
1325 : }
1326 : else
1327 : {
1328 12935424 : start = ptr;
1329 140342066 : while ((c = *ptr) != '\0')
1330 : {
1331 127406642 : if ((unsigned char) c < (unsigned char) 0x20)
1332 : {
1333 : /*
1334 : * \r and \n must be escaped, the others are traditional. We
1335 : * prefer to dump these using the C-like notation, rather than
1336 : * a backslash and the literal character, because it makes the
1337 : * dump file a bit more proof against Microsoftish data
1338 : * mangling.
1339 : */
1340 13950 : switch (c)
1341 : {
1342 0 : case '\b':
1343 0 : c = 'b';
1344 0 : break;
1345 0 : case '\f':
1346 0 : c = 'f';
1347 0 : break;
1348 11804 : case '\n':
1349 11804 : c = 'n';
1350 11804 : break;
1351 0 : case '\r':
1352 0 : c = 'r';
1353 0 : break;
1354 2146 : case '\t':
1355 2146 : c = 't';
1356 2146 : break;
1357 0 : case '\v':
1358 0 : c = 'v';
1359 0 : break;
1360 0 : default:
1361 : /* If it's the delimiter, must backslash it */
1362 0 : if (c == delimc)
1363 0 : break;
1364 : /* All ASCII control chars are length 1 */
1365 0 : ptr++;
1366 0 : continue; /* fall to end of loop */
1367 : }
1368 : /* if we get here, we need to convert the control char */
1369 13950 : DUMPSOFAR();
1370 13950 : CopySendChar(cstate, '\\');
1371 13950 : CopySendChar(cstate, c);
1372 13950 : start = ++ptr; /* do not include char in next run */
1373 : }
1374 127392692 : else if (c == '\\' || c == delimc)
1375 : {
1376 4528 : DUMPSOFAR();
1377 4528 : CopySendChar(cstate, '\\');
1378 4528 : start = ptr++; /* we include char in next run */
1379 : }
1380 : else
1381 127388164 : ptr++;
1382 : }
1383 : }
1384 :
1385 12935424 : DUMPSOFAR();
1386 12935424 : }
1387 :
1388 : /*
1389 : * Send text representation of one attribute, with conversion and
1390 : * CSV-style escaping
1391 : */
1392 : static void
1393 618 : CopyAttributeOutCSV(CopyToState cstate, const char *string,
1394 : bool use_quote)
1395 : {
1396 : const char *ptr;
1397 : const char *start;
1398 : char c;
1399 618 : char delimc = cstate->opts.delim[0];
1400 618 : char quotec = cstate->opts.quote[0];
1401 618 : char escapec = cstate->opts.escape[0];
1402 618 : bool single_attr = (list_length(cstate->attnumlist) == 1);
1403 :
1404 : /* force quoting if it matches null_print (before conversion!) */
1405 618 : if (!use_quote && strcmp(string, cstate->opts.null_print) == 0)
1406 54 : use_quote = true;
1407 :
1408 618 : if (cstate->need_transcoding)
1409 0 : ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
1410 : else
1411 618 : ptr = string;
1412 :
1413 : /*
1414 : * Make a preliminary pass to discover if it needs quoting
1415 : */
1416 618 : if (!use_quote)
1417 : {
1418 : /*
1419 : * Quote '\.' if it appears alone on a line, so that it will not be
1420 : * interpreted as an end-of-data marker. (PG 18 and up will not
1421 : * interpret '\.' in CSV that way, except in embedded-in-SQL data; but
1422 : * we want the data to be loadable by older versions too. Also, this
1423 : * avoids breaking clients that are still using PQgetline().)
1424 : */
1425 432 : if (single_attr && strcmp(ptr, "\\.") == 0)
1426 6 : use_quote = true;
1427 : else
1428 : {
1429 426 : const char *tptr = ptr;
1430 :
1431 2208 : while ((c = *tptr) != '\0')
1432 : {
1433 1914 : if (c == delimc || c == quotec || c == '\n' || c == '\r')
1434 : {
1435 132 : use_quote = true;
1436 132 : break;
1437 : }
1438 1782 : if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
1439 0 : tptr += pg_encoding_mblen(cstate->file_encoding, tptr);
1440 : else
1441 1782 : tptr++;
1442 : }
1443 : }
1444 : }
1445 :
1446 618 : if (use_quote)
1447 : {
1448 324 : CopySendChar(cstate, quotec);
1449 :
1450 : /*
1451 : * We adopt the same optimization strategy as in CopyAttributeOutText
1452 : */
1453 324 : start = ptr;
1454 2538 : while ((c = *ptr) != '\0')
1455 : {
1456 2214 : if (c == quotec || c == escapec)
1457 : {
1458 156 : DUMPSOFAR();
1459 156 : CopySendChar(cstate, escapec);
1460 156 : start = ptr; /* we include char in next run */
1461 : }
1462 2214 : if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
1463 0 : ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
1464 : else
1465 2214 : ptr++;
1466 : }
1467 324 : DUMPSOFAR();
1468 :
1469 324 : CopySendChar(cstate, quotec);
1470 : }
1471 : else
1472 : {
1473 : /* If it doesn't need quoting, we can just dump it as-is */
1474 294 : CopySendString(cstate, ptr);
1475 : }
1476 618 : }
1477 :
1478 : /*
1479 : * copy_dest_startup --- executor startup
1480 : */
1481 : static void
1482 540 : copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
1483 : {
1484 : /* no-op */
1485 540 : }
1486 :
1487 : /*
1488 : * copy_dest_receive --- receive one tuple
1489 : */
1490 : static bool
1491 7164 : copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
1492 : {
1493 7164 : DR_copy *myState = (DR_copy *) self;
1494 7164 : CopyToState cstate = myState->cstate;
1495 :
1496 : /* Send the data */
1497 7164 : CopyOneRowTo(cstate, slot);
1498 :
1499 : /* Increment the number of processed tuples, and report the progress */
1500 7164 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
1501 7164 : ++myState->processed);
1502 :
1503 7164 : return true;
1504 : }
1505 :
1506 : /*
1507 : * copy_dest_shutdown --- executor end
1508 : */
1509 : static void
1510 540 : copy_dest_shutdown(DestReceiver *self)
1511 : {
1512 : /* no-op */
1513 540 : }
1514 :
1515 : /*
1516 : * copy_dest_destroy --- release DestReceiver object
1517 : */
1518 : static void
1519 0 : copy_dest_destroy(DestReceiver *self)
1520 : {
1521 0 : pfree(self);
1522 0 : }
1523 :
1524 : /*
1525 : * CreateCopyDestReceiver -- create a suitable DestReceiver object
1526 : */
1527 : DestReceiver *
1528 546 : CreateCopyDestReceiver(void)
1529 : {
1530 546 : DR_copy *self = (DR_copy *) palloc(sizeof(DR_copy));
1531 :
1532 546 : self->pub.receiveSlot = copy_dest_receive;
1533 546 : self->pub.rStartup = copy_dest_startup;
1534 546 : self->pub.rShutdown = copy_dest_shutdown;
1535 546 : self->pub.rDestroy = copy_dest_destroy;
1536 546 : self->pub.mydest = DestCopyOut;
1537 :
1538 546 : self->cstate = NULL; /* will be set later */
1539 546 : self->processed = 0;
1540 :
1541 546 : return (DestReceiver *) self;
1542 : }
|