Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * parallel.c
4 : * Infrastructure for launching parallel workers
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/access/transam/parallel.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/brin.h"
18 : #include "access/gin.h"
19 : #include "access/nbtree.h"
20 : #include "access/parallel.h"
21 : #include "access/session.h"
22 : #include "access/xact.h"
23 : #include "access/xlog.h"
24 : #include "catalog/index.h"
25 : #include "catalog/namespace.h"
26 : #include "catalog/pg_enum.h"
27 : #include "catalog/storage.h"
28 : #include "commands/async.h"
29 : #include "commands/vacuum.h"
30 : #include "executor/execParallel.h"
31 : #include "libpq/libpq.h"
32 : #include "libpq/pqformat.h"
33 : #include "libpq/pqmq.h"
34 : #include "miscadmin.h"
35 : #include "optimizer/optimizer.h"
36 : #include "pgstat.h"
37 : #include "storage/ipc.h"
38 : #include "storage/predicate.h"
39 : #include "storage/spin.h"
40 : #include "tcop/tcopprot.h"
41 : #include "utils/combocid.h"
42 : #include "utils/guc.h"
43 : #include "utils/inval.h"
44 : #include "utils/memutils.h"
45 : #include "utils/relmapper.h"
46 : #include "utils/snapmgr.h"
47 :
48 : /*
49 : * We don't want to waste a lot of memory on an error queue which, most of
50 : * the time, will process only a handful of small messages. However, it is
51 : * desirable to make it large enough that a typical ErrorResponse can be sent
52 : * without blocking. That way, a worker that errors out can write the whole
53 : * message into the queue and terminate without waiting for the user backend.
54 : */
55 : #define PARALLEL_ERROR_QUEUE_SIZE 16384
56 :
57 : /* Magic number for parallel context TOC. */
58 : #define PARALLEL_MAGIC 0x50477c7c
59 :
60 : /*
61 : * Magic numbers for per-context parallel state sharing. Higher-level code
62 : * should use smaller values, leaving these very large ones for use by this
63 : * module.
64 : */
65 : #define PARALLEL_KEY_FIXED UINT64CONST(0xFFFFFFFFFFFF0001)
66 : #define PARALLEL_KEY_ERROR_QUEUE UINT64CONST(0xFFFFFFFFFFFF0002)
67 : #define PARALLEL_KEY_LIBRARY UINT64CONST(0xFFFFFFFFFFFF0003)
68 : #define PARALLEL_KEY_GUC UINT64CONST(0xFFFFFFFFFFFF0004)
69 : #define PARALLEL_KEY_COMBO_CID UINT64CONST(0xFFFFFFFFFFFF0005)
70 : #define PARALLEL_KEY_TRANSACTION_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0006)
71 : #define PARALLEL_KEY_ACTIVE_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0007)
72 : #define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008)
73 : #define PARALLEL_KEY_ENTRYPOINT UINT64CONST(0xFFFFFFFFFFFF0009)
74 : #define PARALLEL_KEY_SESSION_DSM UINT64CONST(0xFFFFFFFFFFFF000A)
75 : #define PARALLEL_KEY_PENDING_SYNCS UINT64CONST(0xFFFFFFFFFFFF000B)
76 : #define PARALLEL_KEY_REINDEX_STATE UINT64CONST(0xFFFFFFFFFFFF000C)
77 : #define PARALLEL_KEY_RELMAPPER_STATE UINT64CONST(0xFFFFFFFFFFFF000D)
78 : #define PARALLEL_KEY_UNCOMMITTEDENUMS UINT64CONST(0xFFFFFFFFFFFF000E)
79 : #define PARALLEL_KEY_CLIENTCONNINFO UINT64CONST(0xFFFFFFFFFFFF000F)
80 :
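/*
 * Illustrative sketch (not part of this file): a higher-level module that
 * shares per-node state through this infrastructure would define its own,
 * much smaller TOC keys, leaving the very large values above reserved for
 * this module.  The names and values below are hypothetical.
 *
 *     #define PARALLEL_KEY_MY_SHARED_STATE    UINT64CONST(0x0000000000000001)
 *     #define PARALLEL_KEY_MY_TUPLE_QUEUES    UINT64CONST(0x0000000000000002)
 *
 * Both layers can then insert entries into the same shm_toc without their
 * keys colliding.
 */
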
81 : /* Fixed-size parallel state. */
82 : typedef struct FixedParallelState
83 : {
84 : /* Fixed-size state that workers must restore. */
85 : Oid database_id;
86 : Oid authenticated_user_id;
87 : Oid session_user_id;
88 : Oid outer_user_id;
89 : Oid current_user_id;
90 : Oid temp_namespace_id;
91 : Oid temp_toast_namespace_id;
92 : int sec_context;
93 : bool session_user_is_superuser;
94 : bool role_is_superuser;
95 : PGPROC *parallel_leader_pgproc;
96 : pid_t parallel_leader_pid;
97 : ProcNumber parallel_leader_proc_number;
98 : TimestampTz xact_ts;
99 : TimestampTz stmt_ts;
100 : SerializableXactHandle serializable_xact_handle;
101 :
102 : /* Mutex protects remaining fields. */
103 : slock_t mutex;
104 :
105 : /* Maximum XactLastRecEnd of any worker. */
106 : XLogRecPtr last_xlog_end;
107 : } FixedParallelState;
108 :
109 : /*
110 : * Our parallel worker number. We initialize this to -1, meaning that we are
111 : * not a parallel worker. In parallel workers, it will be set to a value >= 0
112 : * and < the number of workers before any user code is invoked; each parallel
113 : * worker will get a different parallel worker number.
114 : */
115 : int ParallelWorkerNumber = -1;
116 :
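/*
 * Illustrative sketch (not part of this file): worker code typically uses
 * ParallelWorkerNumber to claim the per-worker slot that the leader set up
 * for it in the shared TOC.  The MySharedState type and the key below are
 * hypothetical.
 *
 *     MySharedState *shared = shm_toc_lookup(toc, PARALLEL_KEY_MY_SHARED_STATE,
 *                                             false);
 *     MyWorkerSlot  *slot = &shared->slots[ParallelWorkerNumber];
 *
 * Since every launched worker gets a distinct number in [0, nworkers), slots
 * can be handed out this way without any locking.
 */
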
117 : /* Is there a parallel message pending which we need to receive? */
118 : volatile sig_atomic_t ParallelMessagePending = false;
119 :
120 : /* Are we initializing a parallel worker? */
121 : bool InitializingParallelWorker = false;
122 :
123 : /* Pointer to our fixed parallel state. */
124 : static FixedParallelState *MyFixedParallelState;
125 :
126 : /* List of active parallel contexts. */
127 : static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);
128 :
129 : /* Backend-local copy of data from FixedParallelState. */
130 : static pid_t ParallelLeaderPid;
131 :
132 : /*
133 : * List of internal parallel worker entry points. We need this for
134 : * reasons explained in LookupParallelWorkerFunction(), below.
135 : */
136 : static const struct
137 : {
138 : const char *fn_name;
139 : parallel_worker_main_type fn_addr;
140 : } InternalParallelWorkers[] =
141 :
142 : {
143 : {
144 : "ParallelQueryMain", ParallelQueryMain
145 : },
146 : {
147 : "_bt_parallel_build_main", _bt_parallel_build_main
148 : },
149 : {
150 : "_brin_parallel_build_main", _brin_parallel_build_main
151 : },
152 : {
153 : "_gin_parallel_build_main", _gin_parallel_build_main
154 : },
155 : {
156 : "parallel_vacuum_main", parallel_vacuum_main
157 : }
158 : };
159 :
160 : /* Private functions. */
161 : static void ProcessParallelMessage(ParallelContext *pcxt, int i, StringInfo msg);
162 : static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
163 : static parallel_worker_main_type LookupParallelWorkerFunction(const char *libraryname, const char *funcname);
164 : static void ParallelWorkerShutdown(int code, Datum arg);
165 :
166 :
167 : /*
168 : * Establish a new parallel context. This should be done after entering
169 : * parallel mode, and (unless there is an error) the context should be
170 : * destroyed before exiting the current subtransaction.
171 : */
172 : ParallelContext *
173 952 : CreateParallelContext(const char *library_name, const char *function_name,
174 : int nworkers)
175 : {
176 : MemoryContext oldcontext;
177 : ParallelContext *pcxt;
178 :
179 : /* It is unsafe to create a parallel context if not in parallel mode. */
180 : Assert(IsInParallelMode());
181 :
182 : /* Number of workers should be non-negative. */
183 : Assert(nworkers >= 0);
184 :
185 : /* We might be running in a short-lived memory context. */
186 952 : oldcontext = MemoryContextSwitchTo(TopTransactionContext);
187 :
188 : /* Initialize a new ParallelContext. */
189 952 : pcxt = palloc0_object(ParallelContext);
190 952 : pcxt->subid = GetCurrentSubTransactionId();
191 952 : pcxt->nworkers = nworkers;
192 952 : pcxt->nworkers_to_launch = nworkers;
193 952 : pcxt->library_name = pstrdup(library_name);
194 952 : pcxt->function_name = pstrdup(function_name);
195 952 : pcxt->error_context_stack = error_context_stack;
196 952 : shm_toc_initialize_estimator(&pcxt->estimator);
197 952 : dlist_push_head(&pcxt_list, &pcxt->node);
198 :
199 : /* Restore previous memory context. */
200 952 : MemoryContextSwitchTo(oldcontext);
201 :
202 952 : return pcxt;
203 : }
204 :
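/*
 * Illustrative sketch (not part of this file): the typical leader-side call
 * sequence built on this module.  The 64-byte chunk, its key, and the
 * "mylib"/"my_worker_main" entry point are hypothetical placeholders.
 *
 *     EnterParallelMode();
 *     pcxt = CreateParallelContext("mylib", "my_worker_main", nworkers);
 *     shm_toc_estimate_chunk(&pcxt->estimator, 64);
 *     shm_toc_estimate_keys(&pcxt->estimator, 1);
 *     InitializeParallelDSM(pcxt);
 *     mystate = shm_toc_allocate(pcxt->toc, 64);
 *     shm_toc_insert(pcxt->toc, PARALLEL_KEY_MY_SHARED_STATE, mystate);
 *     LaunchParallelWorkers(pcxt);
 *     ... leader performs its own share of the work ...
 *     WaitForParallelWorkersToFinish(pcxt);
 *     ... read results back out of *mystate ...
 *     DestroyParallelContext(pcxt);
 *     ExitParallelMode();
 *
 * Estimation has to be finished before InitializeParallelDSM is called, and
 * the shm_toc_allocate/shm_toc_insert calls should happen before the workers
 * are launched, since workers look the shared state up by key as soon as
 * they start.
 */
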
205 : /*
206 : * Establish the dynamic shared memory segment for a parallel context and
207 : * copy state and other bookkeeping information that will be needed by
208 : * parallel workers into it.
209 : */
210 : void
211 952 : InitializeParallelDSM(ParallelContext *pcxt)
212 : {
213 : MemoryContext oldcontext;
214 952 : Size library_len = 0;
215 952 : Size guc_len = 0;
216 952 : Size combocidlen = 0;
217 952 : Size tsnaplen = 0;
218 952 : Size asnaplen = 0;
219 952 : Size tstatelen = 0;
220 952 : Size pendingsyncslen = 0;
221 952 : Size reindexlen = 0;
222 952 : Size relmapperlen = 0;
223 952 : Size uncommittedenumslen = 0;
224 952 : Size clientconninfolen = 0;
225 952 : Size segsize = 0;
226 : int i;
227 : FixedParallelState *fps;
228 952 : dsm_handle session_dsm_handle = DSM_HANDLE_INVALID;
229 952 : Snapshot transaction_snapshot = GetTransactionSnapshot();
230 952 : Snapshot active_snapshot = GetActiveSnapshot();
231 :
232 : /* We might be running in a very short-lived memory context. */
233 952 : oldcontext = MemoryContextSwitchTo(TopTransactionContext);
234 :
235 : /* Allow space to store the fixed-size parallel state. */
236 952 : shm_toc_estimate_chunk(&pcxt->estimator, sizeof(FixedParallelState));
237 952 : shm_toc_estimate_keys(&pcxt->estimator, 1);
238 :
239 : /*
240 : * If we manage to reach here while non-interruptible, it's unsafe to
241 : * launch any workers: we would fail to process interrupts sent by them.
242 : * We can deal with that edge case by pretending no workers were
243 : * requested.
244 : */
245 952 : if (!INTERRUPTS_CAN_BE_PROCESSED())
246 0 : pcxt->nworkers = 0;
247 :
248 : /*
249 : * Normally, the user will have requested at least one worker process, but
250 : * if by chance they have not, we can skip a bunch of things here.
251 : */
252 952 : if (pcxt->nworkers > 0)
253 : {
254 : /* Get (or create) the per-session DSM segment's handle. */
255 952 : session_dsm_handle = GetSessionDsmHandle();
256 :
257 : /*
258 : * If we weren't able to create a per-session DSM segment, then we can
259 : * continue but we can't safely launch any workers because their
260 : * record typmods would be incompatible so they couldn't exchange
261 : * tuples.
262 : */
263 952 : if (session_dsm_handle == DSM_HANDLE_INVALID)
264 0 : pcxt->nworkers = 0;
265 : }
266 :
267 952 : if (pcxt->nworkers > 0)
268 : {
269 : StaticAssertDecl(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
270 : PARALLEL_ERROR_QUEUE_SIZE,
271 : "parallel error queue size not buffer-aligned");
272 :
273 : /* Estimate space for various kinds of state sharing. */
274 952 : library_len = EstimateLibraryStateSpace();
275 952 : shm_toc_estimate_chunk(&pcxt->estimator, library_len);
276 952 : guc_len = EstimateGUCStateSpace();
277 952 : shm_toc_estimate_chunk(&pcxt->estimator, guc_len);
278 952 : combocidlen = EstimateComboCIDStateSpace();
279 952 : shm_toc_estimate_chunk(&pcxt->estimator, combocidlen);
280 952 : if (IsolationUsesXactSnapshot())
281 : {
282 22 : tsnaplen = EstimateSnapshotSpace(transaction_snapshot);
283 22 : shm_toc_estimate_chunk(&pcxt->estimator, tsnaplen);
284 : }
285 952 : asnaplen = EstimateSnapshotSpace(active_snapshot);
286 952 : shm_toc_estimate_chunk(&pcxt->estimator, asnaplen);
287 952 : tstatelen = EstimateTransactionStateSpace();
288 952 : shm_toc_estimate_chunk(&pcxt->estimator, tstatelen);
289 952 : shm_toc_estimate_chunk(&pcxt->estimator, sizeof(dsm_handle));
290 952 : pendingsyncslen = EstimatePendingSyncsSpace();
291 952 : shm_toc_estimate_chunk(&pcxt->estimator, pendingsyncslen);
292 952 : reindexlen = EstimateReindexStateSpace();
293 952 : shm_toc_estimate_chunk(&pcxt->estimator, reindexlen);
294 952 : relmapperlen = EstimateRelationMapSpace();
295 952 : shm_toc_estimate_chunk(&pcxt->estimator, relmapperlen);
296 952 : uncommittedenumslen = EstimateUncommittedEnumsSpace();
297 952 : shm_toc_estimate_chunk(&pcxt->estimator, uncommittedenumslen);
298 952 : clientconninfolen = EstimateClientConnectionInfoSpace();
299 952 : shm_toc_estimate_chunk(&pcxt->estimator, clientconninfolen);
300 : /* If you add more chunks here, you probably need to add keys. */
301 952 : shm_toc_estimate_keys(&pcxt->estimator, 12);
302 :
303 : /* Estimate the space needed for error queues. */
304 952 : shm_toc_estimate_chunk(&pcxt->estimator,
305 : mul_size(PARALLEL_ERROR_QUEUE_SIZE,
306 : pcxt->nworkers));
307 952 : shm_toc_estimate_keys(&pcxt->estimator, 1);
308 :
309 : /* Estimate how much we'll need for the entrypoint info. */
310 952 : shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name) +
311 : strlen(pcxt->function_name) + 2);
312 952 : shm_toc_estimate_keys(&pcxt->estimator, 1);
313 : }
314 :
315 : /*
316 : * Create DSM and initialize with new table of contents. But if the user
317 : * didn't request any workers, then don't bother creating a dynamic shared
318 : * memory segment; instead, just use backend-private memory.
319 : *
320 : * Also, if we can't create a dynamic shared memory segment because the
321 : * maximum number of segments have already been created, then fall back to
322 : * backend-private memory, and plan not to use any workers. We hope this
323 : * won't happen very often, but it's better to abandon the use of
324 : * parallelism than to fail outright.
325 : */
326 952 : segsize = shm_toc_estimate(&pcxt->estimator);
327 952 : if (pcxt->nworkers > 0)
328 952 : pcxt->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
329 952 : if (pcxt->seg != NULL)
330 952 : pcxt->toc = shm_toc_create(PARALLEL_MAGIC,
331 : dsm_segment_address(pcxt->seg),
332 : segsize);
333 : else
334 : {
335 0 : pcxt->nworkers = 0;
336 0 : pcxt->private_memory = MemoryContextAlloc(TopMemoryContext, segsize);
337 0 : pcxt->toc = shm_toc_create(PARALLEL_MAGIC, pcxt->private_memory,
338 : segsize);
339 : }
340 :
341 : /* Initialize fixed-size state in shared memory. */
342 : fps = (FixedParallelState *)
343 952 : shm_toc_allocate(pcxt->toc, sizeof(FixedParallelState));
344 952 : fps->database_id = MyDatabaseId;
345 952 : fps->authenticated_user_id = GetAuthenticatedUserId();
346 952 : fps->session_user_id = GetSessionUserId();
347 952 : fps->outer_user_id = GetCurrentRoleId();
348 952 : GetUserIdAndSecContext(&fps->current_user_id, &fps->sec_context);
349 952 : fps->session_user_is_superuser = GetSessionUserIsSuperuser();
350 952 : fps->role_is_superuser = current_role_is_superuser;
351 952 : GetTempNamespaceState(&fps->temp_namespace_id,
352 : &fps->temp_toast_namespace_id);
353 952 : fps->parallel_leader_pgproc = MyProc;
354 952 : fps->parallel_leader_pid = MyProcPid;
355 952 : fps->parallel_leader_proc_number = MyProcNumber;
356 952 : fps->xact_ts = GetCurrentTransactionStartTimestamp();
357 952 : fps->stmt_ts = GetCurrentStatementStartTimestamp();
358 952 : fps->serializable_xact_handle = ShareSerializableXact();
359 952 : SpinLockInit(&fps->mutex);
360 952 : fps->last_xlog_end = 0;
361 952 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);
362 :
363 : /* We can skip the rest of this if we're not budgeting for any workers. */
364 952 : if (pcxt->nworkers > 0)
365 : {
366 : char *libraryspace;
367 : char *gucspace;
368 : char *combocidspace;
369 : char *tsnapspace;
370 : char *asnapspace;
371 : char *tstatespace;
372 : char *pendingsyncsspace;
373 : char *reindexspace;
374 : char *relmapperspace;
375 : char *error_queue_space;
376 : char *session_dsm_handle_space;
377 : char *entrypointstate;
378 : char *uncommittedenumsspace;
379 : char *clientconninfospace;
380 : Size lnamelen;
381 :
382 : /* Serialize shared libraries we have loaded. */
383 952 : libraryspace = shm_toc_allocate(pcxt->toc, library_len);
384 952 : SerializeLibraryState(library_len, libraryspace);
385 952 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_LIBRARY, libraryspace);
386 :
387 : /* Serialize GUC settings. */
388 952 : gucspace = shm_toc_allocate(pcxt->toc, guc_len);
389 952 : SerializeGUCState(guc_len, gucspace);
390 952 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_GUC, gucspace);
391 :
392 : /* Serialize combo CID state. */
393 952 : combocidspace = shm_toc_allocate(pcxt->toc, combocidlen);
394 952 : SerializeComboCIDState(combocidlen, combocidspace);
395 952 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_COMBO_CID, combocidspace);
396 :
397 : /*
398 : * Serialize the transaction snapshot if the transaction isolation
399 : * level uses a transaction snapshot.
400 : */
401 952 : if (IsolationUsesXactSnapshot())
402 : {
403 22 : tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen);
404 22 : SerializeSnapshot(transaction_snapshot, tsnapspace);
405 22 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT,
406 : tsnapspace);
407 : }
408 :
409 : /* Serialize the active snapshot. */
410 952 : asnapspace = shm_toc_allocate(pcxt->toc, asnaplen);
411 952 : SerializeSnapshot(active_snapshot, asnapspace);
412 952 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, asnapspace);
413 :
414 : /* Provide the handle for the per-session segment. */
415 952 : session_dsm_handle_space = shm_toc_allocate(pcxt->toc,
416 : sizeof(dsm_handle));
417 952 : *(dsm_handle *) session_dsm_handle_space = session_dsm_handle;
418 952 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_SESSION_DSM,
419 : session_dsm_handle_space);
420 :
421 : /* Serialize transaction state. */
422 952 : tstatespace = shm_toc_allocate(pcxt->toc, tstatelen);
423 952 : SerializeTransactionState(tstatelen, tstatespace);
424 952 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_STATE, tstatespace);
425 :
426 : /* Serialize pending syncs. */
427 952 : pendingsyncsspace = shm_toc_allocate(pcxt->toc, pendingsyncslen);
428 952 : SerializePendingSyncs(pendingsyncslen, pendingsyncsspace);
429 952 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_PENDING_SYNCS,
430 : pendingsyncsspace);
431 :
432 : /* Serialize reindex state. */
433 952 : reindexspace = shm_toc_allocate(pcxt->toc, reindexlen);
434 952 : SerializeReindexState(reindexlen, reindexspace);
435 952 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_REINDEX_STATE, reindexspace);
436 :
437 : /* Serialize relmapper state. */
438 952 : relmapperspace = shm_toc_allocate(pcxt->toc, relmapperlen);
439 952 : SerializeRelationMap(relmapperlen, relmapperspace);
440 952 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_RELMAPPER_STATE,
441 : relmapperspace);
442 :
443 : /* Serialize uncommitted enum state. */
444 952 : uncommittedenumsspace = shm_toc_allocate(pcxt->toc,
445 : uncommittedenumslen);
446 952 : SerializeUncommittedEnums(uncommittedenumsspace, uncommittedenumslen);
447 952 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_UNCOMMITTEDENUMS,
448 : uncommittedenumsspace);
449 :
450 : /* Serialize our ClientConnectionInfo. */
451 952 : clientconninfospace = shm_toc_allocate(pcxt->toc, clientconninfolen);
452 952 : SerializeClientConnectionInfo(clientconninfolen, clientconninfospace);
453 952 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_CLIENTCONNINFO,
454 : clientconninfospace);
455 :
456 : /* Allocate space for worker information. */
457 952 : pcxt->worker = palloc0_array(ParallelWorkerInfo, pcxt->nworkers);
458 :
459 : /*
460 : * Establish error queues in dynamic shared memory.
461 : *
462 : * These queues should be used only for transmitting ErrorResponse,
463 : * NoticeResponse, and NotifyResponse protocol messages. Tuple data
464 : * should be transmitted via separate (possibly larger?) queues.
465 : */
466 : error_queue_space =
467 952 : shm_toc_allocate(pcxt->toc,
468 : mul_size(PARALLEL_ERROR_QUEUE_SIZE,
469 952 : pcxt->nworkers));
470 3090 : for (i = 0; i < pcxt->nworkers; ++i)
471 : {
472 : char *start;
473 : shm_mq *mq;
474 :
475 2138 : start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
476 2138 : mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
477 2138 : shm_mq_set_receiver(mq, MyProc);
478 2138 : pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
479 : }
480 952 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, error_queue_space);
481 :
482 : /*
483 : * Serialize entrypoint information. It's unsafe to pass function
484 : * pointers across processes, as the function pointer may be different
485 : * in each process in EXEC_BACKEND builds, so we always pass library
486 : * and function name. (We use library name "postgres" for functions
487 : * in the core backend.)
488 : */
489 952 : lnamelen = strlen(pcxt->library_name);
490 952 : entrypointstate = shm_toc_allocate(pcxt->toc, lnamelen +
491 952 : strlen(pcxt->function_name) + 2);
492 952 : strcpy(entrypointstate, pcxt->library_name);
493 952 : strcpy(entrypointstate + lnamelen + 1, pcxt->function_name);
494 952 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_ENTRYPOINT, entrypointstate);
495 : }
496 :
497 : /* Update nworkers_to_launch, in case we changed nworkers above. */
498 952 : pcxt->nworkers_to_launch = pcxt->nworkers;
499 :
500 : /* Restore previous memory context. */
501 952 : MemoryContextSwitchTo(oldcontext);
502 952 : }
503 :
504 : /*
505 : * Reinitialize the dynamic shared memory segment for a parallel context such
506 : * that we could launch workers for it again.
507 : */
508 : void
509 260 : ReinitializeParallelDSM(ParallelContext *pcxt)
510 : {
511 : MemoryContext oldcontext;
512 : FixedParallelState *fps;
513 :
514 : /* We might be running in a very short-lived memory context. */
515 260 : oldcontext = MemoryContextSwitchTo(TopTransactionContext);
516 :
517 : /* Wait for any old workers to exit. */
518 260 : if (pcxt->nworkers_launched > 0)
519 : {
520 260 : WaitForParallelWorkersToFinish(pcxt);
521 260 : WaitForParallelWorkersToExit(pcxt);
522 260 : pcxt->nworkers_launched = 0;
523 260 : if (pcxt->known_attached_workers)
524 : {
525 260 : pfree(pcxt->known_attached_workers);
526 260 : pcxt->known_attached_workers = NULL;
527 260 : pcxt->nknown_attached_workers = 0;
528 : }
529 : }
530 :
531 : /* Reset a few bits of fixed parallel state to a clean state. */
532 260 : fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
533 260 : fps->last_xlog_end = 0;
534 :
535 : /* Recreate error queues (if they exist). */
536 260 : if (pcxt->nworkers > 0)
537 : {
538 : char *error_queue_space;
539 : int i;
540 :
541 : error_queue_space =
542 260 : shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, false);
543 1086 : for (i = 0; i < pcxt->nworkers; ++i)
544 : {
545 : char *start;
546 : shm_mq *mq;
547 :
548 826 : start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
549 826 : mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
550 826 : shm_mq_set_receiver(mq, MyProc);
551 826 : pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
552 : }
553 : }
554 :
555 : /* Restore previous memory context. */
556 260 : MemoryContextSwitchTo(oldcontext);
557 260 : }
558 :
559 : /*
560 : * Reinitialize parallel workers for a parallel context such that we could
561 : * launch a different number of workers. This is required for cases where
562 : * we need to reuse the same DSM segment, but the number of workers can
563 : * vary from run-to-run.
564 : */
565 : void
566 36 : ReinitializeParallelWorkers(ParallelContext *pcxt, int nworkers_to_launch)
567 : {
568 : /*
569 : * The number of workers that need to be launched must not exceed the
570 : * number of workers with which the parallel context was initialized. But
571 : * the caller might not know that InitializeParallelDSM reduced nworkers,
572 : * so just silently trim the request.
573 : */
574 36 : pcxt->nworkers_to_launch = Min(pcxt->nworkers, nworkers_to_launch);
575 36 : }
576 :
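/*
 * Illustrative sketch (not part of this file): how a caller reuses one DSM
 * segment across several runs, for example when a parallel plan is
 * rescanned.  The new_nworkers variable is hypothetical.
 *
 *     ReinitializeParallelDSM(pcxt);
 *     ReinitializeParallelWorkers(pcxt, new_nworkers);
 *     LaunchParallelWorkers(pcxt);
 *     WaitForParallelWorkersToFinish(pcxt);
 *
 * ReinitializeParallelDSM itself waits for any previously launched workers
 * to finish and exit before resetting the error queues, so no separate wait
 * is needed between runs.
 */
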
577 : /*
578 : * Launch parallel workers.
579 : */
580 : void
581 1212 : LaunchParallelWorkers(ParallelContext *pcxt)
582 : {
583 : MemoryContext oldcontext;
584 : BackgroundWorker worker;
585 : int i;
586 1212 : bool any_registrations_failed = false;
587 :
588 : /* Skip this if we have no workers. */
589 1212 : if (pcxt->nworkers == 0 || pcxt->nworkers_to_launch == 0)
590 0 : return;
591 :
592 : /* We need to be a lock group leader. */
593 1212 : BecomeLockGroupLeader();
594 :
595 : /* If we do have workers, we'd better have a DSM segment. */
596 : Assert(pcxt->seg != NULL);
597 :
598 : /* We might be running in a short-lived memory context. */
599 1212 : oldcontext = MemoryContextSwitchTo(TopTransactionContext);
600 :
601 : /* Configure a worker. */
602 1212 : memset(&worker, 0, sizeof(worker));
603 1212 : snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
604 : MyProcPid);
605 1212 : snprintf(worker.bgw_type, BGW_MAXLEN, "parallel worker");
606 1212 : worker.bgw_flags =
607 : BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION
608 : | BGWORKER_CLASS_PARALLEL;
609 1212 : worker.bgw_start_time = BgWorkerStart_ConsistentState;
610 1212 : worker.bgw_restart_time = BGW_NEVER_RESTART;
611 1212 : sprintf(worker.bgw_library_name, "postgres");
612 1212 : sprintf(worker.bgw_function_name, "ParallelWorkerMain");
613 1212 : worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(pcxt->seg));
614 1212 : worker.bgw_notify_pid = MyProcPid;
615 :
616 : /*
617 : * Start workers.
618 : *
619 : * The caller must be able to tolerate ending up with fewer workers than
620 : * expected, so there is no need to throw an error here if registration
621 : * fails. It wouldn't help much anyway, because registering the worker in
622 : * no way guarantees that it will start up and initialize successfully.
623 : */
624 4174 : for (i = 0; i < pcxt->nworkers_to_launch; ++i)
625 : {
626 2962 : memcpy(worker.bgw_extra, &i, sizeof(int));
627 5866 : if (!any_registrations_failed &&
628 2904 : RegisterDynamicBackgroundWorker(&worker,
629 2904 : &pcxt->worker[i].bgwhandle))
630 : {
631 2876 : shm_mq_set_handle(pcxt->worker[i].error_mqh,
632 2876 : pcxt->worker[i].bgwhandle);
633 2876 : pcxt->nworkers_launched++;
634 : }
635 : else
636 : {
637 : /*
638 : * If we weren't able to register the worker, then we've bumped up
639 : * against the max_worker_processes limit, and future
640 : * registrations will probably fail too, so arrange to skip them.
641 : * But we still have to execute this code for the remaining slots
642 : * to make sure that we forget about the error queues we budgeted
643 : * for those workers. Otherwise, we'll wait for them to start,
644 : * but they never will.
645 : */
646 86 : any_registrations_failed = true;
647 86 : pcxt->worker[i].bgwhandle = NULL;
648 86 : shm_mq_detach(pcxt->worker[i].error_mqh);
649 86 : pcxt->worker[i].error_mqh = NULL;
650 : }
651 : }
652 :
653 : /*
654 : * Now that nworkers_launched has taken its final value, we can initialize
655 : * known_attached_workers.
656 : */
657 1212 : if (pcxt->nworkers_launched > 0)
658 : {
659 1192 : pcxt->known_attached_workers = palloc0_array(bool, pcxt->nworkers_launched);
660 1192 : pcxt->nknown_attached_workers = 0;
661 : }
662 :
663 : /* Restore previous memory context. */
664 1212 : MemoryContextSwitchTo(oldcontext);
665 : }
666 :
667 : /*
668 : * Wait for all workers to attach to their error queues, and throw an error if
669 : * any worker fails to do this.
670 : *
671 : * Callers can assume that if this function returns successfully, then the
672 : * number of workers given by pcxt->nworkers_launched have initialized and
673 : * attached to their error queues. Whether or not these workers are guaranteed
674 : * to still be running depends on what code the caller asked them to run;
675 : * this function does not guarantee that they have not exited. However, it
676 : * does guarantee that any workers which exited must have done so cleanly and
677 : * after successfully performing the work with which they were tasked.
678 : *
679 : * If this function is not called, then some of the workers that were launched
680 : * may not have been started due to a fork() failure, or may have exited during
681 : * early startup prior to attaching to the error queue, so nworkers_launched
682 : * cannot be viewed as completely reliable. It will never be less than the
683 : * number of workers which actually started, but it might be more. Any workers
684 : * that failed to start will still be discovered by
685 : * WaitForParallelWorkersToFinish and an error will be thrown at that time,
686 : * provided that function is eventually reached.
687 : *
688 : * In general, the leader process should do as much work as possible before
689 : * calling this function. fork() failures and other early-startup failures
690 : * are very uncommon, and having the leader sit idle when it could be doing
691 : * useful work is undesirable. However, if the leader needs to wait for
692 : * all of its workers or for a specific worker, it may want to call this
693 : * function before doing so. If not, it must make some other provision for
694 : * the failure-to-start case, lest it wait forever. On the other hand, a
695 : * leader which never waits for a worker that might not be started yet, or
696 : * at least never does so prior to WaitForParallelWorkersToFinish(), need not
697 : * call this function at all.
698 : */
699 : void
700 162 : WaitForParallelWorkersToAttach(ParallelContext *pcxt)
701 : {
702 : int i;
703 :
704 : /* Skip this if we have no launched workers. */
705 162 : if (pcxt->nworkers_launched == 0)
706 0 : return;
707 :
708 : for (;;)
709 : {
710 : /*
711 : * This will process any parallel messages that are pending and it may
712 : * also throw an error propagated from a worker.
713 : */
714 14452892 : CHECK_FOR_INTERRUPTS();
715 :
716 29250232 : for (i = 0; i < pcxt->nworkers_launched; ++i)
717 : {
718 : BgwHandleStatus status;
719 : shm_mq *mq;
720 : int rc;
721 : pid_t pid;
722 :
723 14797340 : if (pcxt->known_attached_workers[i])
724 334218 : continue;
725 :
726 : /*
727 : * If error_mqh is NULL, then the worker has already exited
728 : * cleanly.
729 : */
730 14463122 : if (pcxt->worker[i].error_mqh == NULL)
731 : {
732 0 : pcxt->known_attached_workers[i] = true;
733 0 : ++pcxt->nknown_attached_workers;
734 0 : continue;
735 : }
736 :
737 14463122 : status = GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle, &pid);
738 14463122 : if (status == BGWH_STARTED)
739 : {
740 : /* Has the worker attached to the error queue? */
741 14463006 : mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
742 14463006 : if (shm_mq_get_sender(mq) != NULL)
743 : {
744 : /* Yes, so it is known to be attached. */
745 146 : pcxt->known_attached_workers[i] = true;
746 146 : ++pcxt->nknown_attached_workers;
747 : }
748 : }
749 116 : else if (status == BGWH_STOPPED)
750 : {
751 : /*
752 : * If the worker stopped without attaching to the error queue,
753 : * throw an error.
754 : */
755 0 : mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
756 0 : if (shm_mq_get_sender(mq) == NULL)
757 0 : ereport(ERROR,
758 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
759 : errmsg("parallel worker failed to initialize"),
760 : errhint("More details may be available in the server log.")));
761 :
762 0 : pcxt->known_attached_workers[i] = true;
763 0 : ++pcxt->nknown_attached_workers;
764 : }
765 : else
766 : {
767 : /*
768 : * Worker not yet started, so we must wait. The postmaster
769 : * will notify us if the worker's state changes. Our latch
770 : * might also get set for some other reason, but if so we'll
771 : * just end up waiting for the same worker again.
772 : */
773 116 : rc = WaitLatch(MyLatch,
774 : WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
775 : -1, WAIT_EVENT_BGWORKER_STARTUP);
776 :
777 116 : if (rc & WL_LATCH_SET)
778 116 : ResetLatch(MyLatch);
779 : }
780 : }
781 :
782 : /* If all workers are known to have started, we're done. */
783 14452892 : if (pcxt->nknown_attached_workers >= pcxt->nworkers_launched)
784 : {
785 : Assert(pcxt->nknown_attached_workers == pcxt->nworkers_launched);
786 162 : break;
787 : }
788 : }
789 : }
790 :
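/*
 * Illustrative sketch (not part of this file): a leader that must wait on a
 * specific worker, rather than only via WaitForParallelWorkersToFinish(),
 * calls this function first so that a worker which never started cannot
 * cause an indefinite wait.  wait_for_worker_result() is hypothetical.
 *
 *     LaunchParallelWorkers(pcxt);
 *     if (pcxt->nworkers_launched > 0)
 *     {
 *         WaitForParallelWorkersToAttach(pcxt);
 *         wait_for_worker_result(pcxt, 0);
 *     }
 */
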
791 : /*
792 : * Wait for all workers to finish computing.
793 : *
794 : * Even if the parallel operation seems to have completed successfully, it's
795 : * important to call this function afterwards. We must not miss any errors
796 : * the workers may have thrown during the parallel operation, or any that they
797 : * may yet throw while shutting down.
798 : *
799 : * Also, we want to update our notion of XactLastRecEnd based on worker
800 : * feedback.
801 : */
802 : void
803 1460 : WaitForParallelWorkersToFinish(ParallelContext *pcxt)
804 : {
805 : for (;;)
806 1258 : {
807 2718 : bool anyone_alive = false;
808 2718 : int nfinished = 0;
809 : int i;
810 :
811 : /*
812 : * This will process any parallel messages that are pending, which may
813 : * change the outcome of the loop that follows. It may also throw an
814 : * error propagated from a worker.
815 : */
816 2718 : CHECK_FOR_INTERRUPTS();
817 :
818 9536 : for (i = 0; i < pcxt->nworkers_launched; ++i)
819 : {
820 : /*
821 : * If error_mqh is NULL, then the worker has already exited
822 : * cleanly. If we have received a message through error_mqh from
823 : * the worker, we know it started up cleanly, and therefore we're
824 : * certain to be notified when it exits.
825 : */
826 6852 : if (pcxt->worker[i].error_mqh == NULL)
827 5416 : ++nfinished;
828 1436 : else if (pcxt->known_attached_workers[i])
829 : {
830 34 : anyone_alive = true;
831 34 : break;
832 : }
833 : }
834 :
835 2718 : if (!anyone_alive)
836 : {
837 : /* If all workers are known to have finished, we're done. */
838 2684 : if (nfinished >= pcxt->nworkers_launched)
839 : {
840 : Assert(nfinished == pcxt->nworkers_launched);
841 1460 : break;
842 : }
843 :
844 : /*
845 : * We didn't detect any living workers, but not all workers are
846 : * known to have exited cleanly. Either not all workers have
847 : * launched yet, or maybe some of them failed to start or
848 : * terminated abnormally.
849 : */
850 4358 : for (i = 0; i < pcxt->nworkers_launched; ++i)
851 : {
852 : pid_t pid;
853 : shm_mq *mq;
854 :
855 : /*
856 : * If the worker is BGWH_NOT_YET_STARTED or BGWH_STARTED, we
857 : * should just keep waiting. If it is BGWH_STOPPED, then
858 : * further investigation is needed.
859 : */
860 3134 : if (pcxt->worker[i].error_mqh == NULL ||
861 2804 : pcxt->worker[i].bgwhandle == NULL ||
862 1402 : GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle,
863 : &pid) != BGWH_STOPPED)
864 3134 : continue;
865 :
866 : /*
867 : * Check whether the worker ended up stopped without ever
868 : * attaching to the error queue. If so, the postmaster was
869 : * unable to fork the worker or it exited without initializing
870 : * properly. We must throw an error, since the caller may
871 : * have been expecting the worker to do some work before
872 : * exiting.
873 : */
874 0 : mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
875 0 : if (shm_mq_get_sender(mq) == NULL)
876 0 : ereport(ERROR,
877 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
878 : errmsg("parallel worker failed to initialize"),
879 : errhint("More details may be available in the server log.")));
880 :
881 : /*
882 : * The worker is stopped, but is attached to the error queue.
883 : * Unless there's a bug somewhere, this will only happen when
884 : * the worker writes messages and terminates after the
885 : * CHECK_FOR_INTERRUPTS() near the top of this function and
886 : * before the call to GetBackgroundWorkerPid(). In that case,
887 : * our latch should have been set as well and the right things
888 : * will happen on the next pass through the loop.
889 : */
890 : }
891 : }
892 :
893 1258 : (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
894 : WAIT_EVENT_PARALLEL_FINISH);
895 1258 : ResetLatch(MyLatch);
896 : }
897 :
898 1460 : if (pcxt->toc != NULL)
899 : {
900 : FixedParallelState *fps;
901 :
902 1460 : fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
903 1460 : if (fps->last_xlog_end > XactLastRecEnd)
904 18 : XactLastRecEnd = fps->last_xlog_end;
905 : }
906 1460 : }
907 :
908 : /*
909 : * Wait for all workers to exit.
910 : *
911 : * This function ensures that the workers have completely shut down. The
912 : * difference between WaitForParallelWorkersToFinish and this function is
913 : * that the former just ensures that the last message sent by a worker backend
914 : * is received by the leader backend, whereas this ensures complete shutdown.
915 : */
916 : static void
917 1212 : WaitForParallelWorkersToExit(ParallelContext *pcxt)
918 : {
919 : int i;
920 :
921 : /* Wait until the workers actually die. */
922 4088 : for (i = 0; i < pcxt->nworkers_launched; ++i)
923 : {
924 : BgwHandleStatus status;
925 :
926 2876 : if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
927 0 : continue;
928 :
929 2876 : status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);
930 :
931 : /*
932 : * If the postmaster kicked the bucket, we have no chance of cleaning
933 : * up safely -- we won't be able to tell when our workers are actually
934 : * dead. This doesn't necessitate a PANIC since they will all abort
935 : * eventually, but we can't safely continue this session.
936 : */
937 2876 : if (status == BGWH_POSTMASTER_DIED)
938 0 : ereport(FATAL,
939 : (errcode(ERRCODE_ADMIN_SHUTDOWN),
940 : errmsg("postmaster exited during a parallel transaction")));
941 :
942 : /* Release memory. */
943 2876 : pfree(pcxt->worker[i].bgwhandle);
944 2876 : pcxt->worker[i].bgwhandle = NULL;
945 : }
946 1212 : }
947 :
948 : /*
949 : * Destroy a parallel context.
950 : *
951 : * If expecting a clean exit, you should use WaitForParallelWorkersToFinish()
952 : * first, before calling this function. When this function is invoked, any
953 : * remaining workers are forcibly killed; the dynamic shared memory segment
954 : * is unmapped; and we then wait (uninterruptibly) for the workers to exit.
955 : */
956 : void
957 952 : DestroyParallelContext(ParallelContext *pcxt)
958 : {
959 : int i;
960 :
961 : /*
962 : * Be careful about order of operations here! We remove the parallel
963 : * context from the list before we do anything else; otherwise, if an
964 : * error occurs during a subsequent step, we might try to nuke it again
965 : * from AtEOXact_Parallel or AtEOSubXact_Parallel.
966 : */
967 952 : dlist_delete(&pcxt->node);
968 :
969 : /* Kill each worker in turn, and forget their error queues. */
970 952 : if (pcxt->worker != NULL)
971 : {
972 3008 : for (i = 0; i < pcxt->nworkers_launched; ++i)
973 : {
974 2056 : if (pcxt->worker[i].error_mqh != NULL)
975 : {
976 12 : TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
977 :
978 12 : shm_mq_detach(pcxt->worker[i].error_mqh);
979 12 : pcxt->worker[i].error_mqh = NULL;
980 : }
981 : }
982 : }
983 :
984 : /*
985 : * If we have allocated a shared memory segment, detach it. This will
986 : * implicitly detach the error queues, and any other shared memory queues
987 : * stored there.
988 : */
989 952 : if (pcxt->seg != NULL)
990 : {
991 952 : dsm_detach(pcxt->seg);
992 952 : pcxt->seg = NULL;
993 : }
994 :
995 : /*
996 : * If this parallel context is actually in backend-private memory rather
997 : * than shared memory, free that memory instead.
998 : */
999 952 : if (pcxt->private_memory != NULL)
1000 : {
1001 0 : pfree(pcxt->private_memory);
1002 0 : pcxt->private_memory = NULL;
1003 : }
1004 :
1005 : /*
1006 : * We can't finish transaction commit or abort until all of the workers
1007 : * have exited. This means, in particular, that we can't respond to
1008 : * interrupts at this stage.
1009 : */
1010 952 : HOLD_INTERRUPTS();
1011 952 : WaitForParallelWorkersToExit(pcxt);
1012 952 : RESUME_INTERRUPTS();
1013 :
1014 : /* Free the worker array itself. */
1015 952 : if (pcxt->worker != NULL)
1016 : {
1017 952 : pfree(pcxt->worker);
1018 952 : pcxt->worker = NULL;
1019 : }
1020 :
1021 : /* Free memory. */
1022 952 : pfree(pcxt->library_name);
1023 952 : pfree(pcxt->function_name);
1024 952 : pfree(pcxt);
1025 952 : }
1026 :
1027 : /*
1028 : * Are there any parallel contexts currently active?
1029 : */
1030 : bool
1031 0 : ParallelContextActive(void)
1032 : {
1033 0 : return !dlist_is_empty(&pcxt_list);
1034 : }
1035 :
1036 : /*
1037 : * Handle receipt of an interrupt indicating a parallel worker message.
1038 : *
1039 : * Note: this is called within a signal handler! All we can do is set
1040 : * a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke
1041 : * ProcessParallelMessages().
1042 : */
1043 : void
1044 3458 : HandleParallelMessageInterrupt(void)
1045 : {
1046 3458 : InterruptPending = true;
1047 3458 : ParallelMessagePending = true;
1048 3458 : SetLatch(MyLatch);
1049 3458 : }
1050 :
1051 : /*
1052 : * Process any queued protocol messages received from parallel workers.
1053 : */
1054 : void
1055 3340 : ProcessParallelMessages(void)
1056 : {
1057 : dlist_iter iter;
1058 : MemoryContext oldcontext;
1059 :
1060 : static MemoryContext hpm_context = NULL;
1061 :
1062 : /*
1063 : * This is invoked from ProcessInterrupts(), and since some of the
1064 : * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
1065 : * for recursive calls if more signals are received while this runs. It's
1066 : * unclear that recursive entry would be safe, and it doesn't seem useful
1067 : * even if it is safe, so let's block interrupts until done.
1068 : */
1069 3340 : HOLD_INTERRUPTS();
1070 :
1071 : /*
1072 : * Moreover, CurrentMemoryContext might be pointing almost anywhere. We
1073 : * don't want to risk leaking data into long-lived contexts, so let's do
1074 : * our work here in a private context that we can reset on each use.
1075 : */
1076 3340 : if (hpm_context == NULL) /* first time through? */
1077 150 : hpm_context = AllocSetContextCreate(TopMemoryContext,
1078 : "ProcessParallelMessages",
1079 : ALLOCSET_DEFAULT_SIZES);
1080 : else
1081 3190 : MemoryContextReset(hpm_context);
1082 :
1083 3340 : oldcontext = MemoryContextSwitchTo(hpm_context);
1084 :
1085 : /* OK to process messages. Reset the flag saying there are more to do. */
1086 3340 : ParallelMessagePending = false;
1087 :
1088 6790 : dlist_foreach(iter, &pcxt_list)
1089 : {
1090 : ParallelContext *pcxt;
1091 : int i;
1092 :
1093 3462 : pcxt = dlist_container(ParallelContext, node, iter.cur);
1094 3462 : if (pcxt->worker == NULL)
1095 0 : continue;
1096 :
1097 14166 : for (i = 0; i < pcxt->nworkers_launched; ++i)
1098 : {
1099 : /*
1100 : * Read as many messages as we can from each worker, but stop when
1101 : * either (1) the worker's error queue goes away, which can happen
1102 : * if we receive a Terminate message from the worker; or (2) no
1103 : * more messages can be read from the worker without blocking.
1104 : */
1105 13580 : while (pcxt->worker[i].error_mqh != NULL)
1106 : {
1107 : shm_mq_result res;
1108 : Size nbytes;
1109 : void *data;
1110 :
1111 6130 : res = shm_mq_receive(pcxt->worker[i].error_mqh, &nbytes,
1112 : &data, true);
1113 6130 : if (res == SHM_MQ_WOULD_BLOCK)
1114 3254 : break;
1115 2876 : else if (res == SHM_MQ_SUCCESS)
1116 : {
1117 : StringInfoData msg;
1118 :
1119 2876 : initStringInfo(&msg);
1120 2876 : appendBinaryStringInfo(&msg, data, nbytes);
1121 2876 : ProcessParallelMessage(pcxt, i, &msg);
1122 2864 : pfree(msg.data);
1123 : }
1124 : else
1125 0 : ereport(ERROR,
1126 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1127 : errmsg("lost connection to parallel worker")));
1128 : }
1129 : }
1130 : }
1131 :
1132 3328 : MemoryContextSwitchTo(oldcontext);
1133 :
1134 : /* Might as well clear the context on our way out */
1135 3328 : MemoryContextReset(hpm_context);
1136 :
1137 3328 : RESUME_INTERRUPTS();
1138 3328 : }
1139 :
1140 : /*
1141 : * Process a single protocol message received from a single parallel worker.
1142 : */
1143 : static void
1144 2876 : ProcessParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
1145 : {
1146 : char msgtype;
1147 :
1148 2876 : if (pcxt->known_attached_workers != NULL &&
1149 2876 : !pcxt->known_attached_workers[i])
1150 : {
1151 2730 : pcxt->known_attached_workers[i] = true;
1152 2730 : pcxt->nknown_attached_workers++;
1153 : }
1154 :
1155 2876 : msgtype = pq_getmsgbyte(msg);
1156 :
1157 2876 : switch (msgtype)
1158 : {
1159 12 : case PqMsg_ErrorResponse:
1160 : case PqMsg_NoticeResponse:
1161 : {
1162 : ErrorData edata;
1163 : ErrorContextCallback *save_error_context_stack;
1164 :
1165 : /* Parse ErrorResponse or NoticeResponse. */
1166 12 : pq_parse_errornotice(msg, &edata);
1167 :
1168 : /* Death of a worker isn't enough justification for suicide. */
1169 12 : edata.elevel = Min(edata.elevel, ERROR);
1170 :
1171 : /*
1172 : * If desired, add a context line to show that this is a
1173 : * message propagated from a parallel worker. Otherwise, it
1174 : * can sometimes be confusing to understand what actually
1175 : * happened. (We don't do this in DEBUG_PARALLEL_REGRESS mode
1176 : * because it causes test-result instability depending on
1177 : * whether a parallel worker is actually used or not.)
1178 : */
1179 12 : if (debug_parallel_query != DEBUG_PARALLEL_REGRESS)
1180 : {
1181 12 : if (edata.context)
1182 6 : edata.context = psprintf("%s\n%s", edata.context,
1183 : _("parallel worker"));
1184 : else
1185 6 : edata.context = pstrdup(_("parallel worker"));
1186 : }
1187 :
1188 : /*
1189 : * Context beyond that should use the error context callbacks
1190 : * that were in effect when the ParallelContext was created,
1191 : * not the current ones.
1192 : */
1193 12 : save_error_context_stack = error_context_stack;
1194 12 : error_context_stack = pcxt->error_context_stack;
1195 :
1196 : /* Rethrow error or print notice. */
1197 12 : ThrowErrorData(&edata);
1198 :
1199 : /* Not an error, so restore previous context stack. */
1200 0 : error_context_stack = save_error_context_stack;
1201 :
1202 0 : break;
1203 : }
1204 :
1205 0 : case PqMsg_NotificationResponse:
1206 : {
1207 : /* Propagate NotifyResponse. */
1208 : int32 pid;
1209 : const char *channel;
1210 : const char *payload;
1211 :
1212 0 : pid = pq_getmsgint(msg, 4);
1213 0 : channel = pq_getmsgrawstring(msg);
1214 0 : payload = pq_getmsgrawstring(msg);
1215 0 : pq_endmessage(msg);
1216 :
1217 0 : NotifyMyFrontEnd(channel, payload, pid);
1218 :
1219 0 : break;
1220 : }
1221 :
1222 0 : case PqMsg_Progress:
1223 : {
1224 : /*
1225 : * Only incremental progress reporting is currently supported.
1226 : * However, it's possible to add more fields to the message to
1227 : * allow for handling of other backend progress APIs.
1228 : */
1229 0 : int index = pq_getmsgint(msg, 4);
1230 0 : int64 incr = pq_getmsgint64(msg);
1231 :
1232 0 : pq_getmsgend(msg);
1233 :
1234 0 : pgstat_progress_incr_param(index, incr);
1235 :
1236 0 : break;
1237 : }
1238 :
1239 2864 : case PqMsg_Terminate:
1240 : {
1241 2864 : shm_mq_detach(pcxt->worker[i].error_mqh);
1242 2864 : pcxt->worker[i].error_mqh = NULL;
1243 2864 : break;
1244 : }
1245 :
1246 0 : default:
1247 : {
1248 0 : elog(ERROR, "unrecognized message type received from parallel worker: %c (message length %d bytes)",
1249 : msgtype, msg->len);
1250 : }
1251 : }
1252 2864 : }
1253 :
1254 : /*
1255 : * End-of-subtransaction cleanup for parallel contexts.
1256 : *
1257 : * Here we remove only parallel contexts initiated within the current
1258 : * subtransaction.
1259 : */
1260 : void
1261 20154 : AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId)
1262 : {
1263 20160 : while (!dlist_is_empty(&pcxt_list))
1264 : {
1265 : ParallelContext *pcxt;
1266 :
1267 6 : pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
1268 6 : if (pcxt->subid != mySubId)
1269 0 : break;
1270 6 : if (isCommit)
1271 0 : elog(WARNING, "leaked parallel context");
1272 6 : DestroyParallelContext(pcxt);
1273 : }
1274 20154 : }
1275 :
1276 : /*
1277 : * End-of-transaction cleanup for parallel contexts.
1278 : *
1279 : * We nuke all remaining parallel contexts.
1280 : */
1281 : void
1282 1054458 : AtEOXact_Parallel(bool isCommit)
1283 : {
1284 1054464 : while (!dlist_is_empty(&pcxt_list))
1285 : {
1286 : ParallelContext *pcxt;
1287 :
1288 6 : pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
1289 6 : if (isCommit)
1290 0 : elog(WARNING, "leaked parallel context");
1291 6 : DestroyParallelContext(pcxt);
1292 : }
1293 1054458 : }
1294 :
1295 : /*
1296 : * Main entrypoint for parallel workers.
1297 : */
1298 : void
1299 2876 : ParallelWorkerMain(Datum main_arg)
1300 : {
1301 : dsm_segment *seg;
1302 : shm_toc *toc;
1303 : FixedParallelState *fps;
1304 : char *error_queue_space;
1305 : shm_mq *mq;
1306 : shm_mq_handle *mqh;
1307 : char *libraryspace;
1308 : char *entrypointstate;
1309 : char *library_name;
1310 : char *function_name;
1311 : parallel_worker_main_type entrypt;
1312 : char *gucspace;
1313 : char *combocidspace;
1314 : char *tsnapspace;
1315 : char *asnapspace;
1316 : char *tstatespace;
1317 : char *pendingsyncsspace;
1318 : char *reindexspace;
1319 : char *relmapperspace;
1320 : char *uncommittedenumsspace;
1321 : char *clientconninfospace;
1322 : char *session_dsm_handle_space;
1323 : Snapshot tsnapshot;
1324 : Snapshot asnapshot;
1325 :
1326 : /* Set flag to indicate that we're initializing a parallel worker. */
1327 2876 : InitializingParallelWorker = true;
1328 :
1329 : /* Establish signal handlers. */
1330 2876 : pqsignal(SIGTERM, die);
1331 2876 : BackgroundWorkerUnblockSignals();
1332 :
1333 : /* Determine and set our parallel worker number. */
1334 : Assert(ParallelWorkerNumber == -1);
1335 2876 : memcpy(&ParallelWorkerNumber, MyBgworkerEntry->bgw_extra, sizeof(int));
1336 :
1337 : /* Set up a memory context to work in, just for cleanliness. */
1338 2876 : CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
1339 : "Parallel worker",
1340 : ALLOCSET_DEFAULT_SIZES);
1341 :
1342 : /*
1343 : * Attach to the dynamic shared memory segment for the parallel query, and
1344 : * find its table of contents.
1345 : *
1346 : * Note: at this point, we have not created any ResourceOwner in this
1347 : * process. This will result in our DSM mapping surviving until process
1348 : * exit, which is fine. If there were a ResourceOwner, it would acquire
1349 : * ownership of the mapping, but we have no need for that.
1350 : */
1351 2876 : seg = dsm_attach(DatumGetUInt32(main_arg));
1352 2876 : if (seg == NULL)
1353 0 : ereport(ERROR,
1354 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1355 : errmsg("could not map dynamic shared memory segment")));
1356 2876 : toc = shm_toc_attach(PARALLEL_MAGIC, dsm_segment_address(seg));
1357 2876 : if (toc == NULL)
1358 0 : ereport(ERROR,
1359 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1360 : errmsg("invalid magic number in dynamic shared memory segment")));
1361 :
1362 : /* Look up fixed parallel state. */
1363 2876 : fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED, false);
1364 2876 : MyFixedParallelState = fps;
1365 :
1366 : /* Arrange to signal the leader if we exit. */
1367 2876 : ParallelLeaderPid = fps->parallel_leader_pid;
1368 2876 : ParallelLeaderProcNumber = fps->parallel_leader_proc_number;
1369 2876 : before_shmem_exit(ParallelWorkerShutdown, PointerGetDatum(seg));
1370 :
1371 : /*
1372 : * Now we can find and attach to the error queue provided for us. That's
1373 : * good, because until we do that, any errors that happen here will not be
1374 : * reported back to the process that requested that this worker be
1375 : * launched.
1376 : */
1377 2876 : error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE, false);
1378 2876 : mq = (shm_mq *) (error_queue_space +
1379 2876 : ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE);
1380 2876 : shm_mq_set_sender(mq, MyProc);
1381 2876 : mqh = shm_mq_attach(mq, seg, NULL);
1382 2876 : pq_redirect_to_shm_mq(seg, mqh);
1383 2876 : pq_set_parallel_leader(fps->parallel_leader_pid,
1384 : fps->parallel_leader_proc_number);
1385 :
1386 : /*
1387 : * Hooray! Primary initialization is complete. Now, we need to set up our
1388 : * backend-local state to match the original backend.
1389 : */
1390 :
1391 : /*
1392 : * Join locking group. We must do this before anything that could try to
1393 : * acquire a heavyweight lock, because any heavyweight locks acquired to
1394 : * this point could block either directly against the parallel group
1395 : * leader or against some process which in turn waits for a lock that
1396 : * conflicts with the parallel group leader, causing an undetected
1397 : * deadlock. (If we can't join the lock group, the leader has gone away,
1398 : * so just exit quietly.)
1399 : */
1400 2876 : if (!BecomeLockGroupMember(fps->parallel_leader_pgproc,
1401 : fps->parallel_leader_pid))
1402 0 : return;
1403 :
1404 : /*
1405 : * Restore transaction and statement start-time timestamps. This must
1406 : * happen before anything that would start a transaction, else asserts in
1407 : * xact.c will fire.
1408 : */
1409 2876 : SetParallelStartTimestamps(fps->xact_ts, fps->stmt_ts);
1410 :
1411 : /*
1412 : * Identify the entry point to be called. In theory this could result in
1413 : * loading an additional library, though most likely the entry point is in
1414 : * the core backend or in a library we just loaded.
1415 : */
1416 2876 : entrypointstate = shm_toc_lookup(toc, PARALLEL_KEY_ENTRYPOINT, false);
1417 2876 : library_name = entrypointstate;
1418 2876 : function_name = entrypointstate + strlen(library_name) + 1;
1419 :
1420 2876 : entrypt = LookupParallelWorkerFunction(library_name, function_name);
1421 :
1422 : /*
1423 : * Restore current session authorization and role id. No verification
1424 : * happens here, we just blindly adopt the leader's state. Note that this
1425 : * has to happen before InitPostgres, since InitializeSessionUserId will
1426 : * not set these variables.
1427 : */
1428 2876 : SetAuthenticatedUserId(fps->authenticated_user_id);
1429 2876 : SetSessionAuthorization(fps->session_user_id,
1430 2876 : fps->session_user_is_superuser);
1431 2876 : SetCurrentRoleId(fps->outer_user_id, fps->role_is_superuser);
1432 :
1433 : /*
1434 : * Restore database connection. We skip connection authorization checks,
1435 : * reasoning that (a) the leader checked these things when it started, and
1436 : * (b) we do not want parallel mode to cause these failures, because that
1437 : * would make the use of parallel query plans non-transparent to applications.
1438 : */
1439 2876 : BackgroundWorkerInitializeConnectionByOid(fps->database_id,
1440 : fps->authenticated_user_id,
1441 : BGWORKER_BYPASS_ALLOWCONN |
1442 : BGWORKER_BYPASS_ROLELOGINCHECK);
1443 :
1444 : /*
1445 : * Set the client encoding to the database encoding, since that is what
1446 : * the leader will expect. (We're cheating a bit by not calling
1447 : * PrepareClientEncoding first. It's okay because this call will always
1448 : * result in installing a no-op conversion. No error should be possible,
1449 : * but check anyway.)
1450 : */
1451 2876 : if (SetClientEncoding(GetDatabaseEncoding()) < 0)
1452 0 : elog(ERROR, "SetClientEncoding(%d) failed", GetDatabaseEncoding());
1453 :
1454 : /*
1455 : * Load libraries that were loaded by original backend. We want to do
1456 : * this before restoring GUCs, because the libraries might define custom
1457 : * variables.
1458 : */
1459 2876 : libraryspace = shm_toc_lookup(toc, PARALLEL_KEY_LIBRARY, false);
1460 2876 : StartTransactionCommand();
1461 2876 : RestoreLibraryState(libraryspace);
1462 2876 : CommitTransactionCommand();
1463 :
1464 : /* Crank up a transaction state appropriate to a parallel worker. */
1465 2876 : tstatespace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_STATE, false);
1466 2876 : StartParallelWorkerTransaction(tstatespace);
1467 :
1468 : /*
1469 : * Restore state that affects catalog access. Ideally we'd do this even
1470 : * before calling InitPostgres, but that has order-of-initialization
1471 : * problems, and also the relmapper would get confused during the
1472 : * CommitTransactionCommand call above.
1473 : */
1474 2876 : pendingsyncsspace = shm_toc_lookup(toc, PARALLEL_KEY_PENDING_SYNCS,
1475 : false);
1476 2876 : RestorePendingSyncs(pendingsyncsspace);
1477 2876 : relmapperspace = shm_toc_lookup(toc, PARALLEL_KEY_RELMAPPER_STATE, false);
1478 2876 : RestoreRelationMap(relmapperspace);
1479 2876 : reindexspace = shm_toc_lookup(toc, PARALLEL_KEY_REINDEX_STATE, false);
1480 2876 : RestoreReindexState(reindexspace);
1481 2876 : combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID, false);
1482 2876 : RestoreComboCIDState(combocidspace);
1483 :
1484 : /* Attach to the per-session DSM segment and contained objects. */
1485 : session_dsm_handle_space =
1486 2876 : shm_toc_lookup(toc, PARALLEL_KEY_SESSION_DSM, false);
1487 2876 : AttachSession(*(dsm_handle *) session_dsm_handle_space);
1488 :
1489 : /*
1490 : * If the transaction isolation level is REPEATABLE READ or SERIALIZABLE,
1491 : * the leader has serialized the transaction snapshot and we must restore
1492 : * it. At lower isolation levels, there is no transaction-lifetime
1493 : * snapshot, but we need TransactionXmin to get set to a value which is
1494 : * less than or equal to the xmin of every snapshot that will be used by
1495 : * this worker. The easiest way to accomplish that is to install the
1496 : * active snapshot as the transaction snapshot. Code running in this
1497 : * parallel worker might take new snapshots via GetTransactionSnapshot()
1498 : * or GetLatestSnapshot(), but it shouldn't have any way of acquiring a
1499 : * snapshot older than the active snapshot.
1500 : */
1501 2876 : asnapspace = shm_toc_lookup(toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, false);
1502 2876 : tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT, true);
1503 2876 : asnapshot = RestoreSnapshot(asnapspace);
1504 2876 : tsnapshot = tsnapspace ? RestoreSnapshot(tsnapspace) : asnapshot;
1505 2876 : RestoreTransactionSnapshot(tsnapshot,
1506 2876 : fps->parallel_leader_pgproc);
1507 2876 : PushActiveSnapshot(asnapshot);
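	/*
	 * Editor's note (illustrative sketch, not part of parallel.c): the
	 * PARALLEL_KEY_TRANSACTION_SNAPSHOT lookup above passes noError = true
	 * because the leader serializes a separate transaction snapshot only
	 * when IsolationUsesXactSnapshot() is true, roughly:
	 *
	 *     if (IsolationUsesXactSnapshot())
	 *     {
	 *         Size  tsnaplen = EstimateSnapshotSpace(transaction_snapshot);
	 *         char *tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen);
	 *
	 *         SerializeSnapshot(transaction_snapshot, tsnapspace);
	 *         shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT,
	 *                        tsnapspace);
	 *     }
	 *
	 * This is a simplified sketch of the leader-side logic; under READ
	 * COMMITTED the TOC entry is absent and the worker reuses the active
	 * snapshot, as coded above.
	 */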
1508 :
1509 : /*
1510 : * We've changed which tuples we can see, and must therefore invalidate
1511 : * system caches.
1512 : */
1513 2876 : InvalidateSystemCaches();
1514 :
1515 : /*
1516 : * Restore GUC values from launching backend. We can't do this earlier,
1517 : * because GUC check hooks that do catalog lookups need to see the same
1518 : * database state as the leader. Also, the check hooks for
1519 : * session_authorization and role assume we already set the correct role
1520 : * OIDs.
1521 : */
1522 2876 : gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC, false);
1523 2876 : RestoreGUCState(gucspace);
1524 :
1525 : /*
1526 : * Restore current user ID and security context. No verification happens
1527 : * here, we just blindly adopt the leader's state. We can't do this till
1528 : * after restoring GUCs, else we'll get complaints about restoring
1529 : * session_authorization and role. (In effect, we're assuming that all
1530 : * the restored values are okay to set, even if we are now inside a
1531 : * restricted context.)
1532 : */
1533 2876 : SetUserIdAndSecContext(fps->current_user_id, fps->sec_context);
1534 :
1535 : /* Restore temp-namespace state to ensure search path matches leader's. */
1536 2876 : SetTempNamespaceState(fps->temp_namespace_id,
1537 : fps->temp_toast_namespace_id);
1538 :
1539 : /* Restore uncommitted enums. */
1540 2876 : uncommittedenumsspace = shm_toc_lookup(toc, PARALLEL_KEY_UNCOMMITTEDENUMS,
1541 : false);
1542 2876 : RestoreUncommittedEnums(uncommittedenumsspace);
1543 :
1544 : /* Restore the ClientConnectionInfo. */
1545 2876 : clientconninfospace = shm_toc_lookup(toc, PARALLEL_KEY_CLIENTCONNINFO,
1546 : false);
1547 2876 : RestoreClientConnectionInfo(clientconninfospace);
1548 :
1549 : /*
1550 : * Initialize SystemUser now that MyClientConnectionInfo is restored. Also
1551 : 	 * ensure that auth_method is consulted only when it is actually valid,
1551 : 	 * i.e. when authn_id is not NULL.
1552 : */
1553 2876 : if (MyClientConnectionInfo.authn_id)
1554 4 : InitializeSystemUser(MyClientConnectionInfo.authn_id,
1555 : hba_authname(MyClientConnectionInfo.auth_method));
1556 :
1557 : /* Attach to the leader's serializable transaction, if SERIALIZABLE. */
1558 2876 : AttachSerializableXact(fps->serializable_xact_handle);
1559 :
1560 : /*
1561 : * We've initialized all of our state now; nothing should change
1562 : * hereafter.
1563 : */
1564 2876 : InitializingParallelWorker = false;
1565 2876 : EnterParallelMode();
1566 :
1567 : /*
1568 : * Time to do the real work: invoke the caller-supplied code.
1569 : */
1570 2876 : entrypt(seg, toc);
1571 :
1572 : /* Must exit parallel mode to pop active snapshot. */
1573 2864 : ExitParallelMode();
1574 :
1575 : /* Must pop active snapshot so snapmgr.c doesn't complain. */
1576 2864 : PopActiveSnapshot();
1577 :
1578 : /* Shut down the parallel-worker transaction. */
1579 2864 : EndParallelWorkerTransaction();
1580 :
1581 : /* Detach from the per-session DSM segment. */
1582 2864 : DetachSession();
1583 :
1584 : /* Report success. */
1585 2864 : pq_putmessage(PqMsg_Terminate, NULL, 0);
1586 : }
1587 :
1588 : /*
1589 : * Update shared memory with the ending location of the last WAL record we
1590 : * wrote, if it's greater than the value already stored there.
1591 : */
1592 : void
1593 2864 : ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
1594 : {
1595 2864 : FixedParallelState *fps = MyFixedParallelState;
1596 :
1597 : Assert(fps != NULL);
1598 2864 : SpinLockAcquire(&fps->mutex);
1599 2864 : if (fps->last_xlog_end < last_xlog_end)
1600 184 : fps->last_xlog_end = last_xlog_end;
1601 2864 : SpinLockRelease(&fps->mutex);
1602 2864 : }
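/*
 * Editor's illustrative sketch (not part of parallel.c): the consumer side
 * of the value published above.  Once all workers are known to have exited,
 * the leader can fold the largest reported WAL end position into its own
 * XactLastRecEnd, so that committing also waits for the workers' WAL.  The
 * real code doing this lives in WaitForParallelWorkersToFinish(); the
 * wrapper name below is hypothetical.
 */
#ifdef ILLUSTRATIVE_SKETCH_ONLY
static void
leader_absorb_worker_wal_position(FixedParallelState *fps)
{
	/* No spinlock needed here once all workers have already exited. */
	if (fps->last_xlog_end > XactLastRecEnd)
		XactLastRecEnd = fps->last_xlog_end;
}
#endif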
1603 :
1604 : /*
1605 : * Make sure the leader tries to read from our error queue one more time.
1606 : * This guards against the case where we exit uncleanly without sending an
1607 : * ErrorResponse to the leader, for example because some code calls proc_exit
1608 : * directly.
1609 : *
1610 : * Also explicitly detach from dsm segment so that subsystems using
1611 : * on_dsm_detach() have a chance to send stats before the stats subsystem is
1612 : * shut down as part of a before_shmem_exit() hook.
1613 : *
1614 : * One might think this could instead be solved by carefully ordering how
1615 : * dsm segments are attached, so that the pgstats segments would be detached
1616 : * from later than the parallel query one. That turns out not to work because
1617 : * the stats hash might need to grow, which can cause new segments to be
1618 : * allocated, and those are then detached from earlier.
1619 : */
1620 : static void
1621 2876 : ParallelWorkerShutdown(int code, Datum arg)
1622 : {
1623 2876 : SendProcSignal(ParallelLeaderPid,
1624 : PROCSIG_PARALLEL_MESSAGE,
1625 : ParallelLeaderProcNumber);
1626 :
1627 2876 : dsm_detach((dsm_segment *) DatumGetPointer(arg));
1628 2876 : }
1629 :
1630 : /*
1631 : * Look up (and possibly load) a parallel worker entry point function.
1632 : *
1633 : * For functions contained in the core code, we use library name "postgres"
1634 : * and consult the InternalParallelWorkers array. External functions are
1635 : * looked up, and loaded if necessary, using load_external_function().
1636 : *
1637 : * The point of this is to pass function names as strings across process
1638 : * boundaries. We can't pass actual function addresses because of the
1639 : * possibility that the function has been loaded at a different address
1640 : * in a different process. This is obviously a hazard for functions in
1641 : * loadable libraries, but it can happen even for functions in the core code
1642 : * on platforms using EXEC_BACKEND (e.g., Windows).
1643 : *
1644 : * At some point it might be worthwhile to get rid of InternalParallelWorkers[]
1645 : * in favor of applying load_external_function() for core functions too;
1646 : * but that raises portability issues that are not worth addressing now.
1647 : */
1648 : static parallel_worker_main_type
1649 2876 : LookupParallelWorkerFunction(const char *libraryname, const char *funcname)
1650 : {
1651 : /*
1652 : * If the function is to be loaded from postgres itself, search the
1653 : * InternalParallelWorkers array.
1654 : */
1655 2876 : if (strcmp(libraryname, "postgres") == 0)
1656 : {
1657 : int i;
1658 :
1659 3246 : for (i = 0; i < lengthof(InternalParallelWorkers); i++)
1660 : {
1661 3246 : if (strcmp(InternalParallelWorkers[i].fn_name, funcname) == 0)
1662 2876 : return InternalParallelWorkers[i].fn_addr;
1663 : }
1664 :
1665 : /* We can only reach this by programming error. */
1666 0 : elog(ERROR, "internal function \"%s\" not found", funcname);
1667 : }
1668 :
1669 : /* Otherwise load from external library. */
1670 0 : return (parallel_worker_main_type)
1671 0 : load_external_function(libraryname, funcname, true, NULL);
1672 : }
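/*
 * Editor's illustrative sketch (not part of parallel.c): how an extension
 * plugs into the lookup above.  The leader identifies the entry point by
 * library name and function name; each worker resolves that pair back to a
 * function address, using load_external_function() for non-core libraries.
 * "my_extension", my_worker_main, and my_run_in_parallel are hypothetical
 * names; error handling and TOC setup are omitted.
 */
#ifdef ILLUSTRATIVE_SKETCH_ONLY
/* In the extension's shared library: the function each worker will run. */
void
my_worker_main(dsm_segment *seg, shm_toc *toc)
{
	/* Look up shared state in the TOC and do this worker's share of work. */
}

/* In the leader: name the entry point when creating the parallel context. */
static void
my_run_in_parallel(void)
{
	ParallelContext *pcxt;

	EnterParallelMode();
	pcxt = CreateParallelContext("my_extension", "my_worker_main", 2);
	InitializeParallelDSM(pcxt);
	LaunchParallelWorkers(pcxt);
	WaitForParallelWorkersToFinish(pcxt);
	DestroyParallelContext(pcxt);
	ExitParallelMode();
}
#endif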