Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * parallel.c
4 : * Infrastructure for launching parallel workers
5 : *
6 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/access/transam/parallel.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/brin.h"
18 : #include "access/gin.h"
19 : #include "access/nbtree.h"
20 : #include "access/parallel.h"
21 : #include "access/session.h"
22 : #include "access/xact.h"
23 : #include "access/xlog.h"
24 : #include "catalog/index.h"
25 : #include "catalog/namespace.h"
26 : #include "catalog/pg_enum.h"
27 : #include "catalog/storage.h"
28 : #include "commands/async.h"
29 : #include "commands/vacuum.h"
30 : #include "executor/execParallel.h"
31 : #include "libpq/libpq.h"
32 : #include "libpq/pqformat.h"
33 : #include "libpq/pqmq.h"
34 : #include "miscadmin.h"
35 : #include "optimizer/optimizer.h"
36 : #include "pgstat.h"
37 : #include "storage/ipc.h"
38 : #include "storage/predicate.h"
39 : #include "storage/proc.h"
40 : #include "storage/spin.h"
41 : #include "tcop/tcopprot.h"
42 : #include "utils/combocid.h"
43 : #include "utils/guc.h"
44 : #include "utils/inval.h"
45 : #include "utils/memutils.h"
46 : #include "utils/relmapper.h"
47 : #include "utils/snapmgr.h"
48 : #include "utils/wait_event.h"
49 :
50 : /*
51 : * We don't want to waste a lot of memory on an error queue which, most of
52 : * the time, will process only a handful of small messages. However, it is
53 : * desirable to make it large enough that a typical ErrorResponse can be sent
54 : * without blocking. That way, a worker that errors out can write the whole
55 : * message into the queue and terminate without waiting for the user backend.
56 : */
57 : #define PARALLEL_ERROR_QUEUE_SIZE 16384
58 :
59 : /* Magic number for parallel context TOC. */
60 : #define PARALLEL_MAGIC 0x50477c7c
61 :
62 : /*
63 : * Magic numbers for per-context parallel state sharing. Higher-level code
64 : * should use smaller values, leaving these very large ones for use by this
65 : * module.
66 : */
67 : #define PARALLEL_KEY_FIXED UINT64CONST(0xFFFFFFFFFFFF0001)
68 : #define PARALLEL_KEY_ERROR_QUEUE UINT64CONST(0xFFFFFFFFFFFF0002)
69 : #define PARALLEL_KEY_LIBRARY UINT64CONST(0xFFFFFFFFFFFF0003)
70 : #define PARALLEL_KEY_GUC UINT64CONST(0xFFFFFFFFFFFF0004)
71 : #define PARALLEL_KEY_COMBO_CID UINT64CONST(0xFFFFFFFFFFFF0005)
72 : #define PARALLEL_KEY_TRANSACTION_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0006)
73 : #define PARALLEL_KEY_ACTIVE_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0007)
74 : #define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008)
75 : #define PARALLEL_KEY_ENTRYPOINT UINT64CONST(0xFFFFFFFFFFFF0009)
76 : #define PARALLEL_KEY_SESSION_DSM UINT64CONST(0xFFFFFFFFFFFF000A)
77 : #define PARALLEL_KEY_PENDING_SYNCS UINT64CONST(0xFFFFFFFFFFFF000B)
78 : #define PARALLEL_KEY_REINDEX_STATE UINT64CONST(0xFFFFFFFFFFFF000C)
79 : #define PARALLEL_KEY_RELMAPPER_STATE UINT64CONST(0xFFFFFFFFFFFF000D)
80 : #define PARALLEL_KEY_UNCOMMITTEDENUMS UINT64CONST(0xFFFFFFFFFFFF000E)
81 : #define PARALLEL_KEY_CLIENTCONNINFO UINT64CONST(0xFFFFFFFFFFFF000F)
82 :
83 : /* Fixed-size parallel state. */
typedef struct FixedParallelState
{
	/* Fixed-size state that workers must restore. */
	Oid			database_id;	/* leader's MyDatabaseId */
	Oid			authenticated_user_id;
	Oid			session_user_id;
	Oid			outer_user_id;	/* leader's current role, per GetCurrentRoleId() */
	Oid			current_user_id;	/* paired with sec_context below */
	Oid			temp_namespace_id;
	Oid			temp_toast_namespace_id;
	int			sec_context;	/* from GetUserIdAndSecContext() */
	bool		session_user_is_superuser;
	bool		role_is_superuser;
	PGPROC	   *parallel_leader_pgproc;
	pid_t		parallel_leader_pid;
	ProcNumber	parallel_leader_proc_number;
	TimestampTz xact_ts;		/* leader's transaction start timestamp */
	TimestampTz stmt_ts;		/* leader's statement start timestamp */
	SerializableXactHandle serializable_xact_handle;

	/* Mutex protects remaining fields. */
	slock_t		mutex;

	/* Maximum XactLastRecEnd of any worker. */
	XLogRecPtr	last_xlog_end;
} FixedParallelState;
110 :
111 : /*
112 : * Our parallel worker number. We initialize this to -1, meaning that we are
113 : * not a parallel worker. In parallel workers, it will be set to a value >= 0
114 : * and < the number of workers before any user code is invoked; each parallel
115 : * worker will get a different parallel worker number.
116 : */
117 : int ParallelWorkerNumber = -1;
118 :
119 : /* Is there a parallel message pending which we need to receive? */
120 : volatile sig_atomic_t ParallelMessagePending = false;
121 :
122 : /* Are we initializing a parallel worker? */
123 : bool InitializingParallelWorker = false;
124 :
125 : /* Pointer to our fixed parallel state. */
126 : static FixedParallelState *MyFixedParallelState;
127 :
128 : /* List of active parallel contexts. */
129 : static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);
130 :
131 : /* Backend-local copy of data from FixedParallelState. */
132 : static pid_t ParallelLeaderPid;
133 :
134 : /*
135 : * List of internal parallel worker entry points. We need this for
136 : * reasons explained in LookupParallelWorkerFunction(), below.
137 : */
138 : static const struct
139 : {
140 : const char *fn_name;
141 : parallel_worker_main_type fn_addr;
142 : } InternalParallelWorkers[] =
143 :
144 : {
145 : {
146 : "ParallelQueryMain", ParallelQueryMain
147 : },
148 : {
149 : "_bt_parallel_build_main", _bt_parallel_build_main
150 : },
151 : {
152 : "_brin_parallel_build_main", _brin_parallel_build_main
153 : },
154 : {
155 : "_gin_parallel_build_main", _gin_parallel_build_main
156 : },
157 : {
158 : "parallel_vacuum_main", parallel_vacuum_main
159 : }
160 : };
161 :
162 : /* Private functions. */
163 : static void ProcessParallelMessage(ParallelContext *pcxt, int i, StringInfo msg);
164 : static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
165 : static parallel_worker_main_type LookupParallelWorkerFunction(const char *libraryname, const char *funcname);
166 : static void ParallelWorkerShutdown(int code, Datum arg);
167 :
168 :
169 : /*
170 : * Establish a new parallel context. This should be done after entering
171 : * parallel mode, and (unless there is an error) the context should be
172 : * destroyed before exiting the current subtransaction.
173 : */
174 : ParallelContext *
175 498 : CreateParallelContext(const char *library_name, const char *function_name,
176 : int nworkers)
177 : {
178 : MemoryContext oldcontext;
179 : ParallelContext *pcxt;
180 :
181 : /* It is unsafe to create a parallel context if not in parallel mode. */
182 : Assert(IsInParallelMode());
183 :
184 : /* Number of workers should be non-negative. */
185 : Assert(nworkers >= 0);
186 :
187 : /* We might be running in a short-lived memory context. */
188 498 : oldcontext = MemoryContextSwitchTo(TopTransactionContext);
189 :
190 : /* Initialize a new ParallelContext. */
191 498 : pcxt = palloc0_object(ParallelContext);
192 498 : pcxt->subid = GetCurrentSubTransactionId();
193 498 : pcxt->nworkers = nworkers;
194 498 : pcxt->nworkers_to_launch = nworkers;
195 498 : pcxt->library_name = pstrdup(library_name);
196 498 : pcxt->function_name = pstrdup(function_name);
197 498 : pcxt->error_context_stack = error_context_stack;
198 498 : shm_toc_initialize_estimator(&pcxt->estimator);
199 498 : dlist_push_head(&pcxt_list, &pcxt->node);
200 :
201 : /* Restore previous memory context. */
202 498 : MemoryContextSwitchTo(oldcontext);
203 :
204 498 : return pcxt;
205 : }
206 :
207 : /*
208 : * Establish the dynamic shared memory segment for a parallel context and
209 : * copy state and other bookkeeping information that will be needed by
210 : * parallel workers into it.
211 : */
void
InitializeParallelDSM(ParallelContext *pcxt)
{
	MemoryContext oldcontext;
	Size		library_len = 0;
	Size		guc_len = 0;
	Size		combocidlen = 0;
	Size		tsnaplen = 0;
	Size		asnaplen = 0;
	Size		tstatelen = 0;
	Size		pendingsyncslen = 0;
	Size		reindexlen = 0;
	Size		relmapperlen = 0;
	Size		uncommittedenumslen = 0;
	Size		clientconninfolen = 0;
	Size		segsize = 0;
	int			i;
	FixedParallelState *fps;
	dsm_handle	session_dsm_handle = DSM_HANDLE_INVALID;
	Snapshot	transaction_snapshot = GetTransactionSnapshot();
	Snapshot	active_snapshot = GetActiveSnapshot();

	/* We might be running in a very short-lived memory context. */
	oldcontext = MemoryContextSwitchTo(TopTransactionContext);

	/* Allow space to store the fixed-size parallel state. */
	shm_toc_estimate_chunk(&pcxt->estimator, sizeof(FixedParallelState));
	shm_toc_estimate_keys(&pcxt->estimator, 1);

	/*
	 * If we manage to reach here while non-interruptible, it's unsafe to
	 * launch any workers: we would fail to process interrupts sent by them.
	 * We can deal with that edge case by pretending no workers were
	 * requested.
	 */
	if (!INTERRUPTS_CAN_BE_PROCESSED())
		pcxt->nworkers = 0;

	/*
	 * Normally, the user will have requested at least one worker process, but
	 * if by chance they have not, we can skip a bunch of things here.
	 */
	if (pcxt->nworkers > 0)
	{
		/* Get (or create) the per-session DSM segment's handle. */
		session_dsm_handle = GetSessionDsmHandle();

		/*
		 * If we weren't able to create a per-session DSM segment, then we can
		 * continue but we can't safely launch any workers because their
		 * record typmods would be incompatible so they couldn't exchange
		 * tuples.
		 */
		if (session_dsm_handle == DSM_HANDLE_INVALID)
			pcxt->nworkers = 0;
	}

	if (pcxt->nworkers > 0)
	{
		StaticAssertDecl(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
						 PARALLEL_ERROR_QUEUE_SIZE,
						 "parallel error queue size not buffer-aligned");

		/*
		 * Estimate space for various kinds of state sharing.  Each
		 * Estimate*Space() call here is paired with a Serialize*() call
		 * further down; the two phases must stay in sync.
		 */
		library_len = EstimateLibraryStateSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, library_len);
		guc_len = EstimateGUCStateSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, guc_len);
		combocidlen = EstimateComboCIDStateSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, combocidlen);
		if (IsolationUsesXactSnapshot())
		{
			tsnaplen = EstimateSnapshotSpace(transaction_snapshot);
			shm_toc_estimate_chunk(&pcxt->estimator, tsnaplen);
		}
		asnaplen = EstimateSnapshotSpace(active_snapshot);
		shm_toc_estimate_chunk(&pcxt->estimator, asnaplen);
		tstatelen = EstimateTransactionStateSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, tstatelen);
		shm_toc_estimate_chunk(&pcxt->estimator, sizeof(dsm_handle));
		pendingsyncslen = EstimatePendingSyncsSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, pendingsyncslen);
		reindexlen = EstimateReindexStateSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, reindexlen);
		relmapperlen = EstimateRelationMapSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, relmapperlen);
		uncommittedenumslen = EstimateUncommittedEnumsSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, uncommittedenumslen);
		clientconninfolen = EstimateClientConnectionInfoSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, clientconninfolen);
		/* If you add more chunks here, you probably need to add keys. */
		shm_toc_estimate_keys(&pcxt->estimator, 12);

		/* Estimate space need for error queues. */
		shm_toc_estimate_chunk(&pcxt->estimator,
							   mul_size(PARALLEL_ERROR_QUEUE_SIZE,
										pcxt->nworkers));
		shm_toc_estimate_keys(&pcxt->estimator, 1);

		/* Estimate how much we'll need for the entrypoint info. */
		shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name) +
							   strlen(pcxt->function_name) + 2);
		shm_toc_estimate_keys(&pcxt->estimator, 1);
	}

	/*
	 * Create DSM and initialize with new table of contents.  But if the user
	 * didn't request any workers, then don't bother creating a dynamic shared
	 * memory segment; instead, just use backend-private memory.
	 *
	 * Also, if we can't create a dynamic shared memory segment because the
	 * maximum number of segments have already been created, then fall back to
	 * backend-private memory, and plan not to use any workers.  We hope this
	 * won't happen very often, but it's better to abandon the use of
	 * parallelism than to fail outright.
	 */
	segsize = shm_toc_estimate(&pcxt->estimator);
	if (pcxt->nworkers > 0)
		pcxt->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
	if (pcxt->seg != NULL)
		pcxt->toc = shm_toc_create(PARALLEL_MAGIC,
								   dsm_segment_address(pcxt->seg),
								   segsize);
	else
	{
		pcxt->nworkers = 0;
		pcxt->private_memory = MemoryContextAlloc(TopMemoryContext, segsize);
		pcxt->toc = shm_toc_create(PARALLEL_MAGIC, pcxt->private_memory,
								   segsize);
	}

	/* Initialize fixed-size state in shared memory. */
	fps = (FixedParallelState *)
		shm_toc_allocate(pcxt->toc, sizeof(FixedParallelState));
	fps->database_id = MyDatabaseId;
	fps->authenticated_user_id = GetAuthenticatedUserId();
	fps->session_user_id = GetSessionUserId();
	fps->outer_user_id = GetCurrentRoleId();
	GetUserIdAndSecContext(&fps->current_user_id, &fps->sec_context);
	fps->session_user_is_superuser = GetSessionUserIsSuperuser();
	fps->role_is_superuser = current_role_is_superuser;
	GetTempNamespaceState(&fps->temp_namespace_id,
						  &fps->temp_toast_namespace_id);
	fps->parallel_leader_pgproc = MyProc;
	fps->parallel_leader_pid = MyProcPid;
	fps->parallel_leader_proc_number = MyProcNumber;
	fps->xact_ts = GetCurrentTransactionStartTimestamp();
	fps->stmt_ts = GetCurrentStatementStartTimestamp();
	fps->serializable_xact_handle = ShareSerializableXact();
	SpinLockInit(&fps->mutex);
	fps->last_xlog_end = InvalidXLogRecPtr;
	shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);

	/* We can skip the rest of this if we're not budgeting for any workers. */
	if (pcxt->nworkers > 0)
	{
		char	   *libraryspace;
		char	   *gucspace;
		char	   *combocidspace;
		char	   *tsnapspace;
		char	   *asnapspace;
		char	   *tstatespace;
		char	   *pendingsyncsspace;
		char	   *reindexspace;
		char	   *relmapperspace;
		char	   *error_queue_space;
		char	   *session_dsm_handle_space;
		char	   *entrypointstate;
		char	   *uncommittedenumsspace;
		char	   *clientconninfospace;
		Size		lnamelen;

		/* Serialize shared libraries we have loaded. */
		libraryspace = shm_toc_allocate(pcxt->toc, library_len);
		SerializeLibraryState(library_len, libraryspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_LIBRARY, libraryspace);

		/* Serialize GUC settings. */
		gucspace = shm_toc_allocate(pcxt->toc, guc_len);
		SerializeGUCState(guc_len, gucspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_GUC, gucspace);

		/* Serialize combo CID state. */
		combocidspace = shm_toc_allocate(pcxt->toc, combocidlen);
		SerializeComboCIDState(combocidlen, combocidspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_COMBO_CID, combocidspace);

		/*
		 * Serialize the transaction snapshot if the transaction isolation
		 * level uses a transaction snapshot.
		 */
		if (IsolationUsesXactSnapshot())
		{
			tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen);
			SerializeSnapshot(transaction_snapshot, tsnapspace);
			shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT,
						   tsnapspace);
		}

		/* Serialize the active snapshot. */
		asnapspace = shm_toc_allocate(pcxt->toc, asnaplen);
		SerializeSnapshot(active_snapshot, asnapspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, asnapspace);

		/* Provide the handle for per-session segment. */
		session_dsm_handle_space = shm_toc_allocate(pcxt->toc,
													sizeof(dsm_handle));
		*(dsm_handle *) session_dsm_handle_space = session_dsm_handle;
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_SESSION_DSM,
					   session_dsm_handle_space);

		/* Serialize transaction state. */
		tstatespace = shm_toc_allocate(pcxt->toc, tstatelen);
		SerializeTransactionState(tstatelen, tstatespace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_STATE, tstatespace);

		/* Serialize pending syncs. */
		pendingsyncsspace = shm_toc_allocate(pcxt->toc, pendingsyncslen);
		SerializePendingSyncs(pendingsyncslen, pendingsyncsspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_PENDING_SYNCS,
					   pendingsyncsspace);

		/* Serialize reindex state. */
		reindexspace = shm_toc_allocate(pcxt->toc, reindexlen);
		SerializeReindexState(reindexlen, reindexspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_REINDEX_STATE, reindexspace);

		/* Serialize relmapper state. */
		relmapperspace = shm_toc_allocate(pcxt->toc, relmapperlen);
		SerializeRelationMap(relmapperlen, relmapperspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_RELMAPPER_STATE,
					   relmapperspace);

		/* Serialize uncommitted enum state. */
		uncommittedenumsspace = shm_toc_allocate(pcxt->toc,
												 uncommittedenumslen);
		SerializeUncommittedEnums(uncommittedenumsspace, uncommittedenumslen);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_UNCOMMITTEDENUMS,
					   uncommittedenumsspace);

		/* Serialize our ClientConnectionInfo. */
		clientconninfospace = shm_toc_allocate(pcxt->toc, clientconninfolen);
		SerializeClientConnectionInfo(clientconninfolen, clientconninfospace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_CLIENTCONNINFO,
					   clientconninfospace);

		/* Allocate space for worker information. */
		pcxt->worker = palloc0_array(ParallelWorkerInfo, pcxt->nworkers);

		/*
		 * Establish error queues in dynamic shared memory.
		 *
		 * These queues should be used only for transmitting ErrorResponse,
		 * NoticeResponse, and NotifyResponse protocol messages.  Tuple data
		 * should be transmitted via separate (possibly larger?) queues.
		 */
		error_queue_space =
			shm_toc_allocate(pcxt->toc,
							 mul_size(PARALLEL_ERROR_QUEUE_SIZE,
									  pcxt->nworkers));
		for (i = 0; i < pcxt->nworkers; ++i)
		{
			char	   *start;
			shm_mq	   *mq;

			/* Worker i's queue lives at a fixed offset within the chunk. */
			start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
			mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
			shm_mq_set_receiver(mq, MyProc);
			pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
		}
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, error_queue_space);

		/*
		 * Serialize entrypoint information.  It's unsafe to pass function
		 * pointers across processes, as the function pointer may be different
		 * in each process in EXEC_BACKEND builds, so we always pass library
		 * and function name.  (We use library name "postgres" for functions
		 * in the core backend.)
		 *
		 * The two names are stored back-to-back, each with its own
		 * terminating NUL (hence the "+ 2" in the allocation above).
		 */
		lnamelen = strlen(pcxt->library_name);
		entrypointstate = shm_toc_allocate(pcxt->toc, lnamelen +
										   strlen(pcxt->function_name) + 2);
		strcpy(entrypointstate, pcxt->library_name);
		strcpy(entrypointstate + lnamelen + 1, pcxt->function_name);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_ENTRYPOINT, entrypointstate);
	}

	/* Update nworkers_to_launch, in case we changed nworkers above. */
	pcxt->nworkers_to_launch = pcxt->nworkers;

	/* Restore previous memory context. */
	MemoryContextSwitchTo(oldcontext);
}
505 :
506 : /*
507 : * Reinitialize the dynamic shared memory segment for a parallel context such
508 : * that we could launch workers for it again.
509 : */
void
ReinitializeParallelDSM(ParallelContext *pcxt)
{
	MemoryContext oldcontext;
	FixedParallelState *fps;

	/* We might be running in a very short-lived memory context. */
	oldcontext = MemoryContextSwitchTo(TopTransactionContext);

	/* Wait for any old workers to exit. */
	if (pcxt->nworkers_launched > 0)
	{
		WaitForParallelWorkersToFinish(pcxt);
		WaitForParallelWorkersToExit(pcxt);
		pcxt->nworkers_launched = 0;
		if (pcxt->known_attached_workers)
		{
			pfree(pcxt->known_attached_workers);
			pcxt->known_attached_workers = NULL;
			pcxt->nknown_attached_workers = 0;
		}
	}

	/* Reset a few bits of fixed parallel state to a clean state. */
	fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
	fps->last_xlog_end = InvalidXLogRecPtr;

	/*
	 * Recreate error queues (if they exist).  Each queue is rebuilt in place
	 * at its original offset within the existing DSM chunk, so the TOC entry
	 * made by InitializeParallelDSM remains valid.
	 */
	if (pcxt->nworkers > 0)
	{
		char	   *error_queue_space;
		int			i;

		error_queue_space =
			shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, false);
		for (i = 0; i < pcxt->nworkers; ++i)
		{
			char	   *start;
			shm_mq	   *mq;

			start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
			mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
			shm_mq_set_receiver(mq, MyProc);
			pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
		}
	}

	/* Restore previous memory context. */
	MemoryContextSwitchTo(oldcontext);
}
560 :
561 : /*
562 : * Reinitialize parallel workers for a parallel context such that we could
563 : * launch a different number of workers. This is required for cases where
564 : * we need to reuse the same DSM segment, but the number of workers can
565 : * vary from run-to-run.
566 : */
567 : void
568 31 : ReinitializeParallelWorkers(ParallelContext *pcxt, int nworkers_to_launch)
569 : {
570 : /*
571 : * The number of workers that need to be launched must be less than the
572 : * number of workers with which the parallel context is initialized. But
573 : * the caller might not know that InitializeParallelDSM reduced nworkers,
574 : * so just silently trim the request.
575 : */
576 31 : pcxt->nworkers_to_launch = Min(pcxt->nworkers, nworkers_to_launch);
577 31 : }
578 :
579 : /*
580 : * Launch parallel workers.
581 : */
void
LaunchParallelWorkers(ParallelContext *pcxt)
{
	MemoryContext oldcontext;
	BackgroundWorker worker;
	int			i;
	bool		any_registrations_failed = false;

	/* Skip this if we have no workers. */
	if (pcxt->nworkers == 0 || pcxt->nworkers_to_launch == 0)
		return;

	/* We need to be a lock group leader. */
	BecomeLockGroupLeader();

	/* If we do have workers, we'd better have a DSM segment. */
	Assert(pcxt->seg != NULL);

	/* We might be running in a short-lived memory context. */
	oldcontext = MemoryContextSwitchTo(TopTransactionContext);

	/*
	 * Configure a worker.  The same BackgroundWorker struct is reused for
	 * every registration below; only bgw_extra changes per iteration.
	 */
	memset(&worker, 0, sizeof(worker));
	snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
			 MyProcPid);
	snprintf(worker.bgw_type, BGW_MAXLEN, "parallel worker");
	worker.bgw_flags =
		BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION
		| BGWORKER_CLASS_PARALLEL;
	worker.bgw_start_time = BgWorkerStart_ConsistentState;
	worker.bgw_restart_time = BGW_NEVER_RESTART;
	sprintf(worker.bgw_library_name, "postgres");
	sprintf(worker.bgw_function_name, "ParallelWorkerMain");
	/* The DSM handle is how each worker locates all the shared state. */
	worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(pcxt->seg));
	worker.bgw_notify_pid = MyProcPid;

	/*
	 * Start workers.
	 *
	 * The caller must be able to tolerate ending up with fewer workers than
	 * expected, so there is no need to throw an error here if registration
	 * fails.  It wouldn't help much anyway, because registering the worker in
	 * no way guarantees that it will start up and initialize successfully.
	 */
	for (i = 0; i < pcxt->nworkers_to_launch; ++i)
	{
		/* Pass this worker's number through bgw_extra. */
		memcpy(worker.bgw_extra, &i, sizeof(int));
		if (!any_registrations_failed &&
			RegisterDynamicBackgroundWorker(&worker,
											&pcxt->worker[i].bgwhandle))
		{
			shm_mq_set_handle(pcxt->worker[i].error_mqh,
							  pcxt->worker[i].bgwhandle);
			pcxt->nworkers_launched++;
		}
		else
		{
			/*
			 * If we weren't able to register the worker, then we've bumped up
			 * against the max_worker_processes limit, and future
			 * registrations will probably fail too, so arrange to skip them.
			 * But we still have to execute this code for the remaining slots
			 * to make sure that we forget about the error queues we budgeted
			 * for those workers.  Otherwise, we'll wait for them to start,
			 * but they never will.
			 */
			any_registrations_failed = true;
			pcxt->worker[i].bgwhandle = NULL;
			shm_mq_detach(pcxt->worker[i].error_mqh);
			pcxt->worker[i].error_mqh = NULL;
		}
	}

	/*
	 * Now that nworkers_launched has taken its final value, we can initialize
	 * known_attached_workers.
	 */
	if (pcxt->nworkers_launched > 0)
	{
		pcxt->known_attached_workers = palloc0_array(bool, pcxt->nworkers_launched);
		pcxt->nknown_attached_workers = 0;
	}

	/* Restore previous memory context. */
	MemoryContextSwitchTo(oldcontext);
}
668 :
669 : /*
670 : * Wait for all workers to attach to their error queues, and throw an error if
671 : * any worker fails to do this.
672 : *
673 : * Callers can assume that if this function returns successfully, then the
674 : * number of workers given by pcxt->nworkers_launched have initialized and
675 : * attached to their error queues. Whether or not these workers are guaranteed
676 : * to still be running depends on what code the caller asked them to run;
677 : * this function does not guarantee that they have not exited. However, it
678 : * does guarantee that any workers which exited must have done so cleanly and
679 : * after successfully performing the work with which they were tasked.
680 : *
681 : * If this function is not called, then some of the workers that were launched
682 : * may not have been started due to a fork() failure, or may have exited during
683 : * early startup prior to attaching to the error queue, so nworkers_launched
684 : * cannot be viewed as completely reliable. It will never be less than the
685 : * number of workers which actually started, but it might be more. Any workers
686 : * that failed to start will still be discovered by
687 : * WaitForParallelWorkersToFinish and an error will be thrown at that time,
688 : * provided that function is eventually reached.
689 : *
690 : * In general, the leader process should do as much work as possible before
691 : * calling this function. fork() failures and other early-startup failures
692 : * are very uncommon, and having the leader sit idle when it could be doing
693 : * useful work is undesirable. However, if the leader needs to wait for
694 : * all of its workers or for a specific worker, it may want to call this
695 : * function before doing so. If not, it must make some other provision for
696 : * the failure-to-start case, lest it wait forever. On the other hand, a
697 : * leader which never waits for a worker that might not be started yet, or
698 : * at least never does so prior to WaitForParallelWorkersToFinish(), need not
699 : * call this function at all.
700 : */
void
WaitForParallelWorkersToAttach(ParallelContext *pcxt)
{
	int			i;

	/* Skip this if we have no launched workers. */
	if (pcxt->nworkers_launched == 0)
		return;

	for (;;)
	{
		/*
		 * This will process any parallel messages that are pending and it may
		 * also throw an error propagated from a worker.
		 */
		CHECK_FOR_INTERRUPTS();

		for (i = 0; i < pcxt->nworkers_launched; ++i)
		{
			BgwHandleStatus status;
			shm_mq	   *mq;
			int			rc;
			pid_t		pid;

			/* Already confirmed attached on a previous pass; skip. */
			if (pcxt->known_attached_workers[i])
				continue;

			/*
			 * If error_mqh is NULL, then the worker has already exited
			 * cleanly.
			 */
			if (pcxt->worker[i].error_mqh == NULL)
			{
				pcxt->known_attached_workers[i] = true;
				++pcxt->nknown_attached_workers;
				continue;
			}

			status = GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle, &pid);
			if (status == BGWH_STARTED)
			{
				/* Has the worker attached to the error queue? */
				mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
				if (shm_mq_get_sender(mq) != NULL)
				{
					/* Yes, so it is known to be attached. */
					pcxt->known_attached_workers[i] = true;
					++pcxt->nknown_attached_workers;
				}
			}
			else if (status == BGWH_STOPPED)
			{
				/*
				 * If the worker stopped without attaching to the error queue,
				 * throw an error.
				 */
				mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
				if (shm_mq_get_sender(mq) == NULL)
					ereport(ERROR,
							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
							 errmsg("parallel worker failed to initialize"),
							 errhint("More details may be available in the server log.")));

				pcxt->known_attached_workers[i] = true;
				++pcxt->nknown_attached_workers;
			}
			else
			{
				/*
				 * Worker not yet started, so we must wait.  The postmaster
				 * will notify us if the worker's state changes.  Our latch
				 * might also get set for some other reason, but if so we'll
				 * just end up waiting for the same worker again.
				 */
				rc = WaitLatch(MyLatch,
							   WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
							   -1, WAIT_EVENT_BGWORKER_STARTUP);

				if (rc & WL_LATCH_SET)
					ResetLatch(MyLatch);
			}
		}

		/* If all workers are known to have started, we're done. */
		if (pcxt->nknown_attached_workers >= pcxt->nworkers_launched)
		{
			Assert(pcxt->nknown_attached_workers == pcxt->nworkers_launched);
			break;
		}
	}
}
792 :
793 : /*
794 : * Wait for all workers to finish computing.
795 : *
796 : * Even if the parallel operation seems to have completed successfully, it's
797 : * important to call this function afterwards. We must not miss any errors
798 : * the workers may have thrown during the parallel operation, or any that they
799 : * may yet throw while shutting down.
800 : *
801 : * Also, we want to update our notion of XactLastRecEnd based on worker
802 : * feedback.
803 : */
804 : void
805 778 : WaitForParallelWorkersToFinish(ParallelContext *pcxt)
806 : {
807 : for (;;)
808 514 : {
809 1292 : bool anyone_alive = false;
810 1292 : int nfinished = 0;
811 : int i;
812 :
813 : /*
814 : * This will process any parallel messages that are pending, which may
815 : * change the outcome of the loop that follows. It may also throw an
816 : * error propagated from a worker.
817 : */
818 1292 : CHECK_FOR_INTERRUPTS();
819 :
820 4451 : for (i = 0; i < pcxt->nworkers_launched; ++i)
821 : {
822 : /*
823 : * If error_mqh is NULL, then the worker has already exited
824 : * cleanly. If we have received a message through error_mqh from
825 : * the worker, we know it started up cleanly, and therefore we're
826 : * certain to be notified when it exits.
827 : */
828 3176 : if (pcxt->worker[i].error_mqh == NULL)
829 2629 : ++nfinished;
830 547 : else if (pcxt->known_attached_workers[i])
831 : {
832 17 : anyone_alive = true;
833 17 : break;
834 : }
835 : }
836 :
837 1292 : if (!anyone_alive)
838 : {
839 : /* If all workers are known to have finished, we're done. */
840 1275 : if (nfinished >= pcxt->nworkers_launched)
841 : {
842 : Assert(nfinished == pcxt->nworkers_launched);
843 778 : break;
844 : }
845 :
846 : /*
847 : * We didn't detect any living workers, but not all workers are
848 : * known to have exited cleanly. Either not all workers have
849 : * launched yet, or maybe some of them failed to start or
850 : * terminated abnormally.
851 : */
852 1745 : for (i = 0; i < pcxt->nworkers_launched; ++i)
853 : {
854 : pid_t pid;
855 : shm_mq *mq;
856 :
857 : /*
858 : * If the worker is BGWH_NOT_YET_STARTED or BGWH_STARTED, we
859 : * should just keep waiting. If it is BGWH_STOPPED, then
860 : * further investigation is needed.
861 : */
862 1248 : if (pcxt->worker[i].error_mqh == NULL ||
863 1060 : pcxt->worker[i].bgwhandle == NULL ||
864 530 : GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle,
865 : &pid) != BGWH_STOPPED)
866 1248 : continue;
867 :
868 : /*
869 : * Check whether the worker ended up stopped without ever
870 : * attaching to the error queue. If so, the postmaster was
871 : * unable to fork the worker or it exited without initializing
872 : * properly. We must throw an error, since the caller may
873 : * have been expecting the worker to do some work before
874 : * exiting.
875 : */
876 0 : mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
877 0 : if (shm_mq_get_sender(mq) == NULL)
878 0 : ereport(ERROR,
879 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
880 : errmsg("parallel worker failed to initialize"),
881 : errhint("More details may be available in the server log.")));
882 :
883 : /*
884 : * The worker is stopped, but is attached to the error queue.
885 : * Unless there's a bug somewhere, this will only happen when
886 : * the worker writes messages and terminates after the
887 : * CHECK_FOR_INTERRUPTS() near the top of this function and
888 : * before the call to GetBackgroundWorkerPid(). In that case,
889 : * our latch should have been set as well and the right things
890 : * will happen on the next pass through the loop.
891 : */
892 : }
893 : }
894 :
895 514 : (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
896 : WAIT_EVENT_PARALLEL_FINISH);
897 514 : ResetLatch(MyLatch);
898 : }
899 :
900 778 : if (pcxt->toc != NULL)
901 : {
902 : FixedParallelState *fps;
903 :
904 778 : fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
905 778 : if (fps->last_xlog_end > XactLastRecEnd)
906 19 : XactLastRecEnd = fps->last_xlog_end;
907 : }
908 778 : }
909 :
910 : /*
911 : * Wait for all workers to exit.
912 : *
913 : * This function ensures that workers have been completely shutdown. The
914 : * difference between WaitForParallelWorkersToFinish and this function is
915 : * that the former just ensures that last message sent by a worker backend is
916 : * received by the leader backend whereas this ensures the complete shutdown.
917 : */
918 : static void
919 641 : WaitForParallelWorkersToExit(ParallelContext *pcxt)
920 : {
921 : int i;
922 :
923 : /* Wait until the workers actually die. */
924 2134 : for (i = 0; i < pcxt->nworkers_launched; ++i)
925 : {
926 : BgwHandleStatus status;
927 :
928 1493 : if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
929 0 : continue;
930 :
931 1493 : status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);
932 :
933 : /*
934 : * If the postmaster kicked the bucket, we have no chance of cleaning
935 : * up safely -- we won't be able to tell when our workers are actually
936 : * dead. This doesn't necessitate a PANIC since they will all abort
937 : * eventually, but we can't safely continue this session.
938 : */
939 1493 : if (status == BGWH_POSTMASTER_DIED)
940 0 : ereport(FATAL,
941 : (errcode(ERRCODE_ADMIN_SHUTDOWN),
942 : errmsg("postmaster exited during a parallel transaction")));
943 :
944 : /* Release memory. */
945 1493 : pfree(pcxt->worker[i].bgwhandle);
946 1493 : pcxt->worker[i].bgwhandle = NULL;
947 : }
948 641 : }
949 :
950 : /*
951 : * Destroy a parallel context.
952 : *
953 : * If expecting a clean exit, you should use WaitForParallelWorkersToFinish()
954 : * first, before calling this function. When this function is invoked, any
955 : * remaining workers are forcibly killed; the dynamic shared memory segment
956 : * is unmapped; and we then wait (uninterruptibly) for the workers to exit.
957 : */
958 : void
959 498 : DestroyParallelContext(ParallelContext *pcxt)
960 : {
961 : int i;
962 :
963 : /*
964 : * Be careful about order of operations here! We remove the parallel
965 : * context from the list before we do anything else; otherwise, if an
966 : * error occurs during a subsequent step, we might try to nuke it again
967 : * from AtEOXact_Parallel or AtEOSubXact_Parallel.
968 : */
969 498 : dlist_delete(&pcxt->node);
970 :
971 : /* Kill each worker in turn, and forget their error queues. */
972 498 : if (pcxt->worker != NULL)
973 : {
974 1567 : for (i = 0; i < pcxt->nworkers_launched; ++i)
975 : {
976 1069 : if (pcxt->worker[i].error_mqh != NULL)
977 : {
978 6 : TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
979 :
980 6 : shm_mq_detach(pcxt->worker[i].error_mqh);
981 6 : pcxt->worker[i].error_mqh = NULL;
982 : }
983 : }
984 : }
985 :
986 : /*
987 : * If we have allocated a shared memory segment, detach it. This will
988 : * implicitly detach the error queues, and any other shared memory queues,
989 : * stored there.
990 : */
991 498 : if (pcxt->seg != NULL)
992 : {
993 498 : dsm_detach(pcxt->seg);
994 498 : pcxt->seg = NULL;
995 : }
996 :
997 : /*
998 : * If this parallel context is actually in backend-private memory rather
999 : * than shared memory, free that memory instead.
1000 : */
1001 498 : if (pcxt->private_memory != NULL)
1002 : {
1003 0 : pfree(pcxt->private_memory);
1004 0 : pcxt->private_memory = NULL;
1005 : }
1006 :
1007 : /*
1008 : * We can't finish transaction commit or abort until all of the workers
1009 : * have exited. This means, in particular, that we can't respond to
1010 : * interrupts at this stage.
1011 : */
1012 498 : HOLD_INTERRUPTS();
1013 498 : WaitForParallelWorkersToExit(pcxt);
1014 498 : RESUME_INTERRUPTS();
1015 :
1016 : /* Free the worker array itself. */
1017 498 : if (pcxt->worker != NULL)
1018 : {
1019 498 : pfree(pcxt->worker);
1020 498 : pcxt->worker = NULL;
1021 : }
1022 :
1023 : /* Free memory. */
1024 498 : pfree(pcxt->library_name);
1025 498 : pfree(pcxt->function_name);
1026 498 : pfree(pcxt);
1027 498 : }
1028 :
1029 : /*
1030 : * Are there any parallel contexts currently active?
1031 : */
1032 : bool
1033 0 : ParallelContextActive(void)
1034 : {
1035 0 : return !dlist_is_empty(&pcxt_list);
1036 : }
1037 :
1038 : /*
1039 : * Handle receipt of an interrupt indicating a parallel worker message.
1040 : *
1041 : * Note: this is called within a signal handler! All we can do is set
1042 : * a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke
1043 : * ProcessParallelMessages().
1044 : */
1045 : void
1046 1629 : HandleParallelMessageInterrupt(void)
1047 : {
1048 1629 : InterruptPending = true;
1049 1629 : ParallelMessagePending = true;
1050 1629 : SetLatch(MyLatch);
1051 1629 : }
1052 :
1053 : /*
1054 : * Process any queued protocol messages received from parallel workers.
1055 : */
1056 : void
1057 1599 : ProcessParallelMessages(void)
1058 : {
1059 : dlist_iter iter;
1060 : MemoryContext oldcontext;
1061 :
1062 : static MemoryContext hpm_context = NULL;
1063 :
1064 : /*
1065 : * This is invoked from ProcessInterrupts(), and since some of the
1066 : * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
1067 : * for recursive calls if more signals are received while this runs. It's
1068 : * unclear that recursive entry would be safe, and it doesn't seem useful
1069 : * even if it is safe, so let's block interrupts until done.
1070 : */
1071 1599 : HOLD_INTERRUPTS();
1072 :
1073 : /*
1074 : * Moreover, CurrentMemoryContext might be pointing almost anywhere. We
1075 : * don't want to risk leaking data into long-lived contexts, so let's do
1076 : * our work here in a private context that we can reset on each use.
1077 : */
1078 1599 : if (hpm_context == NULL) /* first time through? */
1079 82 : hpm_context = AllocSetContextCreate(TopMemoryContext,
1080 : "ProcessParallelMessages",
1081 : ALLOCSET_DEFAULT_SIZES);
1082 : else
1083 1517 : MemoryContextReset(hpm_context);
1084 :
1085 1599 : oldcontext = MemoryContextSwitchTo(hpm_context);
1086 :
1087 : /* OK to process messages. Reset the flag saying there are more to do. */
1088 1599 : ParallelMessagePending = false;
1089 :
1090 3313 : dlist_foreach(iter, &pcxt_list)
1091 : {
1092 : ParallelContext *pcxt;
1093 : int i;
1094 :
1095 1720 : pcxt = dlist_container(ParallelContext, node, iter.cur);
1096 1720 : if (pcxt->worker == NULL)
1097 0 : continue;
1098 :
1099 6956 : for (i = 0; i < pcxt->nworkers_launched; ++i)
1100 : {
1101 : /*
1102 : * Read as many messages as we can from each worker, but stop when
1103 : * either (1) the worker's error queue goes away, which can happen
1104 : * if we receive a Terminate message from the worker; or (2) no
1105 : * more messages can be read from the worker without blocking.
1106 : */
1107 6731 : while (pcxt->worker[i].error_mqh != NULL)
1108 : {
1109 : shm_mq_result res;
1110 : Size nbytes;
1111 : void *data;
1112 :
1113 3066 : res = shm_mq_receive(pcxt->worker[i].error_mqh, &nbytes,
1114 : &data, true);
1115 3066 : if (res == SHM_MQ_WOULD_BLOCK)
1116 1571 : break;
1117 1495 : else if (res == SHM_MQ_SUCCESS)
1118 : {
1119 : StringInfoData msg;
1120 :
1121 1495 : initStringInfo(&msg);
1122 1495 : appendBinaryStringInfo(&msg, data, nbytes);
1123 1495 : ProcessParallelMessage(pcxt, i, &msg);
1124 1489 : pfree(msg.data);
1125 : }
1126 : else
1127 0 : ereport(ERROR,
1128 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1129 : errmsg("lost connection to parallel worker")));
1130 : }
1131 : }
1132 : }
1133 :
1134 1593 : MemoryContextSwitchTo(oldcontext);
1135 :
1136 : /* Might as well clear the context on our way out */
1137 1593 : MemoryContextReset(hpm_context);
1138 :
1139 1593 : RESUME_INTERRUPTS();
1140 1593 : }
1141 :
1142 : /*
1143 : * Process a single protocol message received from a single parallel worker.
1144 : */
1145 : static void
1146 1495 : ProcessParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
1147 : {
1148 : char msgtype;
1149 :
1150 1495 : if (pcxt->known_attached_workers != NULL &&
1151 1495 : !pcxt->known_attached_workers[i])
1152 : {
1153 1396 : pcxt->known_attached_workers[i] = true;
1154 1396 : pcxt->nknown_attached_workers++;
1155 : }
1156 :
1157 1495 : msgtype = pq_getmsgbyte(msg);
1158 :
1159 1495 : switch (msgtype)
1160 : {
1161 6 : case PqMsg_ErrorResponse:
1162 : case PqMsg_NoticeResponse:
1163 : {
1164 : ErrorData edata;
1165 : ErrorContextCallback *save_error_context_stack;
1166 :
1167 : /* Parse ErrorResponse or NoticeResponse. */
1168 6 : pq_parse_errornotice(msg, &edata);
1169 :
1170 : /* Death of a worker isn't enough justification for suicide. */
1171 6 : edata.elevel = Min(edata.elevel, ERROR);
1172 :
1173 : /*
1174 : * If desired, add a context line to show that this is a
1175 : * message propagated from a parallel worker. Otherwise, it
1176 : * can sometimes be confusing to understand what actually
1177 : * happened. (We don't do this in DEBUG_PARALLEL_REGRESS mode
1178 : * because it causes test-result instability depending on
1179 : * whether a parallel worker is actually used or not.)
1180 : */
1181 6 : if (debug_parallel_query != DEBUG_PARALLEL_REGRESS)
1182 : {
1183 6 : if (edata.context)
1184 3 : edata.context = psprintf("%s\n%s", edata.context,
1185 : _("parallel worker"));
1186 : else
1187 3 : edata.context = pstrdup(_("parallel worker"));
1188 : }
1189 :
1190 : /*
1191 : * Context beyond that should use the error context callbacks
1192 : * that were in effect when the ParallelContext was created,
1193 : * not the current ones.
1194 : */
1195 6 : save_error_context_stack = error_context_stack;
1196 6 : error_context_stack = pcxt->error_context_stack;
1197 :
1198 : /* Rethrow error or print notice. */
1199 6 : ThrowErrorData(&edata);
1200 :
1201 : /* Not an error, so restore previous context stack. */
1202 0 : error_context_stack = save_error_context_stack;
1203 :
1204 0 : break;
1205 : }
1206 :
1207 0 : case PqMsg_NotificationResponse:
1208 : {
1209 : /* Propagate NotifyResponse. */
1210 : int32 pid;
1211 : const char *channel;
1212 : const char *payload;
1213 :
1214 0 : pid = pq_getmsgint(msg, 4);
1215 0 : channel = pq_getmsgrawstring(msg);
1216 0 : payload = pq_getmsgrawstring(msg);
1217 0 : pq_endmessage(msg);
1218 :
1219 0 : NotifyMyFrontEnd(channel, payload, pid);
1220 :
1221 0 : break;
1222 : }
1223 :
1224 2 : case PqMsg_Progress:
1225 : {
1226 : /*
1227 : * Only incremental progress reporting is currently supported.
1228 : * However, it's possible to add more fields to the message to
1229 : * allow for handling of other backend progress APIs.
1230 : */
1231 2 : int index = pq_getmsgint(msg, 4);
1232 2 : int64 incr = pq_getmsgint64(msg);
1233 :
1234 2 : pq_getmsgend(msg);
1235 :
1236 2 : pgstat_progress_incr_param(index, incr);
1237 :
1238 2 : break;
1239 : }
1240 :
1241 1487 : case PqMsg_Terminate:
1242 : {
1243 1487 : shm_mq_detach(pcxt->worker[i].error_mqh);
1244 1487 : pcxt->worker[i].error_mqh = NULL;
1245 1487 : break;
1246 : }
1247 :
1248 0 : default:
1249 : {
1250 0 : elog(ERROR, "unrecognized message type received from parallel worker: %c (message length %d bytes)",
1251 : msgtype, msg->len);
1252 : }
1253 : }
1254 1489 : }
1255 :
1256 : /*
1257 : * End-of-subtransaction cleanup for parallel contexts.
1258 : *
1259 : * Here we remove only parallel contexts initiated within the current
1260 : * subtransaction.
1261 : */
1262 : void
1263 10084 : AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId)
1264 : {
1265 10087 : while (!dlist_is_empty(&pcxt_list))
1266 : {
1267 : ParallelContext *pcxt;
1268 :
1269 3 : pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
1270 3 : if (pcxt->subid != mySubId)
1271 0 : break;
1272 3 : if (isCommit)
1273 0 : elog(WARNING, "leaked parallel context");
1274 3 : DestroyParallelContext(pcxt);
1275 : }
1276 10084 : }
1277 :
1278 : /*
1279 : * End-of-transaction cleanup for parallel contexts.
1280 : *
1281 : * We nuke all remaining parallel contexts.
1282 : */
1283 : void
1284 546622 : AtEOXact_Parallel(bool isCommit)
1285 : {
1286 546625 : while (!dlist_is_empty(&pcxt_list))
1287 : {
1288 : ParallelContext *pcxt;
1289 :
1290 3 : pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
1291 3 : if (isCommit)
1292 0 : elog(WARNING, "leaked parallel context");
1293 3 : DestroyParallelContext(pcxt);
1294 : }
1295 546622 : }
1296 :
1297 : /*
1298 : * Main entrypoint for parallel workers.
1299 : */
1300 : void
1301 1493 : ParallelWorkerMain(Datum main_arg)
1302 : {
1303 : dsm_segment *seg;
1304 : shm_toc *toc;
1305 : FixedParallelState *fps;
1306 : char *error_queue_space;
1307 : shm_mq *mq;
1308 : shm_mq_handle *mqh;
1309 : char *libraryspace;
1310 : char *entrypointstate;
1311 : char *library_name;
1312 : char *function_name;
1313 : parallel_worker_main_type entrypt;
1314 : char *gucspace;
1315 : char *combocidspace;
1316 : char *tsnapspace;
1317 : char *asnapspace;
1318 : char *tstatespace;
1319 : char *pendingsyncsspace;
1320 : char *reindexspace;
1321 : char *relmapperspace;
1322 : char *uncommittedenumsspace;
1323 : char *clientconninfospace;
1324 : char *session_dsm_handle_space;
1325 : Snapshot tsnapshot;
1326 : Snapshot asnapshot;
1327 :
1328 : /* Set flag to indicate that we're initializing a parallel worker. */
1329 1493 : InitializingParallelWorker = true;
1330 :
1331 : /* Establish signal handlers. */
1332 1493 : BackgroundWorkerUnblockSignals();
1333 :
1334 : /* Determine and set our parallel worker number. */
1335 : Assert(ParallelWorkerNumber == -1);
1336 1493 : memcpy(&ParallelWorkerNumber, MyBgworkerEntry->bgw_extra, sizeof(int));
1337 :
1338 : /* Set up a memory context to work in, just for cleanliness. */
1339 1493 : CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
1340 : "Parallel worker",
1341 : ALLOCSET_DEFAULT_SIZES);
1342 :
1343 : /*
1344 : * Attach to the dynamic shared memory segment for the parallel query, and
1345 : * find its table of contents.
1346 : *
1347 : * Note: at this point, we have not created any ResourceOwner in this
1348 : * process. This will result in our DSM mapping surviving until process
1349 : * exit, which is fine. If there were a ResourceOwner, it would acquire
1350 : * ownership of the mapping, but we have no need for that.
1351 : */
1352 1493 : seg = dsm_attach(DatumGetUInt32(main_arg));
1353 1493 : if (seg == NULL)
1354 0 : ereport(ERROR,
1355 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1356 : errmsg("could not map dynamic shared memory segment")));
1357 1493 : toc = shm_toc_attach(PARALLEL_MAGIC, dsm_segment_address(seg));
1358 1493 : if (toc == NULL)
1359 0 : ereport(ERROR,
1360 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1361 : errmsg("invalid magic number in dynamic shared memory segment")));
1362 :
1363 : /* Look up fixed parallel state. */
1364 1493 : fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED, false);
1365 1493 : MyFixedParallelState = fps;
1366 :
1367 : /* Arrange to signal the leader if we exit. */
1368 1493 : ParallelLeaderPid = fps->parallel_leader_pid;
1369 1493 : ParallelLeaderProcNumber = fps->parallel_leader_proc_number;
1370 1493 : before_shmem_exit(ParallelWorkerShutdown, PointerGetDatum(seg));
1371 :
1372 : /*
1373 : * Now we can find and attach to the error queue provided for us. That's
1374 : * good, because until we do that, any errors that happen here will not be
1375 : * reported back to the process that requested that this worker be
1376 : * launched.
1377 : */
1378 1493 : error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE, false);
1379 1493 : mq = (shm_mq *) (error_queue_space +
1380 1493 : ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE);
1381 1493 : shm_mq_set_sender(mq, MyProc);
1382 1493 : mqh = shm_mq_attach(mq, seg, NULL);
1383 1493 : pq_redirect_to_shm_mq(seg, mqh);
1384 1493 : pq_set_parallel_leader(fps->parallel_leader_pid,
1385 : fps->parallel_leader_proc_number);
1386 :
1387 : /*
1388 : * Hooray! Primary initialization is complete. Now, we need to set up our
1389 : * backend-local state to match the original backend.
1390 : */
1391 :
1392 : /*
1393 : * Join locking group. We must do this before anything that could try to
1394 : * acquire a heavyweight lock, because any heavyweight locks acquired to
1395 : * this point could block either directly against the parallel group
1396 : * leader or against some process which in turn waits for a lock that
1397 : * conflicts with the parallel group leader, causing an undetected
1398 : * deadlock. (If we can't join the lock group, the leader has gone away,
1399 : * so just exit quietly.)
1400 : */
1401 1493 : if (!BecomeLockGroupMember(fps->parallel_leader_pgproc,
1402 : fps->parallel_leader_pid))
1403 0 : return;
1404 :
1405 : /*
1406 : * Restore transaction and statement start-time timestamps. This must
1407 : * happen before anything that would start a transaction, else asserts in
1408 : * xact.c will fire.
1409 : */
1410 1493 : SetParallelStartTimestamps(fps->xact_ts, fps->stmt_ts);
1411 :
1412 : /*
1413 : * Identify the entry point to be called. In theory this could result in
1414 : * loading an additional library, though most likely the entry point is in
1415 : * the core backend or in a library we just loaded.
1416 : */
1417 1493 : entrypointstate = shm_toc_lookup(toc, PARALLEL_KEY_ENTRYPOINT, false);
1418 1493 : library_name = entrypointstate;
1419 1493 : function_name = entrypointstate + strlen(library_name) + 1;
1420 :
1421 1493 : entrypt = LookupParallelWorkerFunction(library_name, function_name);
1422 :
1423 : /*
1424 : * Restore current session authorization and role id. No verification
1425 : * happens here, we just blindly adopt the leader's state. Note that this
1426 : * has to happen before InitPostgres, since InitializeSessionUserId will
1427 : * not set these variables.
1428 : */
1429 1493 : SetAuthenticatedUserId(fps->authenticated_user_id);
1430 1493 : SetSessionAuthorization(fps->session_user_id,
1431 1493 : fps->session_user_is_superuser);
1432 1493 : SetCurrentRoleId(fps->outer_user_id, fps->role_is_superuser);
1433 :
1434 : /*
1435 : * Restore database connection. We skip connection authorization checks,
1436 : * reasoning that (a) the leader checked these things when it started, and
1437 : * (b) we do not want parallel mode to cause these failures, because that
1438 : * would make use of parallel query plans not transparent to applications.
1439 : */
1440 1493 : BackgroundWorkerInitializeConnectionByOid(fps->database_id,
1441 : fps->authenticated_user_id,
1442 : BGWORKER_BYPASS_ALLOWCONN |
1443 : BGWORKER_BYPASS_ROLELOGINCHECK);
1444 :
1445 : /*
1446 : * Set the client encoding to the database encoding, since that is what
1447 : * the leader will expect. (We're cheating a bit by not calling
1448 : * PrepareClientEncoding first. It's okay because this call will always
1449 : * result in installing a no-op conversion. No error should be possible,
1450 : * but check anyway.)
1451 : */
1452 1493 : if (SetClientEncoding(GetDatabaseEncoding()) < 0)
1453 0 : elog(ERROR, "SetClientEncoding(%d) failed", GetDatabaseEncoding());
1454 :
1455 : /*
1456 : * Load libraries that were loaded by original backend. We want to do
1457 : * this before restoring GUCs, because the libraries might define custom
1458 : * variables.
1459 : */
1460 1493 : libraryspace = shm_toc_lookup(toc, PARALLEL_KEY_LIBRARY, false);
1461 1493 : StartTransactionCommand();
1462 1493 : RestoreLibraryState(libraryspace);
1463 1493 : CommitTransactionCommand();
1464 :
1465 : /* Crank up a transaction state appropriate to a parallel worker. */
1466 1493 : tstatespace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_STATE, false);
1467 1493 : StartParallelWorkerTransaction(tstatespace);
1468 :
1469 : /*
1470 : * Restore state that affects catalog access. Ideally we'd do this even
1471 : * before calling InitPostgres, but that has order-of-initialization
1472 : * problems, and also the relmapper would get confused during the
1473 : * CommitTransactionCommand call above.
1474 : */
1475 1493 : pendingsyncsspace = shm_toc_lookup(toc, PARALLEL_KEY_PENDING_SYNCS,
1476 : false);
1477 1493 : RestorePendingSyncs(pendingsyncsspace);
1478 1493 : relmapperspace = shm_toc_lookup(toc, PARALLEL_KEY_RELMAPPER_STATE, false);
1479 1493 : RestoreRelationMap(relmapperspace);
1480 1493 : reindexspace = shm_toc_lookup(toc, PARALLEL_KEY_REINDEX_STATE, false);
1481 1493 : RestoreReindexState(reindexspace);
1482 1493 : combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID, false);
1483 1493 : RestoreComboCIDState(combocidspace);
1484 :
1485 : /* Attach to the per-session DSM segment and contained objects. */
1486 : session_dsm_handle_space =
1487 1493 : shm_toc_lookup(toc, PARALLEL_KEY_SESSION_DSM, false);
1488 1493 : AttachSession(*(dsm_handle *) session_dsm_handle_space);
1489 :
1490 : /*
1491 : * If the transaction isolation level is REPEATABLE READ or SERIALIZABLE,
1492 : * the leader has serialized the transaction snapshot and we must restore
1493 : * it. At lower isolation levels, there is no transaction-lifetime
1494 : * snapshot, but we need TransactionXmin to get set to a value which is
1495 : * less than or equal to the xmin of every snapshot that will be used by
1496 : * this worker. The easiest way to accomplish that is to install the
1497 : * active snapshot as the transaction snapshot. Code running in this
1498 : * parallel worker might take new snapshots via GetTransactionSnapshot()
1499 : * or GetLatestSnapshot(), but it shouldn't have any way of acquiring a
1500 : * snapshot older than the active snapshot.
1501 : */
1502 1493 : asnapspace = shm_toc_lookup(toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, false);
1503 1493 : tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT, true);
1504 1493 : asnapshot = RestoreSnapshot(asnapspace);
1505 1493 : tsnapshot = tsnapspace ? RestoreSnapshot(tsnapspace) : asnapshot;
1506 1493 : RestoreTransactionSnapshot(tsnapshot,
1507 1493 : fps->parallel_leader_pgproc);
1508 1493 : PushActiveSnapshot(asnapshot);
1509 :
1510 : /*
1511 : * We've changed which tuples we can see, and must therefore invalidate
1512 : * system caches.
1513 : */
1514 1493 : InvalidateSystemCaches();
1515 :
1516 : /*
1517 : * Restore GUC values from launching backend. We can't do this earlier,
1518 : * because GUC check hooks that do catalog lookups need to see the same
1519 : * database state as the leader. Also, the check hooks for
1520 : * session_authorization and role assume we already set the correct role
1521 : * OIDs.
1522 : */
1523 1493 : gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC, false);
1524 1493 : RestoreGUCState(gucspace);
1525 :
1526 : /*
1527 : * Restore current user ID and security context. No verification happens
1528 : * here, we just blindly adopt the leader's state. We can't do this till
1529 : * after restoring GUCs, else we'll get complaints about restoring
1530 : * session_authorization and role. (In effect, we're assuming that all
1531 : * the restored values are okay to set, even if we are now inside a
1532 : * restricted context.)
1533 : */
1534 1493 : SetUserIdAndSecContext(fps->current_user_id, fps->sec_context);
1535 :
1536 : /* Restore temp-namespace state to ensure search path matches leader's. */
1537 1493 : SetTempNamespaceState(fps->temp_namespace_id,
1538 : fps->temp_toast_namespace_id);
1539 :
1540 : /* Restore uncommitted enums. */
1541 1493 : uncommittedenumsspace = shm_toc_lookup(toc, PARALLEL_KEY_UNCOMMITTEDENUMS,
1542 : false);
1543 1493 : RestoreUncommittedEnums(uncommittedenumsspace);
1544 :
1545 : /* Restore the ClientConnectionInfo. */
1546 1493 : clientconninfospace = shm_toc_lookup(toc, PARALLEL_KEY_CLIENTCONNINFO,
1547 : false);
1548 1493 : RestoreClientConnectionInfo(clientconninfospace);
1549 :
1550 : /*
1551 : * Initialize SystemUser now that MyClientConnectionInfo is restored. Also
1552 : * ensure that auth_method is actually valid, aka authn_id is not NULL.
1553 : */
1554 1493 : if (MyClientConnectionInfo.authn_id)
1555 2 : InitializeSystemUser(MyClientConnectionInfo.authn_id,
1556 : hba_authname(MyClientConnectionInfo.auth_method));
1557 :
1558 : /* Attach to the leader's serializable transaction, if SERIALIZABLE. */
1559 1493 : AttachSerializableXact(fps->serializable_xact_handle);
1560 :
1561 : /*
1562 : * We've initialized all of our state now; nothing should change
1563 : * hereafter.
1564 : */
1565 1493 : InitializingParallelWorker = false;
1566 1493 : EnterParallelMode();
1567 :
1568 : /*
1569 : * Time to do the real work: invoke the caller-supplied code.
1570 : */
1571 1493 : entrypt(seg, toc);
1572 :
1573 : /* Must exit parallel mode to pop active snapshot. */
1574 1487 : ExitParallelMode();
1575 :
1576 : /* Must pop active snapshot so snapmgr.c doesn't complain. */
1577 1487 : PopActiveSnapshot();
1578 :
1579 : /* Shut down the parallel-worker transaction. */
1580 1487 : EndParallelWorkerTransaction();
1581 :
1582 : /* Detach from the per-session DSM segment. */
1583 1487 : DetachSession();
1584 :
1585 : /* Report success. */
1586 1487 : pq_putmessage(PqMsg_Terminate, NULL, 0);
1587 : }
1588 :
1589 : /*
1590 : * Update shared memory with the ending location of the last WAL record we
1591 : * wrote, if it's greater than the value already stored there.
1592 : */
1593 : void
1594 1487 : ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
1595 : {
1596 1487 : FixedParallelState *fps = MyFixedParallelState;
1597 :
1598 : Assert(fps != NULL);
1599 1487 : SpinLockAcquire(&fps->mutex);
1600 1487 : if (fps->last_xlog_end < last_xlog_end)
1601 113 : fps->last_xlog_end = last_xlog_end;
1602 1487 : SpinLockRelease(&fps->mutex);
1603 1487 : }
1604 :
1605 : /*
1606 : * Make sure the leader tries to read from our error queue one more time.
1607 : * This guards against the case where we exit uncleanly without sending an
1608 : * ErrorResponse to the leader, for example because some code calls proc_exit
1609 : * directly.
1610 : *
1611 : * Also explicitly detach from dsm segment so that subsystems using
1612 : * on_dsm_detach() have a chance to send stats before the stats subsystem is
1613 : * shut down as part of a before_shmem_exit() hook.
1614 : *
1615 : * One might think this could instead be solved by carefully ordering the
1616 : * attaching to dsm segments, so that the pgstats segments get detached from
1617 : * later than the parallel query one. That turns out to not work because the
1618 : * stats hash might need to grow which can cause new segments to be allocated,
1619 : * which then will be detached from earlier.
1620 : */
1621 : static void
1622 1493 : ParallelWorkerShutdown(int code, Datum arg)
1623 : {
1624 1493 : SendProcSignal(ParallelLeaderPid,
1625 : PROCSIG_PARALLEL_MESSAGE,
1626 : ParallelLeaderProcNumber);
1627 :
1628 1493 : dsm_detach((dsm_segment *) DatumGetPointer(arg));
1629 1493 : }
1630 :
1631 : /*
1632 : * Look up (and possibly load) a parallel worker entry point function.
1633 : *
1634 : * For functions contained in the core code, we use library name "postgres"
1635 : * and consult the InternalParallelWorkers array. External functions are
1636 : * looked up, and loaded if necessary, using load_external_function().
1637 : *
1638 : * The point of this is to pass function names as strings across process
1639 : * boundaries. We can't pass actual function addresses because of the
1640 : * possibility that the function has been loaded at a different address
1641 : * in a different process. This is obviously a hazard for functions in
1642 : * loadable libraries, but it can happen even for functions in the core code
1643 : * on platforms using EXEC_BACKEND (e.g., Windows).
1644 : *
1645 : * At some point it might be worthwhile to get rid of InternalParallelWorkers[]
1646 : * in favor of applying load_external_function() for core functions too;
1647 : * but that raises portability issues that are not worth addressing now.
1648 : */
1649 : static parallel_worker_main_type
1650 1493 : LookupParallelWorkerFunction(const char *libraryname, const char *funcname)
1651 : {
1652 : /*
1653 : * If the function is to be loaded from postgres itself, search the
1654 : * InternalParallelWorkers array.
1655 : */
1656 1493 : if (strcmp(libraryname, "postgres") == 0)
1657 : {
1658 : int i;
1659 :
1660 1814 : for (i = 0; i < lengthof(InternalParallelWorkers); i++)
1661 : {
1662 1814 : if (strcmp(InternalParallelWorkers[i].fn_name, funcname) == 0)
1663 1493 : return InternalParallelWorkers[i].fn_addr;
1664 : }
1665 :
1666 : /* We can only reach this by programming error. */
1667 0 : elog(ERROR, "internal function \"%s\" not found", funcname);
1668 : }
1669 :
1670 : /* Otherwise load from external library. */
1671 0 : return (parallel_worker_main_type)
1672 0 : load_external_function(libraryname, funcname, true, NULL);
1673 : }
|