Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * copyto.c
4 : * COPY <table> TO file/program/client
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/commands/copyto.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : #include "postgres.h"
16 :
17 : #include <ctype.h>
18 : #include <unistd.h>
19 : #include <sys/stat.h>
20 :
21 : #include "access/tableam.h"
22 : #include "commands/copyapi.h"
23 : #include "commands/progress.h"
24 : #include "executor/execdesc.h"
25 : #include "executor/executor.h"
26 : #include "executor/tuptable.h"
27 : #include "libpq/libpq.h"
28 : #include "libpq/pqformat.h"
29 : #include "mb/pg_wchar.h"
30 : #include "miscadmin.h"
31 : #include "pgstat.h"
32 : #include "storage/fd.h"
33 : #include "tcop/tcopprot.h"
34 : #include "utils/lsyscache.h"
35 : #include "utils/memutils.h"
36 : #include "utils/rel.h"
37 : #include "utils/snapmgr.h"
38 :
/*
 * The low-level output destinations that COPY TO has to distinguish.
 */
typedef enum CopyDest
{
	COPY_FILE,					/* server-side file, or pipe to a program */
	COPY_FRONTEND,				/* the connected frontend */
	COPY_CALLBACK,				/* caller-supplied callback function */
} CopyDest;
49 :
50 : /*
51 : * This struct contains all the state variables used throughout a COPY TO
52 : * operation.
53 : *
54 : * Multi-byte encodings: all supported client-side encodings encode multi-byte
55 : * characters by having the first byte's high bit set. Subsequent bytes of the
56 : * character can have the high bit not set. When scanning data in such an
57 : * encoding to look for a match to a single-byte (ie ASCII) character, we must
58 : * use the full pg_encoding_mblen() machinery to skip over multibyte
59 : * characters, else we might find a false match to a trailing byte. In
60 : * supported server encodings, there is no possibility of a false match, and
61 : * it's faster to make useless comparisons to trailing bytes than it is to
62 : * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true
63 : * when we have to do it the hard way.
64 : */
65 : typedef struct CopyToStateData
66 : {
67 : /* format-specific routines */
68 : const CopyToRoutine *routine;
69 :
70 : /* low-level state data */
71 : CopyDest copy_dest; /* type of copy source/destination */
72 : FILE *copy_file; /* used if copy_dest == COPY_FILE */
73 : StringInfo fe_msgbuf; /* used for all dests during COPY TO */
74 :
75 : int file_encoding; /* file or remote side's character encoding */
76 : bool need_transcoding; /* file encoding diff from server? */
77 : bool encoding_embeds_ascii; /* ASCII can be non-first byte? */
78 :
79 : /* parameters from the COPY command */
80 : Relation rel; /* relation to copy to */
81 : QueryDesc *queryDesc; /* executable query to copy from */
82 : List *attnumlist; /* integer list of attnums to copy */
83 : char *filename; /* filename, or NULL for STDOUT */
84 : bool is_program; /* is 'filename' a program to popen? */
85 : copy_data_dest_cb data_dest_cb; /* function for writing data */
86 :
87 : CopyFormatOptions opts;
88 : Node *whereClause; /* WHERE condition (or NULL) */
89 :
90 : /*
91 : * Working state
92 : */
93 : MemoryContext copycontext; /* per-copy execution context */
94 :
95 : FmgrInfo *out_functions; /* lookup info for output functions */
96 : MemoryContext rowcontext; /* per-row evaluation context */
97 : uint64 bytes_processed; /* number of bytes processed so far */
98 : } CopyToStateData;
99 :
100 : /* DestReceiver for COPY (query) TO */
101 : typedef struct
102 : {
103 : DestReceiver pub; /* publicly-known function pointers */
104 : CopyToState cstate; /* CopyToStateData for the command */
105 : uint64 processed; /* # of tuples processed */
106 : } DR_copy;
107 :
/*
 * The 11-byte signature that opens a binary COPY stream.
 *
 * NOTE: there's a copy of this in copyfromparse.c
 */
static const char BinarySignature[11] = {
	'P', 'G', 'C', 'O', 'P', 'Y', '\n', '\377', '\r', '\n', '\0'
};
110 :
111 :
112 : /* non-export function prototypes */
113 : static void EndCopy(CopyToState cstate);
114 : static void ClosePipeToProgram(CopyToState cstate);
115 : static void CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot);
116 : static void CopyAttributeOutText(CopyToState cstate, const char *string);
117 : static void CopyAttributeOutCSV(CopyToState cstate, const char *string,
118 : bool use_quote);
119 :
120 : /* built-in format-specific routines */
121 : static void CopyToTextLikeStart(CopyToState cstate, TupleDesc tupDesc);
122 : static void CopyToTextLikeOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo);
123 : static void CopyToTextOneRow(CopyToState cstate, TupleTableSlot *slot);
124 : static void CopyToCSVOneRow(CopyToState cstate, TupleTableSlot *slot);
125 : static void CopyToTextLikeOneRow(CopyToState cstate, TupleTableSlot *slot,
126 : bool is_csv);
127 : static void CopyToTextLikeEnd(CopyToState cstate);
128 : static void CopyToBinaryStart(CopyToState cstate, TupleDesc tupDesc);
129 : static void CopyToBinaryOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo);
130 : static void CopyToBinaryOneRow(CopyToState cstate, TupleTableSlot *slot);
131 : static void CopyToBinaryEnd(CopyToState cstate);
132 :
133 : /* Low-level communications functions */
134 : static void SendCopyBegin(CopyToState cstate);
135 : static void SendCopyEnd(CopyToState cstate);
136 : static void CopySendData(CopyToState cstate, const void *databuf, int datasize);
137 : static void CopySendString(CopyToState cstate, const char *str);
138 : static void CopySendChar(CopyToState cstate, char c);
139 : static void CopySendEndOfRow(CopyToState cstate);
140 : static void CopySendTextLikeEndOfRow(CopyToState cstate);
141 : static void CopySendInt32(CopyToState cstate, int32 val);
142 : static void CopySendInt16(CopyToState cstate, int16 val);
143 :
144 : /*
145 : * COPY TO routines for built-in formats.
146 : *
147 : * CSV and text formats share the same TextLike routines except for the
148 : * one-row callback.
149 : */
150 :
151 : /* text format */
152 : static const CopyToRoutine CopyToRoutineText = {
153 : .CopyToStart = CopyToTextLikeStart,
154 : .CopyToOutFunc = CopyToTextLikeOutFunc,
155 : .CopyToOneRow = CopyToTextOneRow,
156 : .CopyToEnd = CopyToTextLikeEnd,
157 : };
158 :
159 : /* CSV format */
160 : static const CopyToRoutine CopyToRoutineCSV = {
161 : .CopyToStart = CopyToTextLikeStart,
162 : .CopyToOutFunc = CopyToTextLikeOutFunc,
163 : .CopyToOneRow = CopyToCSVOneRow,
164 : .CopyToEnd = CopyToTextLikeEnd,
165 : };
166 :
167 : /* binary format */
168 : static const CopyToRoutine CopyToRoutineBinary = {
169 : .CopyToStart = CopyToBinaryStart,
170 : .CopyToOutFunc = CopyToBinaryOutFunc,
171 : .CopyToOneRow = CopyToBinaryOneRow,
172 : .CopyToEnd = CopyToBinaryEnd,
173 : };
174 :
175 : /* Return a COPY TO routine for the given options */
176 : static const CopyToRoutine *
177 12868 : CopyToGetRoutine(const CopyFormatOptions *opts)
178 : {
179 12868 : if (opts->csv_mode)
180 126 : return &CopyToRoutineCSV;
181 12742 : else if (opts->binary)
182 16 : return &CopyToRoutineBinary;
183 :
184 : /* default is text */
185 12726 : return &CopyToRoutineText;
186 : }
187 :
188 : /* Implementation of the start callback for text and CSV formats */
189 : static void
190 12718 : CopyToTextLikeStart(CopyToState cstate, TupleDesc tupDesc)
191 : {
192 : /*
193 : * For non-binary copy, we need to convert null_print to file encoding,
194 : * because it will be sent directly with CopySendString.
195 : */
196 12718 : if (cstate->need_transcoding)
197 2 : cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
198 : cstate->opts.null_print_len,
199 : cstate->file_encoding);
200 :
201 : /* if a header has been requested send the line */
202 12718 : if (cstate->opts.header_line)
203 : {
204 : ListCell *cur;
205 24 : bool hdr_delim = false;
206 :
207 66 : foreach(cur, cstate->attnumlist)
208 : {
209 42 : int attnum = lfirst_int(cur);
210 : char *colname;
211 :
212 42 : if (hdr_delim)
213 18 : CopySendChar(cstate, cstate->opts.delim[0]);
214 42 : hdr_delim = true;
215 :
216 42 : colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
217 :
218 42 : if (cstate->opts.csv_mode)
219 24 : CopyAttributeOutCSV(cstate, colname, false);
220 : else
221 18 : CopyAttributeOutText(cstate, colname);
222 : }
223 :
224 24 : CopySendTextLikeEndOfRow(cstate);
225 : }
226 12718 : }
227 :
228 : /*
229 : * Implementation of the outfunc callback for text and CSV formats. Assign
230 : * the output function data to the given *finfo.
231 : */
232 : static void
233 47056 : CopyToTextLikeOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo)
234 : {
235 : Oid func_oid;
236 : bool is_varlena;
237 :
238 : /* Set output function for an attribute */
239 47056 : getTypeOutputInfo(atttypid, &func_oid, &is_varlena);
240 47056 : fmgr_info(func_oid, finfo);
241 47056 : }
242 :
243 : /* Implementation of the per-row callback for text format */
244 : static void
245 6163036 : CopyToTextOneRow(CopyToState cstate, TupleTableSlot *slot)
246 : {
247 6163036 : CopyToTextLikeOneRow(cstate, slot, false);
248 6163036 : }
249 :
250 : /* Implementation of the per-row callback for CSV format */
251 : static void
252 330 : CopyToCSVOneRow(CopyToState cstate, TupleTableSlot *slot)
253 : {
254 330 : CopyToTextLikeOneRow(cstate, slot, true);
255 330 : }
256 :
257 : /*
258 : * Workhorse for CopyToTextOneRow() and CopyToCSVOneRow().
259 : *
260 : * We use pg_attribute_always_inline to reduce function call overhead
261 : * and to help compilers to optimize away the 'is_csv' condition.
262 : */
263 : static pg_attribute_always_inline void
264 6163366 : CopyToTextLikeOneRow(CopyToState cstate,
265 : TupleTableSlot *slot,
266 : bool is_csv)
267 : {
268 6163366 : bool need_delim = false;
269 6163366 : FmgrInfo *out_functions = cstate->out_functions;
270 :
271 33772538 : foreach_int(attnum, cstate->attnumlist)
272 : {
273 21445806 : Datum value = slot->tts_values[attnum - 1];
274 21445806 : bool isnull = slot->tts_isnull[attnum - 1];
275 :
276 21445806 : if (need_delim)
277 15282588 : CopySendChar(cstate, cstate->opts.delim[0]);
278 21445806 : need_delim = true;
279 :
280 21445806 : if (isnull)
281 : {
282 1562554 : CopySendString(cstate, cstate->opts.null_print_client);
283 : }
284 : else
285 : {
286 : char *string;
287 :
288 19883252 : string = OutputFunctionCall(&out_functions[attnum - 1],
289 : value);
290 :
291 19883252 : if (is_csv)
292 594 : CopyAttributeOutCSV(cstate, string,
293 594 : cstate->opts.force_quote_flags[attnum - 1]);
294 : else
295 19882658 : CopyAttributeOutText(cstate, string);
296 : }
297 : }
298 :
299 6163366 : CopySendTextLikeEndOfRow(cstate);
300 6163366 : }
301 :
302 : /* Implementation of the end callback for text and CSV formats */
303 : static void
304 12718 : CopyToTextLikeEnd(CopyToState cstate)
305 : {
306 : /* Nothing to do here */
307 12718 : }
308 :
309 : /*
310 : * Implementation of the start callback for binary format. Send a header
311 : * for a binary copy.
312 : */
313 : static void
314 14 : CopyToBinaryStart(CopyToState cstate, TupleDesc tupDesc)
315 : {
316 : int32 tmp;
317 :
318 : /* Signature */
319 14 : CopySendData(cstate, BinarySignature, 11);
320 : /* Flags field */
321 14 : tmp = 0;
322 14 : CopySendInt32(cstate, tmp);
323 : /* No header extension */
324 14 : tmp = 0;
325 14 : CopySendInt32(cstate, tmp);
326 14 : }
327 :
328 : /*
329 : * Implementation of the outfunc callback for binary format. Assign
330 : * the binary output function to the given *finfo.
331 : */
332 : static void
333 62 : CopyToBinaryOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo)
334 : {
335 : Oid func_oid;
336 : bool is_varlena;
337 :
338 : /* Set output function for an attribute */
339 62 : getTypeBinaryOutputInfo(atttypid, &func_oid, &is_varlena);
340 60 : fmgr_info(func_oid, finfo);
341 60 : }
342 :
343 : /* Implementation of the per-row callback for binary format */
344 : static void
345 32 : CopyToBinaryOneRow(CopyToState cstate, TupleTableSlot *slot)
346 : {
347 32 : FmgrInfo *out_functions = cstate->out_functions;
348 :
349 : /* Binary per-tuple header */
350 32 : CopySendInt16(cstate, list_length(cstate->attnumlist));
351 :
352 224 : foreach_int(attnum, cstate->attnumlist)
353 : {
354 160 : Datum value = slot->tts_values[attnum - 1];
355 160 : bool isnull = slot->tts_isnull[attnum - 1];
356 :
357 160 : if (isnull)
358 : {
359 30 : CopySendInt32(cstate, -1);
360 : }
361 : else
362 : {
363 : bytea *outputbytes;
364 :
365 130 : outputbytes = SendFunctionCall(&out_functions[attnum - 1],
366 : value);
367 130 : CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
368 130 : CopySendData(cstate, VARDATA(outputbytes),
369 130 : VARSIZE(outputbytes) - VARHDRSZ);
370 : }
371 : }
372 :
373 32 : CopySendEndOfRow(cstate);
374 32 : }
375 :
376 : /* Implementation of the end callback for binary format */
377 : static void
378 14 : CopyToBinaryEnd(CopyToState cstate)
379 : {
380 : /* Generate trailer for a binary copy */
381 14 : CopySendInt16(cstate, -1);
382 : /* Need to flush out the trailer */
383 14 : CopySendEndOfRow(cstate);
384 14 : }
385 :
386 : /*
387 : * Send copy start/stop messages for frontend copies. These have changed
388 : * in past protocol redesigns.
389 : */
390 : static void
391 12670 : SendCopyBegin(CopyToState cstate)
392 : {
393 : StringInfoData buf;
394 12670 : int natts = list_length(cstate->attnumlist);
395 12670 : int16 format = (cstate->opts.binary ? 1 : 0);
396 : int i;
397 :
398 12670 : pq_beginmessage(&buf, PqMsg_CopyOutResponse);
399 12670 : pq_sendbyte(&buf, format); /* overall format */
400 12670 : pq_sendint16(&buf, natts);
401 59556 : for (i = 0; i < natts; i++)
402 46886 : pq_sendint16(&buf, format); /* per-column formats */
403 12670 : pq_endmessage(&buf);
404 12670 : cstate->copy_dest = COPY_FRONTEND;
405 12670 : }
406 :
407 : static void
408 12668 : SendCopyEnd(CopyToState cstate)
409 : {
410 : /* Shouldn't have any unsent data */
411 : Assert(cstate->fe_msgbuf->len == 0);
412 : /* Send Copy Done message */
413 12668 : pq_putemptymessage(PqMsg_CopyDone);
414 12668 : }
415 :
416 : /*----------
417 : * CopySendData sends output data to the destination (file or frontend)
418 : * CopySendString does the same for null-terminated strings
419 : * CopySendChar does the same for single characters
420 : * CopySendEndOfRow does the appropriate thing at end of each data row
421 : * (data is not actually flushed except by CopySendEndOfRow)
422 : *
423 : * NB: no data conversion is applied by these functions
424 : *----------
425 : */
426 : static void
427 19682366 : CopySendData(CopyToState cstate, const void *databuf, int datasize)
428 : {
429 19682366 : appendBinaryStringInfo(cstate->fe_msgbuf, databuf, datasize);
430 19682366 : }
431 :
432 : static void
433 1562848 : CopySendString(CopyToState cstate, const char *str)
434 : {
435 1562848 : appendBinaryStringInfo(cstate->fe_msgbuf, str, strlen(str));
436 1562848 : }
437 :
438 : static void
439 21487280 : CopySendChar(CopyToState cstate, char c)
440 : {
441 21487280 : appendStringInfoCharMacro(cstate->fe_msgbuf, c);
442 21487280 : }
443 :
444 : static void
445 6163436 : CopySendEndOfRow(CopyToState cstate)
446 : {
447 6163436 : StringInfo fe_msgbuf = cstate->fe_msgbuf;
448 :
449 6163436 : switch (cstate->copy_dest)
450 : {
451 12282 : case COPY_FILE:
452 12282 : if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
453 12282 : cstate->copy_file) != 1 ||
454 12282 : ferror(cstate->copy_file))
455 : {
456 0 : if (cstate->is_program)
457 : {
458 0 : if (errno == EPIPE)
459 : {
460 : /*
461 : * The pipe will be closed automatically on error at
462 : * the end of transaction, but we might get a better
463 : * error message from the subprocess' exit code than
464 : * just "Broken Pipe"
465 : */
466 0 : ClosePipeToProgram(cstate);
467 :
468 : /*
469 : * If ClosePipeToProgram() didn't throw an error, the
470 : * program terminated normally, but closed the pipe
471 : * first. Restore errno, and throw an error.
472 : */
473 0 : errno = EPIPE;
474 : }
475 0 : ereport(ERROR,
476 : (errcode_for_file_access(),
477 : errmsg("could not write to COPY program: %m")));
478 : }
479 : else
480 0 : ereport(ERROR,
481 : (errcode_for_file_access(),
482 : errmsg("could not write to COPY file: %m")));
483 : }
484 12282 : break;
485 6151148 : case COPY_FRONTEND:
486 : /* Dump the accumulated row as one CopyData message */
487 6151148 : (void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len);
488 6151148 : break;
489 6 : case COPY_CALLBACK:
490 6 : cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len);
491 6 : break;
492 : }
493 :
494 : /* Update the progress */
495 6163436 : cstate->bytes_processed += fe_msgbuf->len;
496 6163436 : pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, cstate->bytes_processed);
497 :
498 6163436 : resetStringInfo(fe_msgbuf);
499 6163436 : }
500 :
501 : /*
502 : * Wrapper function of CopySendEndOfRow for text and CSV formats. Sends the
503 : * line termination and do common appropriate things for the end of row.
504 : */
505 : static inline void
506 6163390 : CopySendTextLikeEndOfRow(CopyToState cstate)
507 : {
508 6163390 : switch (cstate->copy_dest)
509 : {
510 12258 : case COPY_FILE:
511 : /* Default line termination depends on platform */
512 : #ifndef WIN32
513 12258 : CopySendChar(cstate, '\n');
514 : #else
515 : CopySendString(cstate, "\r\n");
516 : #endif
517 12258 : break;
518 6151126 : case COPY_FRONTEND:
519 : /* The FE/BE protocol uses \n as newline for all platforms */
520 6151126 : CopySendChar(cstate, '\n');
521 6151126 : break;
522 6 : default:
523 6 : break;
524 : }
525 :
526 : /* Now take the actions related to the end of a row */
527 6163390 : CopySendEndOfRow(cstate);
528 6163390 : }
529 :
530 : /*
531 : * These functions do apply some data conversion
532 : */
533 :
534 : /*
535 : * CopySendInt32 sends an int32 in network byte order
536 : */
537 : static inline void
538 188 : CopySendInt32(CopyToState cstate, int32 val)
539 : {
540 : uint32 buf;
541 :
542 188 : buf = pg_hton32((uint32) val);
543 188 : CopySendData(cstate, &buf, sizeof(buf));
544 188 : }
545 :
546 : /*
547 : * CopySendInt16 sends an int16 in network byte order
548 : */
549 : static inline void
550 46 : CopySendInt16(CopyToState cstate, int16 val)
551 : {
552 : uint16 buf;
553 :
554 46 : buf = pg_hton16((uint16) val);
555 46 : CopySendData(cstate, &buf, sizeof(buf));
556 46 : }
557 :
558 : /*
559 : * Closes the pipe to an external program, checking the pclose() return code.
560 : */
561 : static void
562 0 : ClosePipeToProgram(CopyToState cstate)
563 : {
564 : int pclose_rc;
565 :
566 : Assert(cstate->is_program);
567 :
568 0 : pclose_rc = ClosePipeStream(cstate->copy_file);
569 0 : if (pclose_rc == -1)
570 0 : ereport(ERROR,
571 : (errcode_for_file_access(),
572 : errmsg("could not close pipe to external command: %m")));
573 0 : else if (pclose_rc != 0)
574 : {
575 0 : ereport(ERROR,
576 : (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
577 : errmsg("program \"%s\" failed",
578 : cstate->filename),
579 : errdetail_internal("%s", wait_result_to_str(pclose_rc))));
580 : }
581 0 : }
582 :
583 : /*
584 : * Release resources allocated in a cstate for COPY TO/FROM.
585 : */
586 : static void
587 12732 : EndCopy(CopyToState cstate)
588 : {
589 12732 : if (cstate->is_program)
590 : {
591 0 : ClosePipeToProgram(cstate);
592 : }
593 : else
594 : {
595 12732 : if (cstate->filename != NULL && FreeFile(cstate->copy_file))
596 0 : ereport(ERROR,
597 : (errcode_for_file_access(),
598 : errmsg("could not close file \"%s\": %m",
599 : cstate->filename)));
600 : }
601 :
602 12732 : pgstat_progress_end_command();
603 :
604 12732 : MemoryContextDelete(cstate->copycontext);
605 12732 : pfree(cstate);
606 12732 : }
607 :
608 : /*
609 : * Setup CopyToState to read tuples from a table or a query for COPY TO.
610 : *
611 : * 'rel': Relation to be copied
612 : * 'raw_query': Query whose results are to be copied
613 : * 'queryRelId': OID of base relation to convert to a query (for RLS)
614 : * 'filename': Name of server-local file to write, NULL for STDOUT
615 : * 'is_program': true if 'filename' is program to execute
616 : * 'data_dest_cb': Callback that processes the output data
617 : * 'attnamelist': List of char *, columns to include. NIL selects all cols.
618 : * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
619 : *
620 : * Returns a CopyToState, to be passed to DoCopyTo() and related functions.
621 : */
622 : CopyToState
623 12928 : BeginCopyTo(ParseState *pstate,
624 : Relation rel,
625 : RawStmt *raw_query,
626 : Oid queryRelId,
627 : const char *filename,
628 : bool is_program,
629 : copy_data_dest_cb data_dest_cb,
630 : List *attnamelist,
631 : List *options)
632 : {
633 : CopyToState cstate;
634 12928 : bool pipe = (filename == NULL && data_dest_cb == NULL);
635 : TupleDesc tupDesc;
636 : int num_phys_attrs;
637 : MemoryContext oldcontext;
638 12928 : const int progress_cols[] = {
639 : PROGRESS_COPY_COMMAND,
640 : PROGRESS_COPY_TYPE
641 : };
642 12928 : int64 progress_vals[] = {
643 : PROGRESS_COPY_COMMAND_TO,
644 : 0
645 : };
646 :
647 12928 : if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
648 : {
649 24 : if (rel->rd_rel->relkind == RELKIND_VIEW)
650 12 : ereport(ERROR,
651 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
652 : errmsg("cannot copy from view \"%s\"",
653 : RelationGetRelationName(rel)),
654 : errhint("Try the COPY (SELECT ...) TO variant.")));
655 12 : else if (rel->rd_rel->relkind == RELKIND_MATVIEW)
656 : {
657 12 : if (!RelationIsPopulated(rel))
658 6 : ereport(ERROR,
659 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
660 : errmsg("cannot copy from unpopulated materialized view \"%s\"",
661 : RelationGetRelationName(rel)),
662 : errhint("Use the REFRESH MATERIALIZED VIEW command."));
663 : }
664 0 : else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
665 0 : ereport(ERROR,
666 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
667 : errmsg("cannot copy from foreign table \"%s\"",
668 : RelationGetRelationName(rel)),
669 : errhint("Try the COPY (SELECT ...) TO variant.")));
670 0 : else if (rel->rd_rel->relkind == RELKIND_SEQUENCE)
671 0 : ereport(ERROR,
672 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
673 : errmsg("cannot copy from sequence \"%s\"",
674 : RelationGetRelationName(rel))));
675 0 : else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
676 0 : ereport(ERROR,
677 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
678 : errmsg("cannot copy from partitioned table \"%s\"",
679 : RelationGetRelationName(rel)),
680 : errhint("Try the COPY (SELECT ...) TO variant.")));
681 : else
682 0 : ereport(ERROR,
683 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
684 : errmsg("cannot copy from non-table relation \"%s\"",
685 : RelationGetRelationName(rel))));
686 : }
687 :
688 :
689 : /* Allocate workspace and zero all fields */
690 12910 : cstate = (CopyToStateData *) palloc0(sizeof(CopyToStateData));
691 :
692 : /*
693 : * We allocate everything used by a cstate in a new memory context. This
694 : * avoids memory leaks during repeated use of COPY in a query.
695 : */
696 12910 : cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
697 : "COPY",
698 : ALLOCSET_DEFAULT_SIZES);
699 :
700 12910 : oldcontext = MemoryContextSwitchTo(cstate->copycontext);
701 :
702 : /* Extract options from the statement node tree */
703 12910 : ProcessCopyOptions(pstate, &cstate->opts, false /* is_from */ , options);
704 :
705 : /* Set format routine */
706 12868 : cstate->routine = CopyToGetRoutine(&cstate->opts);
707 :
708 : /* Process the source/target relation or query */
709 12868 : if (rel)
710 : {
711 : Assert(!raw_query);
712 :
713 12362 : cstate->rel = rel;
714 :
715 12362 : tupDesc = RelationGetDescr(cstate->rel);
716 : }
717 : else
718 : {
719 : List *rewritten;
720 : Query *query;
721 : PlannedStmt *plan;
722 : DestReceiver *dest;
723 :
724 506 : cstate->rel = NULL;
725 :
726 : /*
727 : * Run parse analysis and rewrite. Note this also acquires sufficient
728 : * locks on the source table(s).
729 : */
730 506 : rewritten = pg_analyze_and_rewrite_fixedparams(raw_query,
731 : pstate->p_sourcetext, NULL, 0,
732 : NULL);
733 :
734 : /* check that we got back something we can work with */
735 494 : if (rewritten == NIL)
736 : {
737 18 : ereport(ERROR,
738 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
739 : errmsg("DO INSTEAD NOTHING rules are not supported for COPY")));
740 : }
741 476 : else if (list_length(rewritten) > 1)
742 : {
743 : ListCell *lc;
744 :
745 : /* examine queries to determine which error message to issue */
746 102 : foreach(lc, rewritten)
747 : {
748 84 : Query *q = lfirst_node(Query, lc);
749 :
750 84 : if (q->querySource == QSRC_QUAL_INSTEAD_RULE)
751 18 : ereport(ERROR,
752 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
753 : errmsg("conditional DO INSTEAD rules are not supported for COPY")));
754 66 : if (q->querySource == QSRC_NON_INSTEAD_RULE)
755 18 : ereport(ERROR,
756 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
757 : errmsg("DO ALSO rules are not supported for COPY")));
758 : }
759 :
760 18 : ereport(ERROR,
761 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
762 : errmsg("multi-statement DO INSTEAD rules are not supported for COPY")));
763 : }
764 :
765 422 : query = linitial_node(Query, rewritten);
766 :
767 : /* The grammar allows SELECT INTO, but we don't support that */
768 422 : if (query->utilityStmt != NULL &&
769 18 : IsA(query->utilityStmt, CreateTableAsStmt))
770 12 : ereport(ERROR,
771 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
772 : errmsg("COPY (SELECT INTO) is not supported")));
773 :
774 : /* The only other utility command we could see is NOTIFY */
775 410 : if (query->utilityStmt != NULL)
776 6 : ereport(ERROR,
777 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
778 : errmsg("COPY query must not be a utility command")));
779 :
780 : /*
781 : * Similarly the grammar doesn't enforce the presence of a RETURNING
782 : * clause, but this is required here.
783 : */
784 404 : if (query->commandType != CMD_SELECT &&
785 110 : query->returningList == NIL)
786 : {
787 : Assert(query->commandType == CMD_INSERT ||
788 : query->commandType == CMD_UPDATE ||
789 : query->commandType == CMD_DELETE ||
790 : query->commandType == CMD_MERGE);
791 :
792 24 : ereport(ERROR,
793 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
794 : errmsg("COPY query must have a RETURNING clause")));
795 : }
796 :
797 : /* plan the query */
798 380 : plan = pg_plan_query(query, pstate->p_sourcetext,
799 : CURSOR_OPT_PARALLEL_OK, NULL);
800 :
801 : /*
802 : * With row-level security and a user using "COPY relation TO", we
803 : * have to convert the "COPY relation TO" to a query-based COPY (eg:
804 : * "COPY (SELECT * FROM ONLY relation) TO"), to allow the rewriter to
805 : * add in any RLS clauses.
806 : *
807 : * When this happens, we are passed in the relid of the originally
808 : * found relation (which we have locked). As the planner will look up
809 : * the relation again, we double-check here to make sure it found the
810 : * same one that we have locked.
811 : */
812 378 : if (queryRelId != InvalidOid)
813 : {
814 : /*
815 : * Note that with RLS involved there may be multiple relations,
816 : * and while the one we need is almost certainly first, we don't
817 : * make any guarantees of that in the planner, so check the whole
818 : * list and make sure we find the original relation.
819 : */
820 54 : if (!list_member_oid(plan->relationOids, queryRelId))
821 0 : ereport(ERROR,
822 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
823 : errmsg("relation referenced by COPY statement has changed")));
824 : }
825 :
826 : /*
827 : * Use a snapshot with an updated command ID to ensure this query sees
828 : * results of any previously executed queries.
829 : */
830 378 : PushCopiedSnapshot(GetActiveSnapshot());
831 378 : UpdateActiveSnapshotCommandId();
832 :
833 : /* Create dest receiver for COPY OUT */
834 378 : dest = CreateDestReceiver(DestCopyOut);
835 378 : ((DR_copy *) dest)->cstate = cstate;
836 :
837 : /* Create a QueryDesc requesting no output */
838 378 : cstate->queryDesc = CreateQueryDesc(plan, NULL, pstate->p_sourcetext,
839 : GetActiveSnapshot(),
840 : InvalidSnapshot,
841 : dest, NULL, NULL, 0);
842 :
843 : /*
844 : * Call ExecutorStart to prepare the plan for execution.
845 : *
846 : * ExecutorStart computes a result tupdesc for us
847 : */
848 378 : if (!ExecutorStart(cstate->queryDesc, 0))
849 0 : elog(ERROR, "ExecutorStart() failed unexpectedly");
850 :
851 372 : tupDesc = cstate->queryDesc->tupDesc;
852 : }
853 :
854 : /* Generate or convert list of attributes to process */
855 12734 : cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
856 :
857 12734 : num_phys_attrs = tupDesc->natts;
858 :
859 : /* Convert FORCE_QUOTE name list to per-column flags, check validity */
860 12734 : cstate->opts.force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
861 12734 : if (cstate->opts.force_quote_all)
862 : {
863 18 : MemSet(cstate->opts.force_quote_flags, true, num_phys_attrs * sizeof(bool));
864 : }
865 12716 : else if (cstate->opts.force_quote)
866 : {
867 : List *attnums;
868 : ListCell *cur;
869 :
870 24 : attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_quote);
871 :
872 48 : foreach(cur, attnums)
873 : {
874 24 : int attnum = lfirst_int(cur);
875 24 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
876 :
877 24 : if (!list_member_int(cstate->attnumlist, attnum))
878 0 : ereport(ERROR,
879 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
880 : /*- translator: %s is the name of a COPY option, e.g. FORCE_NOT_NULL */
881 : errmsg("%s column \"%s\" not referenced by COPY",
882 : "FORCE_QUOTE", NameStr(attr->attname))));
883 24 : cstate->opts.force_quote_flags[attnum - 1] = true;
884 : }
885 : }
886 :
887 : /* Use client encoding when ENCODING option is not specified. */
888 12734 : if (cstate->opts.file_encoding < 0)
889 12716 : cstate->file_encoding = pg_get_client_encoding();
890 : else
891 18 : cstate->file_encoding = cstate->opts.file_encoding;
892 :
893 : /*
894 : * Set up encoding conversion info if the file and server encodings differ
895 : * (see also pg_server_to_any).
896 : */
897 12734 : if (cstate->file_encoding == GetDatabaseEncoding() ||
898 8 : cstate->file_encoding == PG_SQL_ASCII)
899 12732 : cstate->need_transcoding = false;
900 : else
901 2 : cstate->need_transcoding = true;
902 :
903 : /* See Multibyte encoding comment above */
904 12734 : cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding);
905 :
906 12734 : cstate->copy_dest = COPY_FILE; /* default */
907 :
908 12734 : if (data_dest_cb)
909 : {
910 2 : progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
911 2 : cstate->copy_dest = COPY_CALLBACK;
912 2 : cstate->data_dest_cb = data_dest_cb;
913 : }
914 12732 : else if (pipe)
915 : {
916 12670 : progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
917 :
918 : Assert(!is_program); /* the grammar does not allow this */
919 12670 : if (whereToSendOutput != DestRemote)
920 0 : cstate->copy_file = stdout;
921 : }
922 : else
923 : {
924 62 : cstate->filename = pstrdup(filename);
925 62 : cstate->is_program = is_program;
926 :
927 62 : if (is_program)
928 : {
929 0 : progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
930 0 : cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W);
931 0 : if (cstate->copy_file == NULL)
932 0 : ereport(ERROR,
933 : (errcode_for_file_access(),
934 : errmsg("could not execute command \"%s\": %m",
935 : cstate->filename)));
936 : }
937 : else
938 : {
939 : mode_t oumask; /* Pre-existing umask value */
940 : struct stat st;
941 :
942 62 : progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
943 :
944 : /*
945 : * Prevent write to relative path ... too easy to shoot oneself in
946 : * the foot by overwriting a database file ...
947 : */
948 62 : if (!is_absolute_path(filename))
949 0 : ereport(ERROR,
950 : (errcode(ERRCODE_INVALID_NAME),
951 : errmsg("relative path not allowed for COPY to file")));
952 :
953 62 : oumask = umask(S_IWGRP | S_IWOTH);
954 62 : PG_TRY();
955 : {
956 62 : cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
957 : }
958 0 : PG_FINALLY();
959 : {
960 62 : umask(oumask);
961 : }
962 62 : PG_END_TRY();
963 62 : if (cstate->copy_file == NULL)
964 : {
965 : /* copy errno because ereport subfunctions might change it */
966 0 : int save_errno = errno;
967 :
968 0 : ereport(ERROR,
969 : (errcode_for_file_access(),
970 : errmsg("could not open file \"%s\" for writing: %m",
971 : cstate->filename),
972 : (save_errno == ENOENT || save_errno == EACCES) ?
973 : errhint("COPY TO instructs the PostgreSQL server process to write a file. "
974 : "You may want a client-side facility such as psql's \\copy.") : 0));
975 : }
976 :
977 62 : if (fstat(fileno(cstate->copy_file), &st))
978 0 : ereport(ERROR,
979 : (errcode_for_file_access(),
980 : errmsg("could not stat file \"%s\": %m",
981 : cstate->filename)));
982 :
983 62 : if (S_ISDIR(st.st_mode))
984 0 : ereport(ERROR,
985 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
986 : errmsg("\"%s\" is a directory", cstate->filename)));
987 : }
988 : }
989 :
990 : /* initialize progress */
991 12734 : pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
992 12734 : cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
993 12734 : pgstat_progress_update_multi_param(2, progress_cols, progress_vals);
994 :
995 12734 : cstate->bytes_processed = 0;
996 :
997 12734 : MemoryContextSwitchTo(oldcontext);
998 :
999 12734 : return cstate;
1000 : }
1001 :
1002 : /*
1003 : * Clean up storage and release resources for COPY TO.
1004 : */
1005 : void
1006 12732 : EndCopyTo(CopyToState cstate)
1007 : {
1008 12732 : if (cstate->queryDesc != NULL)
1009 : {
1010 : /* Close down the query and free resources. */
1011 372 : ExecutorFinish(cstate->queryDesc);
1012 372 : ExecutorEnd(cstate->queryDesc);
1013 372 : FreeQueryDesc(cstate->queryDesc);
1014 372 : PopActiveSnapshot();
1015 : }
1016 :
1017 : /* Clean up storage */
1018 12732 : EndCopy(cstate);
1019 12732 : }
1020 :
1021 : /*
1022 : * Copy from relation or query TO file.
1023 : *
1024 : * Returns the number of rows processed.
1025 : */
1026 : uint64
1027 12734 : DoCopyTo(CopyToState cstate)
1028 : {
1029 12734 : bool pipe = (cstate->filename == NULL && cstate->data_dest_cb == NULL);
1030 12734 : bool fe_copy = (pipe && whereToSendOutput == DestRemote);
1031 : TupleDesc tupDesc;
1032 : int num_phys_attrs;
1033 : ListCell *cur;
1034 : uint64 processed;
1035 :
1036 12734 : if (fe_copy)
1037 12670 : SendCopyBegin(cstate);
1038 :
1039 12734 : if (cstate->rel)
1040 12362 : tupDesc = RelationGetDescr(cstate->rel);
1041 : else
1042 372 : tupDesc = cstate->queryDesc->tupDesc;
1043 12734 : num_phys_attrs = tupDesc->natts;
1044 12734 : cstate->opts.null_print_client = cstate->opts.null_print; /* default */
1045 :
1046 : /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
1047 12734 : cstate->fe_msgbuf = makeStringInfo();
1048 :
1049 : /* Get info about the columns we need to process. */
1050 12734 : cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
1051 59850 : foreach(cur, cstate->attnumlist)
1052 : {
1053 47118 : int attnum = lfirst_int(cur);
1054 47118 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1055 :
1056 47118 : cstate->routine->CopyToOutFunc(cstate, attr->atttypid,
1057 47118 : &cstate->out_functions[attnum - 1]);
1058 : }
1059 :
1060 : /*
1061 : * Create a temporary memory context that we can reset once per row to
1062 : * recover palloc'd memory. This avoids any problems with leaks inside
1063 : * datatype output routines, and should be faster than retail pfree's
1064 : * anyway. (We don't need a whole econtext as CopyFrom does.)
1065 : */
1066 12732 : cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
1067 : "COPY TO",
1068 : ALLOCSET_DEFAULT_SIZES);
1069 :
1070 12732 : cstate->routine->CopyToStart(cstate, tupDesc);
1071 :
1072 12732 : if (cstate->rel)
1073 : {
1074 : TupleTableSlot *slot;
1075 : TableScanDesc scandesc;
1076 :
1077 12360 : scandesc = table_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL);
1078 12360 : slot = table_slot_create(cstate->rel, NULL);
1079 :
1080 12360 : processed = 0;
1081 6168756 : while (table_scan_getnextslot(scandesc, ForwardScanDirection, slot))
1082 : {
1083 6156396 : CHECK_FOR_INTERRUPTS();
1084 :
1085 : /* Deconstruct the tuple ... */
1086 6156396 : slot_getallattrs(slot);
1087 :
1088 : /* Format and send the data */
1089 6156396 : CopyOneRowTo(cstate, slot);
1090 :
1091 : /*
1092 : * Increment the number of processed tuples, and report the
1093 : * progress.
1094 : */
1095 6156396 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
1096 : ++processed);
1097 : }
1098 :
1099 12360 : ExecDropSingleTupleTableSlot(slot);
1100 12360 : table_endscan(scandesc);
1101 : }
1102 : else
1103 : {
1104 : /* run the plan --- the dest receiver will send tuples */
1105 372 : ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0);
1106 372 : processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
1107 : }
1108 :
1109 12732 : cstate->routine->CopyToEnd(cstate);
1110 :
1111 12732 : MemoryContextDelete(cstate->rowcontext);
1112 :
1113 12732 : if (fe_copy)
1114 12668 : SendCopyEnd(cstate);
1115 :
1116 12732 : return processed;
1117 : }
1118 :
1119 : /*
1120 : * Emit one row during DoCopyTo().
1121 : */
1122 : static inline void
1123 6163398 : CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot)
1124 : {
1125 : MemoryContext oldcontext;
1126 :
1127 6163398 : MemoryContextReset(cstate->rowcontext);
1128 6163398 : oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
1129 :
1130 : /* Make sure the tuple is fully deconstructed */
1131 6163398 : slot_getallattrs(slot);
1132 :
1133 6163398 : cstate->routine->CopyToOneRow(cstate, slot);
1134 :
1135 6163398 : MemoryContextSwitchTo(oldcontext);
1136 6163398 : }
1137 :
/*
 * DUMPSOFAR --- flush the pending literal run.
 *
 * Sends the bytes from "start" up to (but not including) "ptr" with
 * CopySendData(), unless the run is empty.  The macro refers to the local
 * variables "cstate", "start" and "ptr" of the function it is expanded in.
 */
#define DUMPSOFAR() \
    do { \
        if (ptr > start) \
            CopySendData(cstate, start, ptr - start); \
    } while (0)

/*
 * Send text representation of one attribute, with conversion and escaping
 *
 * "string" is the NUL-terminated value to send.  It is first converted to
 * the file encoding when cstate->need_transcoding is set.  Then:
 *
 *  - \b, \f, \n, \r, \t and \v are sent as a backslash followed by the
 *    corresponding letter (this also applies when one of them is the
 *    delimiter, since the case labels are checked first);
 *  - any other byte below 0x20 is sent literally, unless it is the
 *    delimiter, in which case it is sent preceded by a backslash;
 *  - a backslash, or a delimiter that is not a control character, is sent
 *    preceded by a backslash;
 *  - everything else is sent as-is.
 */
static void
CopyAttributeOutText(CopyToState cstate, const char *string)
{
    const char *ptr;
    const char *start;
    char        c;
    char        delimc = cstate->opts.delim[0];

    if (cstate->need_transcoding)
        ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
    else
        ptr = string;

    /*
     * We have to grovel through the string searching for control characters
     * and instances of the delimiter character.  In most cases, though, these
     * are infrequent.  To avoid overhead from calling CopySendData once per
     * character, we dump out all characters between escaped characters in a
     * single call.  The loop invariant is that the data from "start" to "ptr"
     * can be sent literally, but hasn't yet been.
     *
     * We can skip pg_encoding_mblen() overhead when encoding is safe, because
     * in valid backend encodings, extra bytes of a multibyte character never
     * look like ASCII.  This loop is sufficiently performance-critical that
     * it's worth making two copies of it to get the IS_HIGHBIT_SET() test out
     * of the normal safe-encoding path.
     *
     * The two copies below must be kept in sync: they differ only in the
     * extra IS_HIGHBIT_SET() branch of the first one.
     */
    if (cstate->encoding_embeds_ascii)
    {
        start = ptr;
        while ((c = *ptr) != '\0')
        {
            if ((unsigned char) c < (unsigned char) 0x20)
            {
                /*
                 * \r and \n must be escaped, the others are traditional.  We
                 * prefer to dump these using the C-like notation, rather than
                 * a backslash and the literal character, because it makes the
                 * dump file a bit more proof against Microsoftish data
                 * mangling.
                 */
                switch (c)
                {
                    case '\b':
                        c = 'b';
                        break;
                    case '\f':
                        c = 'f';
                        break;
                    case '\n':
                        c = 'n';
                        break;
                    case '\r':
                        c = 'r';
                        break;
                    case '\t':
                        c = 't';
                        break;
                    case '\v':
                        c = 'v';
                        break;
                    default:
                        /* If it's the delimiter, must backslash it */
                        if (c == delimc)
                            break;
                        /* All ASCII control chars are length 1 */
                        ptr++;
                        continue;   /* fall to end of loop */
                }
                /*
                 * if we get here, we need to convert the control char: "c"
                 * now holds the letter to send after the backslash (or the
                 * delimiter itself, in the default case)
                 */
                DUMPSOFAR();
                CopySendChar(cstate, '\\');
                CopySendChar(cstate, c);
                start = ++ptr;  /* do not include char in next run */
            }
            else if (c == '\\' || c == delimc)
            {
                DUMPSOFAR();
                CopySendChar(cstate, '\\');
                start = ptr++;  /* we include char in next run */
            }
            else if (IS_HIGHBIT_SET(c))
            {
                /*
                 * Step over the whole multibyte character, so that its
                 * trailing bytes are never compared against ASCII characters.
                 */
                ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
            }
            else
                ptr++;
        }
    }
    else
    {
        start = ptr;
        while ((c = *ptr) != '\0')
        {
            if ((unsigned char) c < (unsigned char) 0x20)
            {
                /*
                 * \r and \n must be escaped, the others are traditional.  We
                 * prefer to dump these using the C-like notation, rather than
                 * a backslash and the literal character, because it makes the
                 * dump file a bit more proof against Microsoftish data
                 * mangling.
                 */
                switch (c)
                {
                    case '\b':
                        c = 'b';
                        break;
                    case '\f':
                        c = 'f';
                        break;
                    case '\n':
                        c = 'n';
                        break;
                    case '\r':
                        c = 'r';
                        break;
                    case '\t':
                        c = 't';
                        break;
                    case '\v':
                        c = 'v';
                        break;
                    default:
                        /* If it's the delimiter, must backslash it */
                        if (c == delimc)
                            break;
                        /* All ASCII control chars are length 1 */
                        ptr++;
                        continue;   /* fall to end of loop */
                }
                /*
                 * if we get here, we need to convert the control char: "c"
                 * now holds the letter to send after the backslash (or the
                 * delimiter itself, in the default case)
                 */
                DUMPSOFAR();
                CopySendChar(cstate, '\\');
                CopySendChar(cstate, c);
                start = ++ptr;  /* do not include char in next run */
            }
            else if (c == '\\' || c == delimc)
            {
                DUMPSOFAR();
                CopySendChar(cstate, '\\');
                start = ptr++;  /* we include char in next run */
            }
            else
                ptr++;
        }
    }

    /* Send whatever literal run is still pending at the end of the string. */
    DUMPSOFAR();
}
1295 :
1296 : /*
1297 : * Send text representation of one attribute, with conversion and
1298 : * CSV-style escaping
1299 : */
1300 : static void
1301 618 : CopyAttributeOutCSV(CopyToState cstate, const char *string,
1302 : bool use_quote)
1303 : {
1304 : const char *ptr;
1305 : const char *start;
1306 : char c;
1307 618 : char delimc = cstate->opts.delim[0];
1308 618 : char quotec = cstate->opts.quote[0];
1309 618 : char escapec = cstate->opts.escape[0];
1310 618 : bool single_attr = (list_length(cstate->attnumlist) == 1);
1311 :
1312 : /* force quoting if it matches null_print (before conversion!) */
1313 618 : if (!use_quote && strcmp(string, cstate->opts.null_print) == 0)
1314 54 : use_quote = true;
1315 :
1316 618 : if (cstate->need_transcoding)
1317 0 : ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
1318 : else
1319 618 : ptr = string;
1320 :
1321 : /*
1322 : * Make a preliminary pass to discover if it needs quoting
1323 : */
1324 618 : if (!use_quote)
1325 : {
1326 : /*
1327 : * Quote '\.' if it appears alone on a line, so that it will not be
1328 : * interpreted as an end-of-data marker. (PG 18 and up will not
1329 : * interpret '\.' in CSV that way, except in embedded-in-SQL data; but
1330 : * we want the data to be loadable by older versions too. Also, this
1331 : * avoids breaking clients that are still using PQgetline().)
1332 : */
1333 432 : if (single_attr && strcmp(ptr, "\\.") == 0)
1334 6 : use_quote = true;
1335 : else
1336 : {
1337 426 : const char *tptr = ptr;
1338 :
1339 2208 : while ((c = *tptr) != '\0')
1340 : {
1341 1914 : if (c == delimc || c == quotec || c == '\n' || c == '\r')
1342 : {
1343 132 : use_quote = true;
1344 132 : break;
1345 : }
1346 1782 : if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
1347 0 : tptr += pg_encoding_mblen(cstate->file_encoding, tptr);
1348 : else
1349 1782 : tptr++;
1350 : }
1351 : }
1352 : }
1353 :
1354 618 : if (use_quote)
1355 : {
1356 324 : CopySendChar(cstate, quotec);
1357 :
1358 : /*
1359 : * We adopt the same optimization strategy as in CopyAttributeOutText
1360 : */
1361 324 : start = ptr;
1362 2538 : while ((c = *ptr) != '\0')
1363 : {
1364 2214 : if (c == quotec || c == escapec)
1365 : {
1366 156 : DUMPSOFAR();
1367 156 : CopySendChar(cstate, escapec);
1368 156 : start = ptr; /* we include char in next run */
1369 : }
1370 2214 : if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
1371 0 : ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
1372 : else
1373 2214 : ptr++;
1374 : }
1375 324 : DUMPSOFAR();
1376 :
1377 324 : CopySendChar(cstate, quotec);
1378 : }
1379 : else
1380 : {
1381 : /* If it doesn't need quoting, we can just dump it as-is */
1382 294 : CopySendString(cstate, ptr);
1383 : }
1384 618 : }
1385 :
1386 : /*
1387 : * copy_dest_startup --- executor startup
1388 : */
1389 : static void
1390 372 : copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
1391 : {
1392 : /* no-op */
1393 372 : }
1394 :
1395 : /*
1396 : * copy_dest_receive --- receive one tuple
1397 : */
1398 : static bool
1399 7002 : copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
1400 : {
1401 7002 : DR_copy *myState = (DR_copy *) self;
1402 7002 : CopyToState cstate = myState->cstate;
1403 :
1404 : /* Send the data */
1405 7002 : CopyOneRowTo(cstate, slot);
1406 :
1407 : /* Increment the number of processed tuples, and report the progress */
1408 7002 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
1409 7002 : ++myState->processed);
1410 :
1411 7002 : return true;
1412 : }
1413 :
1414 : /*
1415 : * copy_dest_shutdown --- executor end
1416 : */
1417 : static void
1418 372 : copy_dest_shutdown(DestReceiver *self)
1419 : {
1420 : /* no-op */
1421 372 : }
1422 :
1423 : /*
1424 : * copy_dest_destroy --- release DestReceiver object
1425 : */
1426 : static void
1427 0 : copy_dest_destroy(DestReceiver *self)
1428 : {
1429 0 : pfree(self);
1430 0 : }
1431 :
1432 : /*
1433 : * CreateCopyDestReceiver -- create a suitable DestReceiver object
1434 : */
1435 : DestReceiver *
1436 378 : CreateCopyDestReceiver(void)
1437 : {
1438 378 : DR_copy *self = (DR_copy *) palloc(sizeof(DR_copy));
1439 :
1440 378 : self->pub.receiveSlot = copy_dest_receive;
1441 378 : self->pub.rStartup = copy_dest_startup;
1442 378 : self->pub.rShutdown = copy_dest_shutdown;
1443 378 : self->pub.rDestroy = copy_dest_destroy;
1444 378 : self->pub.mydest = DestCopyOut;
1445 :
1446 378 : self->cstate = NULL; /* will be set later */
1447 378 : self->processed = 0;
1448 :
1449 378 : return (DestReceiver *) self;
1450 : }
|