Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * copyto.c
4 : * COPY <table> TO file/program/client
5 : *
6 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/commands/copyto.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : #include "postgres.h"
16 :
17 : #include <ctype.h>
18 : #include <unistd.h>
19 : #include <sys/stat.h>
20 :
21 : #include "access/table.h"
22 : #include "access/tableam.h"
23 : #include "access/tupconvert.h"
24 : #include "catalog/pg_inherits.h"
25 : #include "commands/copyapi.h"
26 : #include "commands/progress.h"
27 : #include "executor/execdesc.h"
28 : #include "executor/executor.h"
29 : #include "executor/tuptable.h"
30 : #include "funcapi.h"
31 : #include "libpq/libpq.h"
32 : #include "libpq/pqformat.h"
33 : #include "mb/pg_wchar.h"
34 : #include "miscadmin.h"
35 : #include "pgstat.h"
36 : #include "storage/fd.h"
37 : #include "tcop/tcopprot.h"
38 : #include "utils/json.h"
39 : #include "utils/lsyscache.h"
40 : #include "utils/memutils.h"
41 : #include "utils/rel.h"
42 : #include "utils/snapmgr.h"
43 : #include "utils/wait_event.h"
44 :
/*
 * The kinds of low-level output destination that COPY TO has to
 * distinguish between.
 */
typedef enum CopyDest
{
	COPY_FILE,					/* a file, or a pipe to a program */
	COPY_FRONTEND,				/* the connected frontend */
	COPY_CALLBACK,				/* a caller-supplied callback function */
} CopyDest;
55 :
56 : /*
57 : * This struct contains all the state variables used throughout a COPY TO
58 : * operation.
59 : *
60 : * Multi-byte encodings: all supported client-side encodings encode multi-byte
61 : * characters by having the first byte's high bit set. Subsequent bytes of the
62 : * character can have the high bit not set. When scanning data in such an
63 : * encoding to look for a match to a single-byte (ie ASCII) character, we must
64 : * use the full pg_encoding_mblen() machinery to skip over multibyte
65 : * characters, else we might find a false match to a trailing byte. In
66 : * supported server encodings, there is no possibility of a false match, and
67 : * it's faster to make useless comparisons to trailing bytes than it is to
68 : * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true
69 : * when we have to do it the hard way.
70 : */
71 : typedef struct CopyToStateData
72 : {
73 : /* format-specific routines */
74 : const CopyToRoutine *routine;
75 :
76 : /* low-level state data */
77 : CopyDest copy_dest; /* type of copy source/destination */
78 : FILE *copy_file; /* used if copy_dest == COPY_FILE */
79 : StringInfo fe_msgbuf; /* used for all dests during COPY TO */
80 :
81 : int file_encoding; /* file or remote side's character encoding */
82 : bool need_transcoding; /* file encoding diff from server? */
83 : bool encoding_embeds_ascii; /* ASCII can be non-first byte? */
84 :
85 : /* parameters from the COPY command */
86 : Relation rel; /* relation to copy to */
87 : QueryDesc *queryDesc; /* executable query to copy from */
88 : List *attnumlist; /* integer list of attnums to copy */
89 : char *filename; /* filename, or NULL for STDOUT */
90 : bool is_program; /* is 'filename' a program to popen? */
91 : bool json_row_delim_needed; /* need delimiter before next row */
92 : StringInfo json_buf; /* reusable buffer for JSON output,
93 : * initialized in BeginCopyTo */
94 : TupleDesc tupDesc; /* Descriptor for JSON output; for a column
95 : * list this is a projected descriptor */
96 : Datum *json_projvalues; /* pre-allocated projection values, or
97 : * NULL */
98 : bool *json_projnulls; /* pre-allocated projection nulls, or NULL */
99 : copy_data_dest_cb data_dest_cb; /* function for writing data */
100 :
101 : CopyFormatOptions opts;
102 : Node *whereClause; /* WHERE condition (or NULL) */
103 : List *partitions; /* OID list of partitions to copy data from */
104 :
105 : /*
106 : * Working state
107 : */
108 : MemoryContext copycontext; /* per-copy execution context */
109 :
110 : FmgrInfo *out_functions; /* lookup info for output functions */
111 : MemoryContext rowcontext; /* per-row evaluation context */
112 : uint64 bytes_processed; /* number of bytes processed so far */
113 : } CopyToStateData;
114 :
115 : /* DestReceiver for COPY (query) TO */
116 : typedef struct
117 : {
118 : DestReceiver pub; /* publicly-known function pointers */
119 : CopyToState cstate; /* CopyToStateData for the command */
120 : uint64 processed; /* # of tuples processed */
121 : } DR_copy;
122 :
/*
 * Signature bytes written at the start of a binary-format copy.
 *
 * NOTE: there's a copy of this in copyfromparse.c
 */
static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
125 :
126 :
127 : /* non-export function prototypes */
128 : static void EndCopy(CopyToState cstate);
129 : static void ClosePipeToProgram(CopyToState cstate);
130 : static void CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot);
131 : static void CopyAttributeOutText(CopyToState cstate, const char *string);
132 : static void CopyAttributeOutCSV(CopyToState cstate, const char *string,
133 : bool use_quote);
134 : static void CopyRelationTo(CopyToState cstate, Relation rel, Relation root_rel,
135 : uint64 *processed);
136 :
137 : /* built-in format-specific routines */
138 : static void CopyToTextLikeStart(CopyToState cstate, TupleDesc tupDesc);
139 : static void CopyToTextLikeOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo);
140 : static void CopyToTextOneRow(CopyToState cstate, TupleTableSlot *slot);
141 : static void CopyToCSVOneRow(CopyToState cstate, TupleTableSlot *slot);
142 : static void CopyToTextLikeOneRow(CopyToState cstate, TupleTableSlot *slot,
143 : bool is_csv);
144 : static void CopyToTextLikeEnd(CopyToState cstate);
145 : static void CopyToJsonOneRow(CopyToState cstate, TupleTableSlot *slot);
146 : static void CopyToJsonEnd(CopyToState cstate);
147 : static void CopyToBinaryStart(CopyToState cstate, TupleDesc tupDesc);
148 : static void CopyToBinaryOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo);
149 : static void CopyToBinaryOneRow(CopyToState cstate, TupleTableSlot *slot);
150 : static void CopyToBinaryEnd(CopyToState cstate);
151 :
152 : /* Low-level communications functions */
153 : static void SendCopyBegin(CopyToState cstate);
154 : static void SendCopyEnd(CopyToState cstate);
155 : static void CopySendData(CopyToState cstate, const void *databuf, int datasize);
156 : static void CopySendString(CopyToState cstate, const char *str);
157 : static void CopySendChar(CopyToState cstate, char c);
158 : static void CopySendEndOfRow(CopyToState cstate);
159 : static void CopySendTextLikeEndOfRow(CopyToState cstate);
160 : static void CopySendInt32(CopyToState cstate, int32 val);
161 : static void CopySendInt16(CopyToState cstate, int16 val);
162 :
163 : /*
164 : * COPY TO routines for built-in formats.
165 : */
166 :
167 : /* text format */
168 : static const CopyToRoutine CopyToRoutineText = {
169 : .CopyToStart = CopyToTextLikeStart,
170 : .CopyToOutFunc = CopyToTextLikeOutFunc,
171 : .CopyToOneRow = CopyToTextOneRow,
172 : .CopyToEnd = CopyToTextLikeEnd,
173 : };
174 :
175 : /* CSV format */
176 : static const CopyToRoutine CopyToRoutineCSV = {
177 : .CopyToStart = CopyToTextLikeStart,
178 : .CopyToOutFunc = CopyToTextLikeOutFunc,
179 : .CopyToOneRow = CopyToCSVOneRow,
180 : .CopyToEnd = CopyToTextLikeEnd,
181 : };
182 :
183 : /* json format */
184 : static const CopyToRoutine CopyToRoutineJson = {
185 : .CopyToStart = CopyToTextLikeStart,
186 : .CopyToOutFunc = CopyToTextLikeOutFunc,
187 : .CopyToOneRow = CopyToJsonOneRow,
188 : .CopyToEnd = CopyToJsonEnd,
189 : };
190 :
191 : /* binary format */
192 : static const CopyToRoutine CopyToRoutineBinary = {
193 : .CopyToStart = CopyToBinaryStart,
194 : .CopyToOutFunc = CopyToBinaryOutFunc,
195 : .CopyToOneRow = CopyToBinaryOneRow,
196 : .CopyToEnd = CopyToBinaryEnd,
197 : };
198 :
199 : /* Return a COPY TO routine for the given options */
200 : static const CopyToRoutine *
201 5274 : CopyToGetRoutine(const CopyFormatOptions *opts)
202 : {
203 5274 : if (opts->format == COPY_FORMAT_CSV)
204 88 : return &CopyToRoutineCSV;
205 5186 : else if (opts->format == COPY_FORMAT_BINARY)
206 9 : return &CopyToRoutineBinary;
207 5177 : else if (opts->format == COPY_FORMAT_JSON)
208 84 : return &CopyToRoutineJson;
209 :
210 : /* default is text */
211 5093 : return &CopyToRoutineText;
212 : }
213 :
214 : /* Implementation of the start callback for text, CSV, and json formats */
215 : static void
216 5176 : CopyToTextLikeStart(CopyToState cstate, TupleDesc tupDesc)
217 : {
218 : /*
219 : * For non-binary copy, we need to convert null_print to file encoding,
220 : * because it will be sent directly with CopySendString.
221 : */
222 5176 : if (cstate->need_transcoding)
223 9 : cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
224 : cstate->opts.null_print_len,
225 : cstate->file_encoding);
226 :
227 : /* if a header has been requested send the line */
228 5176 : if (cstate->opts.header_line == COPY_HEADER_TRUE)
229 : {
230 : ListCell *cur;
231 24 : bool hdr_delim = false;
232 :
233 : Assert(cstate->opts.format != COPY_FORMAT_JSON);
234 :
235 64 : foreach(cur, cstate->attnumlist)
236 : {
237 40 : int attnum = lfirst_int(cur);
238 : char *colname;
239 :
240 40 : if (hdr_delim)
241 16 : CopySendChar(cstate, cstate->opts.delim[0]);
242 40 : hdr_delim = true;
243 :
244 40 : colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
245 :
246 40 : if (cstate->opts.format == COPY_FORMAT_CSV)
247 16 : CopyAttributeOutCSV(cstate, colname, false);
248 : else
249 24 : CopyAttributeOutText(cstate, colname);
250 : }
251 :
252 24 : CopySendTextLikeEndOfRow(cstate);
253 : }
254 :
255 : /*
256 : * If FORCE_ARRAY has been specified, send the opening bracket.
257 : */
258 5176 : if (cstate->opts.format == COPY_FORMAT_JSON && cstate->opts.force_array)
259 : {
260 20 : CopySendChar(cstate, '[');
261 20 : CopySendTextLikeEndOfRow(cstate);
262 : }
263 5176 : }
264 :
265 : /*
266 : * Implementation of the outfunc callback for text, CSV, and json formats. Assign
267 : * the output function data to the given *finfo.
268 : */
269 : static void
270 17384 : CopyToTextLikeOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo)
271 : {
272 : Oid func_oid;
273 : bool is_varlena;
274 :
275 : /* Set output function for an attribute */
276 17384 : getTypeOutputInfo(atttypid, &func_oid, &is_varlena);
277 17384 : fmgr_info(func_oid, finfo);
278 17384 : }
279 :
280 : /* Implementation of the per-row callback for text format */
281 : static void
282 1840071 : CopyToTextOneRow(CopyToState cstate, TupleTableSlot *slot)
283 : {
284 1840071 : CopyToTextLikeOneRow(cstate, slot, false);
285 1840071 : }
286 :
287 : /* Implementation of the per-row callback for CSV format */
288 : static void
289 224 : CopyToCSVOneRow(CopyToState cstate, TupleTableSlot *slot)
290 : {
291 224 : CopyToTextLikeOneRow(cstate, slot, true);
292 224 : }
293 :
294 : /*
295 : * Workhorse for CopyToTextOneRow() and CopyToCSVOneRow().
296 : *
297 : * We use pg_attribute_always_inline to reduce function call overhead
298 : * and to help compilers to optimize away the 'is_csv' condition.
299 : */
300 : static pg_attribute_always_inline void
301 1840295 : CopyToTextLikeOneRow(CopyToState cstate,
302 : TupleTableSlot *slot,
303 : bool is_csv)
304 : {
305 1840295 : bool need_delim = false;
306 1840295 : FmgrInfo *out_functions = cstate->out_functions;
307 :
308 10905232 : foreach_int(attnum, cstate->attnumlist)
309 : {
310 7224642 : Datum value = slot->tts_values[attnum - 1];
311 7224642 : bool isnull = slot->tts_isnull[attnum - 1];
312 :
313 7224642 : if (need_delim)
314 5384412 : CopySendChar(cstate, cstate->opts.delim[0]);
315 7224642 : need_delim = true;
316 :
317 7224642 : if (isnull)
318 : {
319 611617 : CopySendString(cstate, cstate->opts.null_print_client);
320 : }
321 : else
322 : {
323 : char *string;
324 :
325 6613025 : string = OutputFunctionCall(&out_functions[attnum - 1],
326 : value);
327 :
328 6613025 : if (is_csv)
329 400 : CopyAttributeOutCSV(cstate, string,
330 400 : cstate->opts.force_quote_flags[attnum - 1]);
331 : else
332 6612625 : CopyAttributeOutText(cstate, string);
333 : }
334 : }
335 :
336 1840295 : CopySendTextLikeEndOfRow(cstate);
337 1840295 : }
338 :
339 : /* Implementation of the end callback for text and CSV formats */
340 : static void
341 5092 : CopyToTextLikeEnd(CopyToState cstate)
342 : {
343 : /* Nothing to do here */
344 5092 : }
345 :
346 : /* Implementation of the end callback for json format */
347 : static void
348 84 : CopyToJsonEnd(CopyToState cstate)
349 : {
350 84 : if (cstate->opts.force_array)
351 : {
352 20 : CopySendChar(cstate, ']');
353 20 : CopySendTextLikeEndOfRow(cstate);
354 : }
355 84 : }
356 :
357 : /* Implementation of per-row callback for json format */
358 : static void
359 288 : CopyToJsonOneRow(CopyToState cstate, TupleTableSlot *slot)
360 : {
361 : Datum rowdata;
362 :
363 288 : resetStringInfo(cstate->json_buf);
364 :
365 288 : if (cstate->json_projvalues != NULL)
366 : {
367 : /*
368 : * Column list case: project selected column values into sequential
369 : * positions matching the custom TupleDesc, then form a new tuple.
370 : */
371 : HeapTuple tup;
372 104 : int i = 0;
373 :
374 444 : foreach_int(attnum, cstate->attnumlist)
375 : {
376 236 : cstate->json_projvalues[i] = slot->tts_values[attnum - 1];
377 236 : cstate->json_projnulls[i] = slot->tts_isnull[attnum - 1];
378 236 : i++;
379 : }
380 :
381 104 : tup = heap_form_tuple(cstate->tupDesc,
382 104 : cstate->json_projvalues,
383 104 : cstate->json_projnulls);
384 :
385 : /*
386 : * heap_form_tuple already stamps the datum-length, type-id, and
387 : * type-mod fields on t_data, so we can use it directly as a composite
388 : * Datum without the extra pallocmemcpy that heap_copy_tuple_as_datum
389 : * would do. Any TOAST pointers in the projected values will be
390 : * detoasted by the per-column output functions called from
391 : * composite_to_json.
392 : */
393 104 : rowdata = HeapTupleGetDatum(tup);
394 : }
395 : else
396 : {
397 : /*
398 : * Full table or query without column list. For queries, the slot's
399 : * TupleDesc may carry RECORDOID, which is not registered in the type
400 : * cache and would cause composite_to_json's lookup_rowtype_tupdesc
401 : * call to fail. Build a HeapTuple stamped with the blessed
402 : * descriptor so the type can be looked up correctly.
403 : */
404 184 : if (!cstate->rel && slot->tts_tupleDescriptor->tdtypeid == RECORDOID)
405 40 : {
406 40 : HeapTuple tup = heap_form_tuple(cstate->tupDesc,
407 40 : slot->tts_values,
408 40 : slot->tts_isnull);
409 :
410 40 : rowdata = HeapTupleGetDatum(tup);
411 : }
412 : else
413 144 : rowdata = ExecFetchSlotHeapTupleDatum(slot);
414 : }
415 :
416 288 : composite_to_json(rowdata, cstate->json_buf, false);
417 :
418 288 : if (cstate->opts.force_array)
419 : {
420 48 : if (cstate->json_row_delim_needed)
421 32 : CopySendChar(cstate, ',');
422 : else
423 : {
424 : /* first row needs no delimiter */
425 16 : CopySendChar(cstate, ' ');
426 16 : cstate->json_row_delim_needed = true;
427 : }
428 : }
429 :
430 288 : CopySendData(cstate, cstate->json_buf->data, cstate->json_buf->len);
431 :
432 288 : CopySendTextLikeEndOfRow(cstate);
433 288 : }
434 :
435 : /*
436 : * Implementation of the start callback for binary format. Send a header
437 : * for a binary copy.
438 : */
439 : static void
440 8 : CopyToBinaryStart(CopyToState cstate, TupleDesc tupDesc)
441 : {
442 : int32 tmp;
443 :
444 : /* Signature */
445 8 : CopySendData(cstate, BinarySignature, 11);
446 : /* Flags field */
447 8 : tmp = 0;
448 8 : CopySendInt32(cstate, tmp);
449 : /* No header extension */
450 8 : tmp = 0;
451 8 : CopySendInt32(cstate, tmp);
452 8 : }
453 :
454 : /*
455 : * Implementation of the outfunc callback for binary format. Assign
456 : * the binary output function to the given *finfo.
457 : */
458 : static void
459 38 : CopyToBinaryOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo)
460 : {
461 : Oid func_oid;
462 : bool is_varlena;
463 :
464 : /* Set output function for an attribute */
465 38 : getTypeBinaryOutputInfo(atttypid, &func_oid, &is_varlena);
466 37 : fmgr_info(func_oid, finfo);
467 37 : }
468 :
469 : /* Implementation of the per-row callback for binary format */
470 : static void
471 19 : CopyToBinaryOneRow(CopyToState cstate, TupleTableSlot *slot)
472 : {
473 19 : FmgrInfo *out_functions = cstate->out_functions;
474 :
475 : /* Binary per-tuple header */
476 19 : CopySendInt16(cstate, list_length(cstate->attnumlist));
477 :
478 139 : foreach_int(attnum, cstate->attnumlist)
479 : {
480 101 : Datum value = slot->tts_values[attnum - 1];
481 101 : bool isnull = slot->tts_isnull[attnum - 1];
482 :
483 101 : if (isnull)
484 : {
485 20 : CopySendInt32(cstate, -1);
486 : }
487 : else
488 : {
489 : bytea *outputbytes;
490 :
491 81 : outputbytes = SendFunctionCall(&out_functions[attnum - 1],
492 : value);
493 81 : CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
494 81 : CopySendData(cstate, VARDATA(outputbytes),
495 81 : VARSIZE(outputbytes) - VARHDRSZ);
496 : }
497 : }
498 :
499 19 : CopySendEndOfRow(cstate);
500 19 : }
501 :
502 : /* Implementation of the end callback for binary format */
503 : static void
504 8 : CopyToBinaryEnd(CopyToState cstate)
505 : {
506 : /* Generate trailer for a binary copy */
507 8 : CopySendInt16(cstate, -1);
508 : /* Need to flush out the trailer */
509 8 : CopySendEndOfRow(cstate);
510 8 : }
511 :
512 : /*
513 : * Send copy start/stop messages for frontend copies. These have changed
514 : * in past protocol redesigns.
515 : */
516 : static void
517 5135 : SendCopyBegin(CopyToState cstate)
518 : {
519 : StringInfoData buf;
520 5135 : int natts = list_length(cstate->attnumlist);
521 5135 : int16 format = (cstate->opts.format == COPY_FORMAT_BINARY ? 1 : 0);
522 : int i;
523 :
524 5135 : pq_beginmessage(&buf, PqMsg_CopyOutResponse);
525 5135 : pq_sendbyte(&buf, format); /* overall format */
526 5135 : if (cstate->opts.format != COPY_FORMAT_JSON)
527 : {
528 5051 : pq_sendint16(&buf, natts);
529 22112 : for (i = 0; i < natts; i++)
530 17061 : pq_sendint16(&buf, format); /* per-column formats */
531 : }
532 : else
533 : {
534 : /*
535 : * For JSON format, report one text-format column. Each CopyData
536 : * message contains one complete JSON object, not individual column
537 : * values, so the per-column count is always 1.
538 : */
539 84 : pq_sendint16(&buf, 1);
540 84 : pq_sendint16(&buf, 0);
541 : }
542 :
543 5135 : pq_endmessage(&buf);
544 5135 : cstate->copy_dest = COPY_FRONTEND;
545 5135 : }
546 :
547 : static void
548 5134 : SendCopyEnd(CopyToState cstate)
549 : {
550 : /* Shouldn't have any unsent data */
551 : Assert(cstate->fe_msgbuf->len == 0);
552 : /* Send Copy Done message */
553 5134 : pq_putemptymessage(PqMsg_CopyDone);
554 5134 : }
555 :
556 : /*----------
557 : * CopySendData sends output data to the destination (file or frontend)
558 : * CopySendString does the same for null-terminated strings
559 : * CopySendChar does the same for single characters
560 : * CopySendEndOfRow does the appropriate thing at end of each data row
561 : * (data is not actually flushed except by CopySendEndOfRow)
562 : *
563 : * NB: no data conversion is applied by these functions
564 : *----------
565 : */
566 : static void
567 6506994 : CopySendData(CopyToState cstate, const void *databuf, int datasize)
568 : {
569 6506994 : appendBinaryStringInfo(cstate->fe_msgbuf, databuf, datasize);
570 6506994 : }
571 :
572 : static void
573 611813 : CopySendString(CopyToState cstate, const char *str)
574 : {
575 611813 : appendBinaryStringInfo(cstate->fe_msgbuf, str, strlen(str));
576 611813 : }
577 :
578 : static void
579 7242175 : CopySendChar(CopyToState cstate, char c)
580 : {
581 7242175 : appendStringInfoCharMacro(cstate->fe_msgbuf, c);
582 7242175 : }
583 :
584 : static void
585 1840674 : CopySendEndOfRow(CopyToState cstate)
586 : {
587 1840674 : StringInfo fe_msgbuf = cstate->fe_msgbuf;
588 :
589 1840674 : switch (cstate->copy_dest)
590 : {
591 8195 : case COPY_FILE:
592 8195 : pgstat_report_wait_start(WAIT_EVENT_COPY_TO_WRITE);
593 8195 : if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
594 8195 : cstate->copy_file) != 1 ||
595 8195 : ferror(cstate->copy_file))
596 : {
597 0 : if (cstate->is_program)
598 : {
599 0 : if (errno == EPIPE)
600 : {
601 : /*
602 : * The pipe will be closed automatically on error at
603 : * the end of transaction, but we might get a better
604 : * error message from the subprocess' exit code than
605 : * just "Broken Pipe"
606 : */
607 0 : ClosePipeToProgram(cstate);
608 :
609 : /*
610 : * If ClosePipeToProgram() didn't throw an error, the
611 : * program terminated normally, but closed the pipe
612 : * first. Restore errno, and throw an error.
613 : */
614 0 : errno = EPIPE;
615 : }
616 0 : ereport(ERROR,
617 : (errcode_for_file_access(),
618 : errmsg("could not write to COPY program: %m")));
619 : }
620 : else
621 0 : ereport(ERROR,
622 : (errcode_for_file_access(),
623 : errmsg("could not write to COPY file: %m")));
624 : }
625 8195 : pgstat_report_wait_end();
626 8195 : break;
627 1832476 : case COPY_FRONTEND:
628 : /* Dump the accumulated row as one CopyData message */
629 1832476 : (void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len);
630 1832476 : break;
631 3 : case COPY_CALLBACK:
632 3 : cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len);
633 3 : break;
634 : }
635 :
636 : /* Update the progress */
637 1840674 : cstate->bytes_processed += fe_msgbuf->len;
638 1840674 : pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, cstate->bytes_processed);
639 :
640 1840674 : resetStringInfo(fe_msgbuf);
641 1840674 : }
642 :
643 : /*
644 : * Wrapper function of CopySendEndOfRow for text, CSV, and json formats. Sends the
645 : * line termination and do common appropriate things for the end of row.
646 : */
647 : static inline void
648 1840647 : CopySendTextLikeEndOfRow(CopyToState cstate)
649 : {
650 1840647 : switch (cstate->copy_dest)
651 : {
652 8179 : case COPY_FILE:
653 : /* Default line termination depends on platform */
654 : #ifndef WIN32
655 8179 : CopySendChar(cstate, '\n');
656 : #else
657 : CopySendString(cstate, "\r\n");
658 : #endif
659 8179 : break;
660 1832465 : case COPY_FRONTEND:
661 : /* The FE/BE protocol uses \n as newline for all platforms */
662 1832465 : CopySendChar(cstate, '\n');
663 1832465 : break;
664 3 : default:
665 3 : break;
666 : }
667 :
668 : /* Now take the actions related to the end of a row */
669 1840647 : CopySendEndOfRow(cstate);
670 1840647 : }
671 :
672 : /*
673 : * These functions do apply some data conversion
674 : */
675 :
676 : /*
677 : * CopySendInt32 sends an int32 in network byte order
678 : */
679 : static inline void
680 117 : CopySendInt32(CopyToState cstate, int32 val)
681 : {
682 : uint32 buf;
683 :
684 117 : buf = pg_hton32((uint32) val);
685 117 : CopySendData(cstate, &buf, sizeof(buf));
686 117 : }
687 :
688 : /*
689 : * CopySendInt16 sends an int16 in network byte order
690 : */
691 : static inline void
692 27 : CopySendInt16(CopyToState cstate, int16 val)
693 : {
694 : uint16 buf;
695 :
696 27 : buf = pg_hton16((uint16) val);
697 27 : CopySendData(cstate, &buf, sizeof(buf));
698 27 : }
699 :
700 : /*
701 : * Closes the pipe to an external program, checking the pclose() return code.
702 : */
703 : static void
704 0 : ClosePipeToProgram(CopyToState cstate)
705 : {
706 : int pclose_rc;
707 :
708 : Assert(cstate->is_program);
709 :
710 0 : pclose_rc = ClosePipeStream(cstate->copy_file);
711 0 : if (pclose_rc == -1)
712 0 : ereport(ERROR,
713 : (errcode_for_file_access(),
714 : errmsg("could not close pipe to external command: %m")));
715 0 : else if (pclose_rc != 0)
716 : {
717 0 : ereport(ERROR,
718 : (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
719 : errmsg("program \"%s\" failed",
720 : cstate->filename),
721 : errdetail_internal("%s", wait_result_to_str(pclose_rc))));
722 : }
723 0 : }
724 :
725 : /*
726 : * Release resources allocated in a cstate for COPY TO.
727 : */
728 : static void
729 5184 : EndCopy(CopyToState cstate)
730 : {
731 5184 : if (cstate->is_program)
732 : {
733 0 : ClosePipeToProgram(cstate);
734 : }
735 : else
736 : {
737 5184 : if (cstate->filename != NULL && FreeFile(cstate->copy_file))
738 0 : ereport(ERROR,
739 : (errcode_for_file_access(),
740 : errmsg("could not close file \"%s\": %m",
741 : cstate->filename)));
742 : }
743 :
744 5184 : pgstat_progress_end_command();
745 :
746 5184 : MemoryContextDelete(cstate->copycontext);
747 :
748 5184 : if (cstate->partitions)
749 19 : list_free(cstate->partitions);
750 :
751 5184 : pfree(cstate);
752 5184 : }
753 :
754 : /*
755 : * Setup CopyToState to read tuples from a table or a query for COPY TO.
756 : *
757 : * 'rel': Relation to be copied
758 : * 'raw_query': Query whose results are to be copied
759 : * 'queryRelId': OID of base relation to convert to a query (for RLS)
760 : * 'filename': Name of server-local file to write, NULL for STDOUT
761 : * 'is_program': true if 'filename' is program to execute
762 : * 'data_dest_cb': Callback that processes the output data
763 : * 'attnamelist': List of char *, columns to include. NIL selects all cols.
764 : * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
765 : *
766 : * Returns a CopyToState, to be passed to DoCopyTo() and related functions.
767 : */
768 : CopyToState
769 5383 : BeginCopyTo(ParseState *pstate,
770 : Relation rel,
771 : RawStmt *raw_query,
772 : Oid queryRelId,
773 : const char *filename,
774 : bool is_program,
775 : copy_data_dest_cb data_dest_cb,
776 : List *attnamelist,
777 : List *options)
778 : {
779 : CopyToState cstate;
780 5383 : bool pipe = (filename == NULL && data_dest_cb == NULL);
781 : TupleDesc tupDesc;
782 : int num_phys_attrs;
783 : MemoryContext oldcontext;
784 5383 : const int progress_cols[] = {
785 : PROGRESS_COPY_COMMAND,
786 : PROGRESS_COPY_TYPE
787 : };
788 5383 : int64 progress_vals[] = {
789 : PROGRESS_COPY_COMMAND_TO,
790 : 0
791 : };
792 5383 : List *children = NIL;
793 :
794 5383 : if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
795 : {
796 36 : if (rel->rd_rel->relkind == RELKIND_VIEW)
797 8 : ereport(ERROR,
798 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
799 : errmsg("cannot copy from view \"%s\"",
800 : RelationGetRelationName(rel)),
801 : errhint("Try the COPY (SELECT ...) TO variant.")));
802 28 : else if (rel->rd_rel->relkind == RELKIND_MATVIEW)
803 : {
804 8 : if (!RelationIsPopulated(rel))
805 4 : ereport(ERROR,
806 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
807 : errmsg("cannot copy from unpopulated materialized view \"%s\"",
808 : RelationGetRelationName(rel)),
809 : errhint("Use the REFRESH MATERIALIZED VIEW command."));
810 : }
811 20 : else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
812 0 : ereport(ERROR,
813 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
814 : errmsg("cannot copy from foreign table \"%s\"",
815 : RelationGetRelationName(rel)),
816 : errhint("Try the COPY (SELECT ...) TO variant.")));
817 20 : else if (rel->rd_rel->relkind == RELKIND_SEQUENCE)
818 0 : ereport(ERROR,
819 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
820 : errmsg("cannot copy from sequence \"%s\"",
821 : RelationGetRelationName(rel))));
822 20 : else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
823 : {
824 : /*
825 : * Collect OIDs of relation containing data, so that later
826 : * DoCopyTo can copy the data from them.
827 : */
828 20 : children = find_all_inheritors(RelationGetRelid(rel), AccessShareLock, NULL);
829 :
830 104 : foreach_oid(child, children)
831 : {
832 66 : char relkind = get_rel_relkind(child);
833 :
834 66 : if (relkind == RELKIND_FOREIGN_TABLE)
835 : {
836 1 : char *relation_name = get_rel_name(child);
837 :
838 1 : ereport(ERROR,
839 : errcode(ERRCODE_WRONG_OBJECT_TYPE),
840 : errmsg("cannot copy from foreign table \"%s\"", relation_name),
841 : errdetail("Partition \"%s\" is a foreign table in partitioned table \"%s\"",
842 : relation_name, RelationGetRelationName(rel)),
843 : errhint("Try the COPY (SELECT ...) TO variant."));
844 : }
845 :
846 : /* Exclude tables with no data */
847 65 : if (RELKIND_HAS_PARTITIONS(relkind))
848 31 : children = foreach_delete_current(children, child);
849 : }
850 : }
851 : else
852 0 : ereport(ERROR,
853 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
854 : errmsg("cannot copy from non-table relation \"%s\"",
855 : RelationGetRelationName(rel))));
856 : }
857 :
858 :
859 : /* Allocate workspace and zero all fields */
860 5370 : cstate = palloc0_object(CopyToStateData);
861 :
862 : /*
863 : * We allocate everything used by a cstate in a new memory context. This
864 : * avoids memory leaks during repeated use of COPY in a query.
865 : */
866 5370 : cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
867 : "COPY",
868 : ALLOCSET_DEFAULT_SIZES);
869 :
870 5370 : oldcontext = MemoryContextSwitchTo(cstate->copycontext);
871 :
872 : /* Extract options from the statement node tree */
873 5370 : ProcessCopyOptions(pstate, &cstate->opts, false /* is_from */ , options);
874 :
875 : /* Set format routine */
876 5274 : cstate->routine = CopyToGetRoutine(&cstate->opts);
877 :
878 : /* Process the source/target relation or query */
879 5274 : if (rel)
880 : {
881 : Assert(!raw_query);
882 :
883 4875 : cstate->rel = rel;
884 :
885 4875 : tupDesc = RelationGetDescr(cstate->rel);
886 4875 : cstate->partitions = children;
887 4875 : cstate->tupDesc = tupDesc;
888 : }
889 : else
890 : {
891 : List *rewritten;
892 : Query *query;
893 : PlannedStmt *plan;
894 : DestReceiver *dest;
895 :
896 399 : cstate->rel = NULL;
897 399 : cstate->partitions = NIL;
898 :
899 : /*
900 : * Run parse analysis and rewrite. Note this also acquires sufficient
901 : * locks on the source table(s).
902 : */
903 399 : rewritten = pg_analyze_and_rewrite_fixedparams(raw_query,
904 : pstate->p_sourcetext, NULL, 0,
905 : NULL);
906 :
907 : /* check that we got back something we can work with */
908 391 : if (rewritten == NIL)
909 : {
910 12 : ereport(ERROR,
911 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
912 : errmsg("DO INSTEAD NOTHING rules are not supported for COPY")));
913 : }
914 379 : else if (list_length(rewritten) > 1)
915 : {
916 : ListCell *lc;
917 :
918 : /* examine queries to determine which error message to issue */
919 68 : foreach(lc, rewritten)
920 : {
921 56 : Query *q = lfirst_node(Query, lc);
922 :
923 56 : if (q->querySource == QSRC_QUAL_INSTEAD_RULE)
924 12 : ereport(ERROR,
925 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
926 : errmsg("conditional DO INSTEAD rules are not supported for COPY")));
927 44 : if (q->querySource == QSRC_NON_INSTEAD_RULE)
928 12 : ereport(ERROR,
929 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
930 : errmsg("DO ALSO rules are not supported for COPY")));
931 : }
932 :
933 12 : ereport(ERROR,
934 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
935 : errmsg("multi-statement DO INSTEAD rules are not supported for COPY")));
936 : }
937 :
938 343 : query = linitial_node(Query, rewritten);
939 :
940 : /* The grammar allows SELECT INTO, but we don't support that */
941 343 : if (query->utilityStmt != NULL &&
942 12 : IsA(query->utilityStmt, CreateTableAsStmt))
943 8 : ereport(ERROR,
944 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
945 : errmsg("COPY (SELECT INTO) is not supported")));
946 :
947 : /* The only other utility command we could see is NOTIFY */
948 335 : if (query->utilityStmt != NULL)
949 4 : ereport(ERROR,
950 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
951 : errmsg("COPY query must not be a utility command")));
952 :
953 : /*
954 : * Similarly the grammar doesn't enforce the presence of a RETURNING
955 : * clause, but this is required here.
956 : */
957 331 : if (query->commandType != CMD_SELECT &&
958 69 : query->returningList == NIL)
959 : {
960 : Assert(query->commandType == CMD_INSERT ||
961 : query->commandType == CMD_UPDATE ||
962 : query->commandType == CMD_DELETE ||
963 : query->commandType == CMD_MERGE);
964 :
965 16 : ereport(ERROR,
966 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
967 : errmsg("COPY query must have a RETURNING clause")));
968 : }
969 :
970 : /* plan the query */
971 315 : plan = pg_plan_query(query, pstate->p_sourcetext,
972 : CURSOR_OPT_PARALLEL_OK, NULL, NULL);
973 :
974 : /*
975 : * With row-level security and a user using "COPY relation TO", we
976 : * have to convert the "COPY relation TO" to a query-based COPY (eg:
977 : * "COPY (SELECT * FROM ONLY relation) TO"), to allow the rewriter to
978 : * add in any RLS clauses.
979 : *
980 : * When this happens, we are passed in the relid of the originally
981 : * found relation (which we have locked). As the planner will look up
982 : * the relation again, we double-check here to make sure it found the
983 : * same one that we have locked.
984 : */
985 314 : if (queryRelId != InvalidOid)
986 : {
987 : /*
988 : * Note that with RLS involved there may be multiple relations,
989 : * and while the one we need is almost certainly first, we don't
990 : * make any guarantees of that in the planner, so check the whole
991 : * list and make sure we find the original relation.
992 : */
993 52 : if (!list_member_oid(plan->relationOids, queryRelId))
994 0 : ereport(ERROR,
995 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
996 : errmsg("relation referenced by COPY statement has changed")));
997 : }
998 :
999 : /*
1000 : * Use a snapshot with an updated command ID to ensure this query sees
1001 : * results of any previously executed queries.
1002 : */
1003 314 : PushCopiedSnapshot(GetActiveSnapshot());
1004 314 : UpdateActiveSnapshotCommandId();
1005 :
1006 : /* Create dest receiver for COPY OUT */
1007 314 : dest = CreateDestReceiver(DestCopyOut);
1008 314 : ((DR_copy *) dest)->cstate = cstate;
1009 :
1010 : /* Create a QueryDesc requesting no output */
1011 314 : cstate->queryDesc = CreateQueryDesc(plan, pstate->p_sourcetext,
1012 : GetActiveSnapshot(),
1013 : InvalidSnapshot,
1014 : dest, NULL, NULL, 0);
1015 :
1016 : /*
1017 : * Call ExecutorStart to prepare the plan for execution.
1018 : *
1019 : * ExecutorStart computes a result tupdesc for us
1020 : */
1021 314 : ExecutorStart(cstate->queryDesc, 0);
1022 :
1023 310 : tupDesc = cstate->queryDesc->tupDesc;
1024 310 : tupDesc = BlessTupleDesc(tupDesc);
1025 310 : cstate->tupDesc = tupDesc;
1026 : }
1027 :
1028 : /* Generate or convert list of attributes to process */
1029 5185 : cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
1030 :
1031 : /* Set up JSON-specific state */
1032 5185 : if (cstate->opts.format == COPY_FORMAT_JSON)
1033 : {
1034 84 : cstate->json_buf = makeStringInfo();
1035 :
1036 84 : if (attnamelist != NIL && rel)
1037 : {
1038 32 : int natts = list_length(cstate->attnumlist);
1039 : TupleDesc resultDesc;
1040 :
1041 : /*
1042 : * Build a TupleDesc describing only the selected columns so that
1043 : * composite_to_json() emits the right column names and types.
1044 : */
1045 32 : resultDesc = CreateTemplateTupleDesc(natts);
1046 :
1047 136 : foreach_int(attnum, cstate->attnumlist)
1048 : {
1049 72 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1050 :
1051 72 : TupleDescInitEntry(resultDesc,
1052 72 : foreach_current_index(attnum) + 1,
1053 72 : NameStr(attr->attname),
1054 : attr->atttypid,
1055 : attr->atttypmod,
1056 72 : attr->attndims);
1057 : }
1058 :
1059 32 : TupleDescFinalize(resultDesc);
1060 32 : cstate->tupDesc = BlessTupleDesc(resultDesc);
1061 :
1062 : /*
1063 : * Pre-allocate arrays for projecting selected column values into
1064 : * sequential positions matching the custom TupleDesc.
1065 : */
1066 32 : cstate->json_projvalues = palloc_array(Datum, natts);
1067 32 : cstate->json_projnulls = palloc_array(bool, natts);
1068 : }
1069 : }
1070 :
1071 5185 : num_phys_attrs = tupDesc->natts;
1072 :
1073 : /* Convert FORCE_QUOTE name list to per-column flags, check validity */
1074 5185 : cstate->opts.force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1075 5185 : if (cstate->opts.force_quote_all)
1076 : {
1077 12 : MemSet(cstate->opts.force_quote_flags, true, num_phys_attrs * sizeof(bool));
1078 : }
1079 5173 : else if (cstate->opts.force_quote)
1080 : {
1081 : List *attnums;
1082 : ListCell *cur;
1083 :
1084 16 : attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_quote);
1085 :
1086 32 : foreach(cur, attnums)
1087 : {
1088 16 : int attnum = lfirst_int(cur);
1089 16 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1090 :
1091 16 : if (!list_member_int(cstate->attnumlist, attnum))
1092 0 : ereport(ERROR,
1093 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1094 : /*- translator: %s is the name of a COPY option, e.g. FORCE_NOT_NULL */
1095 : errmsg("%s column \"%s\" not referenced by COPY",
1096 : "FORCE_QUOTE", NameStr(attr->attname))));
1097 16 : cstate->opts.force_quote_flags[attnum - 1] = true;
1098 : }
1099 : }
1100 :
1101 : /* Use client encoding when ENCODING option is not specified. */
1102 5185 : if (cstate->opts.file_encoding < 0)
1103 5165 : cstate->file_encoding = pg_get_client_encoding();
1104 : else
1105 20 : cstate->file_encoding = cstate->opts.file_encoding;
1106 :
1107 : /*
1108 : * Set up encoding conversion info if the file and server encodings differ
1109 : * (see also pg_server_to_any).
1110 : */
1111 5185 : if (cstate->file_encoding == GetDatabaseEncoding() ||
1112 13 : cstate->file_encoding == PG_SQL_ASCII)
1113 5176 : cstate->need_transcoding = false;
1114 : else
1115 9 : cstate->need_transcoding = true;
1116 :
1117 : /* See Multibyte encoding comment above */
1118 5185 : cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding);
1119 :
1120 5185 : cstate->copy_dest = COPY_FILE; /* default */
1121 :
1122 5185 : if (data_dest_cb)
1123 : {
1124 1 : progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
1125 1 : cstate->copy_dest = COPY_CALLBACK;
1126 1 : cstate->data_dest_cb = data_dest_cb;
1127 : }
1128 5184 : else if (pipe)
1129 : {
1130 5135 : progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
1131 :
1132 : Assert(!is_program); /* the grammar does not allow this */
1133 5135 : if (whereToSendOutput != DestRemote)
1134 0 : cstate->copy_file = stdout;
1135 : }
1136 : else
1137 : {
1138 49 : cstate->filename = pstrdup(filename);
1139 49 : cstate->is_program = is_program;
1140 :
1141 49 : if (is_program)
1142 : {
1143 0 : progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
1144 0 : cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W);
1145 0 : if (cstate->copy_file == NULL)
1146 0 : ereport(ERROR,
1147 : (errcode_for_file_access(),
1148 : errmsg("could not execute command \"%s\": %m",
1149 : cstate->filename)));
1150 : }
1151 : else
1152 : {
1153 : mode_t oumask; /* Pre-existing umask value */
1154 : struct stat st;
1155 :
1156 49 : progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
1157 :
1158 : /*
1159 : * Prevent write to relative path ... too easy to shoot oneself in
1160 : * the foot by overwriting a database file ...
1161 : */
1162 49 : if (!is_absolute_path(filename))
1163 0 : ereport(ERROR,
1164 : (errcode(ERRCODE_INVALID_NAME),
1165 : errmsg("relative path not allowed for COPY to file")));
1166 :
1167 49 : oumask = umask(S_IWGRP | S_IWOTH);
1168 49 : PG_TRY();
1169 : {
1170 49 : cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
1171 : }
1172 0 : PG_FINALLY();
1173 : {
1174 49 : umask(oumask);
1175 : }
1176 49 : PG_END_TRY();
1177 49 : if (cstate->copy_file == NULL)
1178 : {
1179 : /* copy errno because ereport subfunctions might change it */
1180 0 : int save_errno = errno;
1181 :
1182 0 : ereport(ERROR,
1183 : (errcode_for_file_access(),
1184 : errmsg("could not open file \"%s\" for writing: %m",
1185 : cstate->filename),
1186 : (save_errno == ENOENT || save_errno == EACCES) ?
1187 : errhint("COPY TO instructs the PostgreSQL server process to write a file. "
1188 : "You may want a client-side facility such as psql's \\copy.") : 0));
1189 : }
1190 :
1191 49 : if (fstat(fileno(cstate->copy_file), &st))
1192 0 : ereport(ERROR,
1193 : (errcode_for_file_access(),
1194 : errmsg("could not stat file \"%s\": %m",
1195 : cstate->filename)));
1196 :
1197 49 : if (S_ISDIR(st.st_mode))
1198 0 : ereport(ERROR,
1199 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1200 : errmsg("\"%s\" is a directory", cstate->filename)));
1201 : }
1202 : }
1203 :
1204 : /* initialize progress */
1205 5185 : pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
1206 5185 : cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
1207 5185 : pgstat_progress_update_multi_param(2, progress_cols, progress_vals);
1208 :
1209 5185 : cstate->bytes_processed = 0;
1210 :
1211 5185 : MemoryContextSwitchTo(oldcontext);
1212 :
1213 5185 : return cstate;
1214 : }
1215 :
1216 : /*
1217 : * Clean up storage and release resources for COPY TO.
1218 : */
1219 : void
1220 5184 : EndCopyTo(CopyToState cstate)
1221 : {
1222 5184 : if (cstate->queryDesc != NULL)
1223 : {
1224 : /* Close down the query and free resources. */
1225 310 : ExecutorFinish(cstate->queryDesc);
1226 310 : ExecutorEnd(cstate->queryDesc);
1227 310 : FreeQueryDesc(cstate->queryDesc);
1228 310 : PopActiveSnapshot();
1229 : }
1230 :
1231 : /* Clean up storage */
1232 5184 : EndCopy(cstate);
1233 5184 : }
1234 :
1235 : /*
1236 : * Copy from relation or query TO file.
1237 : *
1238 : * Returns the number of rows processed.
1239 : */
1240 : uint64
1241 5185 : DoCopyTo(CopyToState cstate)
1242 : {
1243 5185 : bool pipe = (cstate->filename == NULL && cstate->data_dest_cb == NULL);
1244 5185 : bool fe_copy = (pipe && whereToSendOutput == DestRemote);
1245 : TupleDesc tupDesc;
1246 : int num_phys_attrs;
1247 : ListCell *cur;
1248 5185 : uint64 processed = 0;
1249 :
1250 5185 : if (fe_copy)
1251 5135 : SendCopyBegin(cstate);
1252 :
1253 5185 : if (cstate->rel)
1254 4875 : tupDesc = RelationGetDescr(cstate->rel);
1255 : else
1256 310 : tupDesc = cstate->queryDesc->tupDesc;
1257 5185 : num_phys_attrs = tupDesc->natts;
1258 5185 : cstate->opts.null_print_client = cstate->opts.null_print; /* default */
1259 :
1260 : /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
1261 5185 : cstate->fe_msgbuf = makeStringInfo();
1262 :
1263 : /* Get info about the columns we need to process. */
1264 5185 : cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
1265 22606 : foreach(cur, cstate->attnumlist)
1266 : {
1267 17422 : int attnum = lfirst_int(cur);
1268 17422 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1269 :
1270 17422 : cstate->routine->CopyToOutFunc(cstate, attr->atttypid,
1271 17422 : &cstate->out_functions[attnum - 1]);
1272 : }
1273 :
1274 : /*
1275 : * Create a temporary memory context that we can reset once per row to
1276 : * recover palloc'd memory. This avoids any problems with leaks inside
1277 : * datatype output routines, and should be faster than retail pfree's
1278 : * anyway. (We don't need a whole econtext as CopyFrom does.)
1279 : */
1280 5184 : cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
1281 : "COPY TO",
1282 : ALLOCSET_DEFAULT_SIZES);
1283 :
1284 5184 : cstate->routine->CopyToStart(cstate, tupDesc);
1285 :
1286 5184 : if (cstate->rel)
1287 : {
1288 : /*
1289 : * If COPY TO source table is a partitioned table, then open each
1290 : * partition and process each individual partition.
1291 : */
1292 4874 : if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1293 : {
1294 72 : foreach_oid(child, cstate->partitions)
1295 : {
1296 : Relation scan_rel;
1297 :
1298 : /* We already got the lock in BeginCopyTo */
1299 34 : scan_rel = table_open(child, NoLock);
1300 34 : CopyRelationTo(cstate, scan_rel, cstate->rel, &processed);
1301 34 : table_close(scan_rel, NoLock);
1302 : }
1303 : }
1304 : else
1305 4855 : CopyRelationTo(cstate, cstate->rel, NULL, &processed);
1306 : }
1307 : else
1308 : {
1309 : /* run the plan --- the dest receiver will send tuples */
1310 310 : ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0);
1311 310 : processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
1312 : }
1313 :
1314 5184 : cstate->routine->CopyToEnd(cstate);
1315 :
1316 5184 : MemoryContextDelete(cstate->rowcontext);
1317 :
1318 5184 : if (fe_copy)
1319 5134 : SendCopyEnd(cstate);
1320 :
1321 5184 : return processed;
1322 : }
1323 :
1324 : /*
1325 : * Scans a single table and exports its rows to the COPY destination.
1326 : *
1327 : * root_rel can be set to the root table of rel if rel is a partition
1328 : * table so that we can send tuples in root_rel's rowtype, which might
1329 : * differ from individual partitions.
1330 : */
1331 : static void
1332 4889 : CopyRelationTo(CopyToState cstate, Relation rel, Relation root_rel, uint64 *processed)
1333 : {
1334 : TupleTableSlot *slot;
1335 : TableScanDesc scandesc;
1336 4889 : AttrMap *map = NULL;
1337 4889 : TupleTableSlot *root_slot = NULL;
1338 :
1339 4889 : scandesc = table_beginscan(rel, GetActiveSnapshot(), 0, NULL);
1340 4889 : slot = table_slot_create(rel, NULL);
1341 :
1342 : /*
1343 : * If we are exporting partition data here, we check if converting tuples
1344 : * to the root table's rowtype, because a partition might have column
1345 : * order different than its root table.
1346 : */
1347 4889 : if (root_rel != NULL)
1348 : {
1349 34 : root_slot = table_slot_create(root_rel, NULL);
1350 34 : map = build_attrmap_by_name_if_req(RelationGetDescr(root_rel),
1351 : RelationGetDescr(rel),
1352 : false);
1353 : }
1354 :
1355 1840717 : while (table_scan_getnextslot(scandesc, ForwardScanDirection, slot))
1356 : {
1357 : TupleTableSlot *copyslot;
1358 :
1359 1835828 : CHECK_FOR_INTERRUPTS();
1360 :
1361 1835828 : if (map != NULL)
1362 18 : copyslot = execute_attr_map_slot(map, slot, root_slot);
1363 : else
1364 : {
1365 : /* Deconstruct the tuple */
1366 1835810 : slot_getallattrs(slot);
1367 1835810 : copyslot = slot;
1368 : }
1369 :
1370 : /* Format and send the data */
1371 1835828 : CopyOneRowTo(cstate, copyslot);
1372 :
1373 : /*
1374 : * Increment the number of processed tuples, and report the progress.
1375 : */
1376 1835828 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
1377 1835828 : ++(*processed));
1378 : }
1379 :
1380 4889 : ExecDropSingleTupleTableSlot(slot);
1381 :
1382 4889 : if (root_slot != NULL)
1383 34 : ExecDropSingleTupleTableSlot(root_slot);
1384 :
1385 4889 : if (map != NULL)
1386 7 : free_attrmap(map);
1387 :
1388 4889 : table_endscan(scandesc);
1389 4889 : }
1390 :
1391 : /*
1392 : * Emit one row during DoCopyTo().
1393 : */
1394 : static inline void
1395 1840602 : CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot)
1396 : {
1397 : MemoryContext oldcontext;
1398 :
1399 1840602 : MemoryContextReset(cstate->rowcontext);
1400 1840602 : oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
1401 :
1402 : /* Make sure the tuple is fully deconstructed */
1403 1840602 : slot_getallattrs(slot);
1404 :
1405 1840602 : cstate->routine->CopyToOneRow(cstate, slot);
1406 :
1407 1840602 : MemoryContextSwitchTo(oldcontext);
1408 1840602 : }
1409 :
1410 : /*
1411 : * Send text representation of one attribute, with conversion and escaping
1412 : */
1413 : #define DUMPSOFAR() \
1414 : do { \
1415 : if (ptr > start) \
1416 : CopySendData(cstate, start, ptr - start); \
1417 : } while (0)
1418 :
1419 : static void
1420 6612649 : CopyAttributeOutText(CopyToState cstate, const char *string)
1421 : {
1422 : const char *ptr;
1423 : const char *start;
1424 : char c;
1425 6612649 : char delimc = cstate->opts.delim[0];
1426 :
1427 6612649 : if (cstate->need_transcoding)
1428 4 : ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
1429 : else
1430 6612645 : ptr = string;
1431 :
1432 : /*
1433 : * We have to grovel through the string searching for control characters
1434 : * and instances of the delimiter character. In most cases, though, these
1435 : * are infrequent. To avoid overhead from calling CopySendData once per
1436 : * character, we dump out all characters between escaped characters in a
1437 : * single call. The loop invariant is that the data from "start" to "ptr"
1438 : * can be sent literally, but hasn't yet been.
1439 : *
1440 : * We can skip pg_encoding_mblen() overhead when encoding is safe, because
1441 : * in valid backend encodings, extra bytes of a multibyte character never
1442 : * look like ASCII. This loop is sufficiently performance-critical that
1443 : * it's worth making two copies of it to get the IS_HIGHBIT_SET() test out
1444 : * of the normal safe-encoding path.
1445 : */
1446 6612649 : if (cstate->encoding_embeds_ascii)
1447 : {
1448 4 : start = ptr;
1449 12 : while ((c = *ptr) != '\0')
1450 : {
1451 8 : if ((unsigned char) c < (unsigned char) 0x20)
1452 : {
1453 : /*
1454 : * \r and \n must be escaped, the others are traditional. We
1455 : * prefer to dump these using the C-like notation, rather than
1456 : * a backslash and the literal character, because it makes the
1457 : * dump file a bit more proof against Microsoftish data
1458 : * mangling.
1459 : */
1460 0 : switch (c)
1461 : {
1462 0 : case '\b':
1463 0 : c = 'b';
1464 0 : break;
1465 0 : case '\f':
1466 0 : c = 'f';
1467 0 : break;
1468 0 : case '\n':
1469 0 : c = 'n';
1470 0 : break;
1471 0 : case '\r':
1472 0 : c = 'r';
1473 0 : break;
1474 0 : case '\t':
1475 0 : c = 't';
1476 0 : break;
1477 0 : case '\v':
1478 0 : c = 'v';
1479 0 : break;
1480 0 : default:
1481 : /* If it's the delimiter, must backslash it */
1482 0 : if (c == delimc)
1483 0 : break;
1484 : /* All ASCII control chars are length 1 */
1485 0 : ptr++;
1486 0 : continue; /* fall to end of loop */
1487 : }
1488 : /* if we get here, we need to convert the control char */
1489 0 : DUMPSOFAR();
1490 0 : CopySendChar(cstate, '\\');
1491 0 : CopySendChar(cstate, c);
1492 0 : start = ++ptr; /* do not include char in next run */
1493 : }
1494 8 : else if (c == '\\' || c == delimc)
1495 : {
1496 0 : DUMPSOFAR();
1497 0 : CopySendChar(cstate, '\\');
1498 0 : start = ptr++; /* we include char in next run */
1499 : }
1500 8 : else if (IS_HIGHBIT_SET(c))
1501 4 : ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
1502 : else
1503 4 : ptr++;
1504 : }
1505 : }
1506 : else
1507 : {
1508 6612645 : start = ptr;
1509 72071662 : while ((c = *ptr) != '\0')
1510 : {
1511 65459017 : if ((unsigned char) c < (unsigned char) 0x20)
1512 : {
1513 : /*
1514 : * \r and \n must be escaped, the others are traditional. We
1515 : * prefer to dump these using the C-like notation, rather than
1516 : * a backslash and the literal character, because it makes the
1517 : * dump file a bit more proof against Microsoftish data
1518 : * mangling.
1519 : */
1520 7009 : switch (c)
1521 : {
1522 0 : case '\b':
1523 0 : c = 'b';
1524 0 : break;
1525 0 : case '\f':
1526 0 : c = 'f';
1527 0 : break;
1528 5935 : case '\n':
1529 5935 : c = 'n';
1530 5935 : break;
1531 0 : case '\r':
1532 0 : c = 'r';
1533 0 : break;
1534 1074 : case '\t':
1535 1074 : c = 't';
1536 1074 : break;
1537 0 : case '\v':
1538 0 : c = 'v';
1539 0 : break;
1540 0 : default:
1541 : /* If it's the delimiter, must backslash it */
1542 0 : if (c == delimc)
1543 0 : break;
1544 : /* All ASCII control chars are length 1 */
1545 0 : ptr++;
1546 0 : continue; /* fall to end of loop */
1547 : }
1548 : /* if we get here, we need to convert the control char */
1549 7009 : DUMPSOFAR();
1550 7009 : CopySendChar(cstate, '\\');
1551 7009 : CopySendChar(cstate, c);
1552 7009 : start = ++ptr; /* do not include char in next run */
1553 : }
1554 65452008 : else if (c == '\\' || c == delimc)
1555 : {
1556 2453 : DUMPSOFAR();
1557 2453 : CopySendChar(cstate, '\\');
1558 2453 : start = ptr++; /* we include char in next run */
1559 : }
1560 : else
1561 65449555 : ptr++;
1562 : }
1563 : }
1564 :
1565 6612649 : DUMPSOFAR();
1566 6612649 : }
1567 :
1568 : /*
1569 : * Send text representation of one attribute, with conversion and
1570 : * CSV-style escaping
1571 : */
1572 : static void
1573 416 : CopyAttributeOutCSV(CopyToState cstate, const char *string,
1574 : bool use_quote)
1575 : {
1576 : const char *ptr;
1577 : const char *start;
1578 : char c;
1579 416 : char delimc = cstate->opts.delim[0];
1580 416 : char quotec = cstate->opts.quote[0];
1581 416 : char escapec = cstate->opts.escape[0];
1582 416 : bool single_attr = (list_length(cstate->attnumlist) == 1);
1583 :
1584 : /* force quoting if it matches null_print (before conversion!) */
1585 416 : if (!use_quote && strcmp(string, cstate->opts.null_print) == 0)
1586 36 : use_quote = true;
1587 :
1588 416 : if (cstate->need_transcoding)
1589 4 : ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
1590 : else
1591 412 : ptr = string;
1592 :
1593 : /*
1594 : * Make a preliminary pass to discover if it needs quoting
1595 : */
1596 416 : if (!use_quote)
1597 : {
1598 : /*
1599 : * Quote '\.' if it appears alone on a line, so that it will not be
1600 : * interpreted as an end-of-data marker. (PG 18 and up will not
1601 : * interpret '\.' in CSV that way, except in embedded-in-SQL data; but
1602 : * we want the data to be loadable by older versions too. Also, this
1603 : * avoids breaking clients that are still using PQgetline().)
1604 : */
1605 292 : if (single_attr && strcmp(ptr, "\\.") == 0)
1606 4 : use_quote = true;
1607 : else
1608 : {
1609 288 : const char *tptr = ptr;
1610 :
1611 1480 : while ((c = *tptr) != '\0')
1612 : {
1613 1284 : if (c == delimc || c == quotec || c == '\n' || c == '\r')
1614 : {
1615 92 : use_quote = true;
1616 92 : break;
1617 : }
1618 1192 : if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
1619 4 : tptr += pg_encoding_mblen(cstate->file_encoding, tptr);
1620 : else
1621 1188 : tptr++;
1622 : }
1623 : }
1624 : }
1625 :
1626 416 : if (use_quote)
1627 : {
1628 220 : CopySendChar(cstate, quotec);
1629 :
1630 : /*
1631 : * We adopt the same optimization strategy as in CopyAttributeOutText
1632 : */
1633 220 : start = ptr;
1634 1704 : while ((c = *ptr) != '\0')
1635 : {
1636 1484 : if (c == quotec || c == escapec)
1637 : {
1638 104 : DUMPSOFAR();
1639 104 : CopySendChar(cstate, escapec);
1640 104 : start = ptr; /* we include char in next run */
1641 : }
1642 1484 : if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
1643 4 : ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
1644 : else
1645 1480 : ptr++;
1646 : }
1647 220 : DUMPSOFAR();
1648 :
1649 220 : CopySendChar(cstate, quotec);
1650 : }
1651 : else
1652 : {
1653 : /* If it doesn't need quoting, we can just dump it as-is */
1654 196 : CopySendString(cstate, ptr);
1655 : }
1656 416 : }
1657 :
1658 : /*
1659 : * copy_dest_startup --- executor startup
1660 : */
1661 : static void
1662 310 : copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
1663 : {
1664 : /* no-op */
1665 310 : }
1666 :
1667 : /*
1668 : * copy_dest_receive --- receive one tuple
1669 : */
1670 : static bool
1671 4774 : copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
1672 : {
1673 4774 : DR_copy *myState = (DR_copy *) self;
1674 4774 : CopyToState cstate = myState->cstate;
1675 :
1676 : /* Send the data */
1677 4774 : CopyOneRowTo(cstate, slot);
1678 :
1679 : /* Increment the number of processed tuples, and report the progress */
1680 4774 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
1681 4774 : ++myState->processed);
1682 :
1683 4774 : return true;
1684 : }
1685 :
1686 : /*
1687 : * copy_dest_shutdown --- executor end
1688 : */
1689 : static void
1690 310 : copy_dest_shutdown(DestReceiver *self)
1691 : {
1692 : /* no-op */
1693 310 : }
1694 :
1695 : /*
1696 : * copy_dest_destroy --- release DestReceiver object
1697 : */
1698 : static void
1699 0 : copy_dest_destroy(DestReceiver *self)
1700 : {
1701 0 : pfree(self);
1702 0 : }
1703 :
1704 : /*
1705 : * CreateCopyDestReceiver -- create a suitable DestReceiver object
1706 : */
1707 : DestReceiver *
1708 314 : CreateCopyDestReceiver(void)
1709 : {
1710 314 : DR_copy *self = palloc_object(DR_copy);
1711 :
1712 314 : self->pub.receiveSlot = copy_dest_receive;
1713 314 : self->pub.rStartup = copy_dest_startup;
1714 314 : self->pub.rShutdown = copy_dest_shutdown;
1715 314 : self->pub.rDestroy = copy_dest_destroy;
1716 314 : self->pub.mydest = DestCopyOut;
1717 :
1718 314 : self->cstate = NULL; /* will be set later */
1719 314 : self->processed = 0;
1720 :
1721 314 : return (DestReceiver *) self;
1722 : }
|