Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * parallel.c
4 : * Infrastructure for launching parallel workers
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/access/transam/parallel.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/brin.h"
18 : #include "access/nbtree.h"
19 : #include "access/parallel.h"
20 : #include "access/session.h"
21 : #include "access/xact.h"
22 : #include "access/xlog.h"
23 : #include "catalog/index.h"
24 : #include "catalog/namespace.h"
25 : #include "catalog/pg_enum.h"
26 : #include "catalog/storage.h"
27 : #include "commands/async.h"
28 : #include "commands/vacuum.h"
29 : #include "executor/execParallel.h"
30 : #include "libpq/libpq.h"
31 : #include "libpq/pqformat.h"
32 : #include "libpq/pqmq.h"
33 : #include "miscadmin.h"
34 : #include "optimizer/optimizer.h"
35 : #include "pgstat.h"
36 : #include "storage/ipc.h"
37 : #include "storage/predicate.h"
38 : #include "storage/spin.h"
39 : #include "tcop/tcopprot.h"
40 : #include "utils/combocid.h"
41 : #include "utils/guc.h"
42 : #include "utils/inval.h"
43 : #include "utils/memutils.h"
44 : #include "utils/relmapper.h"
45 : #include "utils/snapmgr.h"
46 :
47 : /*
48 : * We don't want to waste a lot of memory on an error queue which, most of
49 : * the time, will process only a handful of small messages. However, it is
50 : * desirable to make it large enough that a typical ErrorResponse can be sent
51 : * without blocking. That way, a worker that errors out can write the whole
52 : * message into the queue and terminate without waiting for the user backend.
53 : */
54 : #define PARALLEL_ERROR_QUEUE_SIZE 16384
55 :
56 : /* Magic number for parallel context TOC. */
57 : #define PARALLEL_MAGIC 0x50477c7c
58 :
59 : /*
60 : * Magic numbers for per-context parallel state sharing. Higher-level code
61 : * should use smaller values, leaving these very large ones for use by this
62 : * module.
63 : */
64 : #define PARALLEL_KEY_FIXED UINT64CONST(0xFFFFFFFFFFFF0001)
65 : #define PARALLEL_KEY_ERROR_QUEUE UINT64CONST(0xFFFFFFFFFFFF0002)
66 : #define PARALLEL_KEY_LIBRARY UINT64CONST(0xFFFFFFFFFFFF0003)
67 : #define PARALLEL_KEY_GUC UINT64CONST(0xFFFFFFFFFFFF0004)
68 : #define PARALLEL_KEY_COMBO_CID UINT64CONST(0xFFFFFFFFFFFF0005)
69 : #define PARALLEL_KEY_TRANSACTION_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0006)
70 : #define PARALLEL_KEY_ACTIVE_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0007)
71 : #define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008)
72 : #define PARALLEL_KEY_ENTRYPOINT UINT64CONST(0xFFFFFFFFFFFF0009)
73 : #define PARALLEL_KEY_SESSION_DSM UINT64CONST(0xFFFFFFFFFFFF000A)
74 : #define PARALLEL_KEY_PENDING_SYNCS UINT64CONST(0xFFFFFFFFFFFF000B)
75 : #define PARALLEL_KEY_REINDEX_STATE UINT64CONST(0xFFFFFFFFFFFF000C)
76 : #define PARALLEL_KEY_RELMAPPER_STATE UINT64CONST(0xFFFFFFFFFFFF000D)
77 : #define PARALLEL_KEY_UNCOMMITTEDENUMS UINT64CONST(0xFFFFFFFFFFFF000E)
78 : #define PARALLEL_KEY_CLIENTCONNINFO UINT64CONST(0xFFFFFFFFFFFF000F)
79 :
80 : /* Fixed-size parallel state. */
81 : typedef struct FixedParallelState
82 : {
83 : /* Fixed-size state that workers must restore. */
84 : Oid database_id;
85 : Oid authenticated_user_id;
86 : Oid session_user_id;
87 : Oid outer_user_id;
88 : Oid current_user_id;
89 : Oid temp_namespace_id;
90 : Oid temp_toast_namespace_id;
91 : int sec_context;
92 : bool session_user_is_superuser;
93 : bool role_is_superuser;
94 : PGPROC *parallel_leader_pgproc;
95 : pid_t parallel_leader_pid;
96 : ProcNumber parallel_leader_proc_number;
97 : TimestampTz xact_ts;
98 : TimestampTz stmt_ts;
99 : SerializableXactHandle serializable_xact_handle;
100 :
101 : /* Mutex protects remaining fields. */
102 : slock_t mutex;
103 :
104 : /* Maximum XactLastRecEnd of any worker. */
105 : XLogRecPtr last_xlog_end;
106 : } FixedParallelState;
107 :
108 : /*
109 : * Our parallel worker number. We initialize this to -1, meaning that we are
110 : * not a parallel worker. In parallel workers, it will be set to a value >= 0
111 : * and < the number of workers before any user code is invoked; each parallel
112 : * worker will get a different parallel worker number.
113 : */
114 : int ParallelWorkerNumber = -1;
115 :
116 : /* Is there a parallel message pending which we need to receive? */
117 : volatile sig_atomic_t ParallelMessagePending = false;
118 :
119 : /* Are we initializing a parallel worker? */
120 : bool InitializingParallelWorker = false;
121 :
122 : /* Pointer to our fixed parallel state. */
123 : static FixedParallelState *MyFixedParallelState;
124 :
125 : /* List of active parallel contexts. */
126 : static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);
127 :
128 : /* Backend-local copy of data from FixedParallelState. */
129 : static pid_t ParallelLeaderPid;
130 :
131 : /*
132 : * List of internal parallel worker entry points. We need this for
133 : * reasons explained in LookupParallelWorkerFunction(), below.
134 : */
135 : static const struct
136 : {
137 : const char *fn_name;
138 : parallel_worker_main_type fn_addr;
139 : } InternalParallelWorkers[] =
140 :
141 : {
142 : {
143 : "ParallelQueryMain", ParallelQueryMain
144 : },
145 : {
146 : "_bt_parallel_build_main", _bt_parallel_build_main
147 : },
148 : {
149 : "_brin_parallel_build_main", _brin_parallel_build_main
150 : },
151 : {
152 : "parallel_vacuum_main", parallel_vacuum_main
153 : }
154 : };
155 :
156 : /* Private functions. */
157 : static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg);
158 : static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
159 : static parallel_worker_main_type LookupParallelWorkerFunction(const char *libraryname, const char *funcname);
160 : static void ParallelWorkerShutdown(int code, Datum arg);
161 :
162 :
163 : /*
164 : * Establish a new parallel context. This should be done after entering
165 : * parallel mode, and (unless there is an error) the context should be
166 : * destroyed before exiting the current subtransaction.
167 : */
168 : ParallelContext *
169 892 : CreateParallelContext(const char *library_name, const char *function_name,
170 : int nworkers)
171 : {
172 : MemoryContext oldcontext;
173 : ParallelContext *pcxt;
174 :
175 : /* It is unsafe to create a parallel context if not in parallel mode. */
176 : Assert(IsInParallelMode());
177 :
178 : /* Number of workers should be non-negative. */
179 : Assert(nworkers >= 0);
180 :
181 : /* We might be running in a short-lived memory context. */
182 892 : oldcontext = MemoryContextSwitchTo(TopTransactionContext);
183 :
184 : /* Initialize a new ParallelContext. */
185 892 : pcxt = palloc0(sizeof(ParallelContext));
186 892 : pcxt->subid = GetCurrentSubTransactionId();
187 892 : pcxt->nworkers = nworkers;
188 892 : pcxt->nworkers_to_launch = nworkers;
189 892 : pcxt->library_name = pstrdup(library_name);
190 892 : pcxt->function_name = pstrdup(function_name);
191 892 : pcxt->error_context_stack = error_context_stack;
192 892 : shm_toc_initialize_estimator(&pcxt->estimator);
193 892 : dlist_push_head(&pcxt_list, &pcxt->node);
194 :
195 : /* Restore previous memory context. */
196 892 : MemoryContextSwitchTo(oldcontext);
197 :
198 892 : return pcxt;
199 : }
200 :
201 : /*
202 : * Establish the dynamic shared memory segment for a parallel context and
203 : * copy state and other bookkeeping information that will be needed by
204 : * parallel workers into it.
205 : */
206 : void
207 892 : InitializeParallelDSM(ParallelContext *pcxt)
208 : {
209 : MemoryContext oldcontext;
210 892 : Size library_len = 0;
211 892 : Size guc_len = 0;
212 892 : Size combocidlen = 0;
213 892 : Size tsnaplen = 0;
214 892 : Size asnaplen = 0;
215 892 : Size tstatelen = 0;
216 892 : Size pendingsyncslen = 0;
217 892 : Size reindexlen = 0;
218 892 : Size relmapperlen = 0;
219 892 : Size uncommittedenumslen = 0;
220 892 : Size clientconninfolen = 0;
221 892 : Size segsize = 0;
222 : int i;
223 : FixedParallelState *fps;
224 892 : dsm_handle session_dsm_handle = DSM_HANDLE_INVALID;
225 892 : Snapshot transaction_snapshot = GetTransactionSnapshot();
226 892 : Snapshot active_snapshot = GetActiveSnapshot();
227 :
228 : /* We might be running in a very short-lived memory context. */
229 892 : oldcontext = MemoryContextSwitchTo(TopTransactionContext);
230 :
231 : /* Allow space to store the fixed-size parallel state. */
232 892 : shm_toc_estimate_chunk(&pcxt->estimator, sizeof(FixedParallelState));
233 892 : shm_toc_estimate_keys(&pcxt->estimator, 1);
234 :
235 : /*
236 : * If we manage to reach here while non-interruptible, it's unsafe to
237 : * launch any workers: we would fail to process interrupts sent by them.
238 : * We can deal with that edge case by pretending no workers were
239 : * requested.
240 : */
241 892 : if (!INTERRUPTS_CAN_BE_PROCESSED())
242 0 : pcxt->nworkers = 0;
243 :
244 : /*
245 : * Normally, the user will have requested at least one worker process, but
246 : * if by chance they have not, we can skip a bunch of things here.
247 : */
248 892 : if (pcxt->nworkers > 0)
249 : {
250 : /* Get (or create) the per-session DSM segment's handle. */
251 892 : session_dsm_handle = GetSessionDsmHandle();
252 :
253 : /*
254 : * If we weren't able to create a per-session DSM segment, then we can
255 : * continue but we can't safely launch any workers because their
256 : * record typmods would be incompatible so they couldn't exchange
257 : * tuples.
258 : */
259 892 : if (session_dsm_handle == DSM_HANDLE_INVALID)
260 0 : pcxt->nworkers = 0;
261 : }
262 :
263 892 : if (pcxt->nworkers > 0)
264 : {
265 : /* Estimate space for various kinds of state sharing. */
266 892 : library_len = EstimateLibraryStateSpace();
267 892 : shm_toc_estimate_chunk(&pcxt->estimator, library_len);
268 892 : guc_len = EstimateGUCStateSpace();
269 892 : shm_toc_estimate_chunk(&pcxt->estimator, guc_len);
270 892 : combocidlen = EstimateComboCIDStateSpace();
271 892 : shm_toc_estimate_chunk(&pcxt->estimator, combocidlen);
272 892 : if (IsolationUsesXactSnapshot())
273 : {
274 22 : tsnaplen = EstimateSnapshotSpace(transaction_snapshot);
275 22 : shm_toc_estimate_chunk(&pcxt->estimator, tsnaplen);
276 : }
277 892 : asnaplen = EstimateSnapshotSpace(active_snapshot);
278 892 : shm_toc_estimate_chunk(&pcxt->estimator, asnaplen);
279 892 : tstatelen = EstimateTransactionStateSpace();
280 892 : shm_toc_estimate_chunk(&pcxt->estimator, tstatelen);
281 892 : shm_toc_estimate_chunk(&pcxt->estimator, sizeof(dsm_handle));
282 892 : pendingsyncslen = EstimatePendingSyncsSpace();
283 892 : shm_toc_estimate_chunk(&pcxt->estimator, pendingsyncslen);
284 892 : reindexlen = EstimateReindexStateSpace();
285 892 : shm_toc_estimate_chunk(&pcxt->estimator, reindexlen);
286 892 : relmapperlen = EstimateRelationMapSpace();
287 892 : shm_toc_estimate_chunk(&pcxt->estimator, relmapperlen);
288 892 : uncommittedenumslen = EstimateUncommittedEnumsSpace();
289 892 : shm_toc_estimate_chunk(&pcxt->estimator, uncommittedenumslen);
290 892 : clientconninfolen = EstimateClientConnectionInfoSpace();
291 892 : shm_toc_estimate_chunk(&pcxt->estimator, clientconninfolen);
292 : /* If you add more chunks here, you probably need to add keys. */
293 892 : shm_toc_estimate_keys(&pcxt->estimator, 12);
294 :
295 : /* Estimate space need for error queues. */
296 : StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
297 : PARALLEL_ERROR_QUEUE_SIZE,
298 : "parallel error queue size not buffer-aligned");
299 892 : shm_toc_estimate_chunk(&pcxt->estimator,
300 : mul_size(PARALLEL_ERROR_QUEUE_SIZE,
301 : pcxt->nworkers));
302 892 : shm_toc_estimate_keys(&pcxt->estimator, 1);
303 :
304 : /* Estimate how much we'll need for the entrypoint info. */
305 892 : shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name) +
306 : strlen(pcxt->function_name) + 2);
307 892 : shm_toc_estimate_keys(&pcxt->estimator, 1);
308 : }
309 :
310 : /*
311 : * Create DSM and initialize with new table of contents. But if the user
312 : * didn't request any workers, then don't bother creating a dynamic shared
313 : * memory segment; instead, just use backend-private memory.
314 : *
315 : * Also, if we can't create a dynamic shared memory segment because the
316 : * maximum number of segments have already been created, then fall back to
317 : * backend-private memory, and plan not to use any workers. We hope this
318 : * won't happen very often, but it's better to abandon the use of
319 : * parallelism than to fail outright.
320 : */
321 892 : segsize = shm_toc_estimate(&pcxt->estimator);
322 892 : if (pcxt->nworkers > 0)
323 892 : pcxt->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
324 892 : if (pcxt->seg != NULL)
325 892 : pcxt->toc = shm_toc_create(PARALLEL_MAGIC,
326 : dsm_segment_address(pcxt->seg),
327 : segsize);
328 : else
329 : {
330 0 : pcxt->nworkers = 0;
331 0 : pcxt->private_memory = MemoryContextAlloc(TopMemoryContext, segsize);
332 0 : pcxt->toc = shm_toc_create(PARALLEL_MAGIC, pcxt->private_memory,
333 : segsize);
334 : }
335 :
336 : /* Initialize fixed-size state in shared memory. */
337 : fps = (FixedParallelState *)
338 892 : shm_toc_allocate(pcxt->toc, sizeof(FixedParallelState));
339 892 : fps->database_id = MyDatabaseId;
340 892 : fps->authenticated_user_id = GetAuthenticatedUserId();
341 892 : fps->session_user_id = GetSessionUserId();
342 892 : fps->outer_user_id = GetCurrentRoleId();
343 892 : GetUserIdAndSecContext(&fps->current_user_id, &fps->sec_context);
344 892 : fps->session_user_is_superuser = GetSessionUserIsSuperuser();
345 892 : fps->role_is_superuser = current_role_is_superuser;
346 892 : GetTempNamespaceState(&fps->temp_namespace_id,
347 : &fps->temp_toast_namespace_id);
348 892 : fps->parallel_leader_pgproc = MyProc;
349 892 : fps->parallel_leader_pid = MyProcPid;
350 892 : fps->parallel_leader_proc_number = MyProcNumber;
351 892 : fps->xact_ts = GetCurrentTransactionStartTimestamp();
352 892 : fps->stmt_ts = GetCurrentStatementStartTimestamp();
353 892 : fps->serializable_xact_handle = ShareSerializableXact();
354 892 : SpinLockInit(&fps->mutex);
355 892 : fps->last_xlog_end = 0;
356 892 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);
357 :
358 : /* We can skip the rest of this if we're not budgeting for any workers. */
359 892 : if (pcxt->nworkers > 0)
360 : {
361 : char *libraryspace;
362 : char *gucspace;
363 : char *combocidspace;
364 : char *tsnapspace;
365 : char *asnapspace;
366 : char *tstatespace;
367 : char *pendingsyncsspace;
368 : char *reindexspace;
369 : char *relmapperspace;
370 : char *error_queue_space;
371 : char *session_dsm_handle_space;
372 : char *entrypointstate;
373 : char *uncommittedenumsspace;
374 : char *clientconninfospace;
375 : Size lnamelen;
376 :
377 : /* Serialize shared libraries we have loaded. */
378 892 : libraryspace = shm_toc_allocate(pcxt->toc, library_len);
379 892 : SerializeLibraryState(library_len, libraryspace);
380 892 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_LIBRARY, libraryspace);
381 :
382 : /* Serialize GUC settings. */
383 892 : gucspace = shm_toc_allocate(pcxt->toc, guc_len);
384 892 : SerializeGUCState(guc_len, gucspace);
385 892 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_GUC, gucspace);
386 :
387 : /* Serialize combo CID state. */
388 892 : combocidspace = shm_toc_allocate(pcxt->toc, combocidlen);
389 892 : SerializeComboCIDState(combocidlen, combocidspace);
390 892 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_COMBO_CID, combocidspace);
391 :
392 : /*
393 : * Serialize the transaction snapshot if the transaction isolation
394 : * level uses a transaction snapshot.
395 : */
396 892 : if (IsolationUsesXactSnapshot())
397 : {
398 22 : tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen);
399 22 : SerializeSnapshot(transaction_snapshot, tsnapspace);
400 22 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT,
401 : tsnapspace);
402 : }
403 :
404 : /* Serialize the active snapshot. */
405 892 : asnapspace = shm_toc_allocate(pcxt->toc, asnaplen);
406 892 : SerializeSnapshot(active_snapshot, asnapspace);
407 892 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, asnapspace);
408 :
409 : /* Provide the handle for per-session segment. */
410 892 : session_dsm_handle_space = shm_toc_allocate(pcxt->toc,
411 : sizeof(dsm_handle));
412 892 : *(dsm_handle *) session_dsm_handle_space = session_dsm_handle;
413 892 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_SESSION_DSM,
414 : session_dsm_handle_space);
415 :
416 : /* Serialize transaction state. */
417 892 : tstatespace = shm_toc_allocate(pcxt->toc, tstatelen);
418 892 : SerializeTransactionState(tstatelen, tstatespace);
419 892 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_STATE, tstatespace);
420 :
421 : /* Serialize pending syncs. */
422 892 : pendingsyncsspace = shm_toc_allocate(pcxt->toc, pendingsyncslen);
423 892 : SerializePendingSyncs(pendingsyncslen, pendingsyncsspace);
424 892 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_PENDING_SYNCS,
425 : pendingsyncsspace);
426 :
427 : /* Serialize reindex state. */
428 892 : reindexspace = shm_toc_allocate(pcxt->toc, reindexlen);
429 892 : SerializeReindexState(reindexlen, reindexspace);
430 892 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_REINDEX_STATE, reindexspace);
431 :
432 : /* Serialize relmapper state. */
433 892 : relmapperspace = shm_toc_allocate(pcxt->toc, relmapperlen);
434 892 : SerializeRelationMap(relmapperlen, relmapperspace);
435 892 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_RELMAPPER_STATE,
436 : relmapperspace);
437 :
438 : /* Serialize uncommitted enum state. */
439 892 : uncommittedenumsspace = shm_toc_allocate(pcxt->toc,
440 : uncommittedenumslen);
441 892 : SerializeUncommittedEnums(uncommittedenumsspace, uncommittedenumslen);
442 892 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_UNCOMMITTEDENUMS,
443 : uncommittedenumsspace);
444 :
445 : /* Serialize our ClientConnectionInfo. */
446 892 : clientconninfospace = shm_toc_allocate(pcxt->toc, clientconninfolen);
447 892 : SerializeClientConnectionInfo(clientconninfolen, clientconninfospace);
448 892 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_CLIENTCONNINFO,
449 : clientconninfospace);
450 :
451 : /* Allocate space for worker information. */
452 892 : pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers);
453 :
454 : /*
455 : * Establish error queues in dynamic shared memory.
456 : *
457 : * These queues should be used only for transmitting ErrorResponse,
458 : * NoticeResponse, and NotifyResponse protocol messages. Tuple data
459 : * should be transmitted via separate (possibly larger?) queues.
460 : */
461 : error_queue_space =
462 892 : shm_toc_allocate(pcxt->toc,
463 : mul_size(PARALLEL_ERROR_QUEUE_SIZE,
464 892 : pcxt->nworkers));
465 2868 : for (i = 0; i < pcxt->nworkers; ++i)
466 : {
467 : char *start;
468 : shm_mq *mq;
469 :
470 1976 : start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
471 1976 : mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
472 1976 : shm_mq_set_receiver(mq, MyProc);
473 1976 : pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
474 : }
475 892 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, error_queue_space);
476 :
477 : /*
478 : * Serialize entrypoint information. It's unsafe to pass function
479 : * pointers across processes, as the function pointer may be different
480 : * in each process in EXEC_BACKEND builds, so we always pass library
481 : * and function name. (We use library name "postgres" for functions
482 : * in the core backend.)
483 : */
484 892 : lnamelen = strlen(pcxt->library_name);
485 892 : entrypointstate = shm_toc_allocate(pcxt->toc, lnamelen +
486 892 : strlen(pcxt->function_name) + 2);
487 892 : strcpy(entrypointstate, pcxt->library_name);
488 892 : strcpy(entrypointstate + lnamelen + 1, pcxt->function_name);
489 892 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_ENTRYPOINT, entrypointstate);
490 : }
491 :
492 : /* Update nworkers_to_launch, in case we changed nworkers above. */
493 892 : pcxt->nworkers_to_launch = pcxt->nworkers;
494 :
495 : /* Restore previous memory context. */
496 892 : MemoryContextSwitchTo(oldcontext);
497 892 : }
498 :
499 : /*
500 : * Reinitialize the dynamic shared memory segment for a parallel context such
501 : * that we could launch workers for it again.
502 : */
503 : void
504 258 : ReinitializeParallelDSM(ParallelContext *pcxt)
505 : {
506 : FixedParallelState *fps;
507 :
508 : /* Wait for any old workers to exit. */
509 258 : if (pcxt->nworkers_launched > 0)
510 : {
511 258 : WaitForParallelWorkersToFinish(pcxt);
512 258 : WaitForParallelWorkersToExit(pcxt);
513 258 : pcxt->nworkers_launched = 0;
514 258 : if (pcxt->known_attached_workers)
515 : {
516 258 : pfree(pcxt->known_attached_workers);
517 258 : pcxt->known_attached_workers = NULL;
518 258 : pcxt->nknown_attached_workers = 0;
519 : }
520 : }
521 :
522 : /* Reset a few bits of fixed parallel state to a clean state. */
523 258 : fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
524 258 : fps->last_xlog_end = 0;
525 :
526 : /* Recreate error queues (if they exist). */
527 258 : if (pcxt->nworkers > 0)
528 : {
529 : char *error_queue_space;
530 : int i;
531 :
532 : error_queue_space =
533 258 : shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, false);
534 1080 : for (i = 0; i < pcxt->nworkers; ++i)
535 : {
536 : char *start;
537 : shm_mq *mq;
538 :
539 822 : start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
540 822 : mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
541 822 : shm_mq_set_receiver(mq, MyProc);
542 822 : pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
543 : }
544 : }
545 258 : }
546 :
547 : /*
548 : * Reinitialize parallel workers for a parallel context such that we could
549 : * launch a different number of workers. This is required for cases where
550 : * we need to reuse the same DSM segment, but the number of workers can
551 : * vary from run-to-run.
552 : */
553 : void
554 22 : ReinitializeParallelWorkers(ParallelContext *pcxt, int nworkers_to_launch)
555 : {
556 : /*
557 : * The number of workers that need to be launched must be less than the
558 : * number of workers with which the parallel context is initialized. But
559 : * the caller might not know that InitializeParallelDSM reduced nworkers,
560 : * so just silently trim the request.
561 : */
562 22 : pcxt->nworkers_to_launch = Min(pcxt->nworkers, nworkers_to_launch);
563 22 : }
564 :
565 : /*
566 : * Launch parallel workers.
567 : */
568 : void
569 1150 : LaunchParallelWorkers(ParallelContext *pcxt)
570 : {
571 : MemoryContext oldcontext;
572 : BackgroundWorker worker;
573 : int i;
574 1150 : bool any_registrations_failed = false;
575 :
576 : /* Skip this if we have no workers. */
577 1150 : if (pcxt->nworkers == 0 || pcxt->nworkers_to_launch == 0)
578 0 : return;
579 :
580 : /* We need to be a lock group leader. */
581 1150 : BecomeLockGroupLeader();
582 :
583 : /* If we do have workers, we'd better have a DSM segment. */
584 : Assert(pcxt->seg != NULL);
585 :
586 : /* We might be running in a short-lived memory context. */
587 1150 : oldcontext = MemoryContextSwitchTo(TopTransactionContext);
588 :
589 : /* Configure a worker. */
590 1150 : memset(&worker, 0, sizeof(worker));
591 1150 : snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
592 : MyProcPid);
593 1150 : snprintf(worker.bgw_type, BGW_MAXLEN, "parallel worker");
594 1150 : worker.bgw_flags =
595 : BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION
596 : | BGWORKER_CLASS_PARALLEL;
597 1150 : worker.bgw_start_time = BgWorkerStart_ConsistentState;
598 1150 : worker.bgw_restart_time = BGW_NEVER_RESTART;
599 1150 : sprintf(worker.bgw_library_name, "postgres");
600 1150 : sprintf(worker.bgw_function_name, "ParallelWorkerMain");
601 1150 : worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(pcxt->seg));
602 1150 : worker.bgw_notify_pid = MyProcPid;
603 :
604 : /*
605 : * Start workers.
606 : *
607 : * The caller must be able to tolerate ending up with fewer workers than
608 : * expected, so there is no need to throw an error here if registration
609 : * fails. It wouldn't help much anyway, because registering the worker in
610 : * no way guarantees that it will start up and initialize successfully.
611 : */
612 3948 : for (i = 0; i < pcxt->nworkers_to_launch; ++i)
613 : {
614 2798 : memcpy(worker.bgw_extra, &i, sizeof(int));
615 5538 : if (!any_registrations_failed &&
616 2740 : RegisterDynamicBackgroundWorker(&worker,
617 2740 : &pcxt->worker[i].bgwhandle))
618 : {
619 2712 : shm_mq_set_handle(pcxt->worker[i].error_mqh,
620 2712 : pcxt->worker[i].bgwhandle);
621 2712 : pcxt->nworkers_launched++;
622 : }
623 : else
624 : {
625 : /*
626 : * If we weren't able to register the worker, then we've bumped up
627 : * against the max_worker_processes limit, and future
628 : * registrations will probably fail too, so arrange to skip them.
629 : * But we still have to execute this code for the remaining slots
630 : * to make sure that we forget about the error queues we budgeted
631 : * for those workers. Otherwise, we'll wait for them to start,
632 : * but they never will.
633 : */
634 86 : any_registrations_failed = true;
635 86 : pcxt->worker[i].bgwhandle = NULL;
636 86 : shm_mq_detach(pcxt->worker[i].error_mqh);
637 86 : pcxt->worker[i].error_mqh = NULL;
638 : }
639 : }
640 :
641 : /*
642 : * Now that nworkers_launched has taken its final value, we can initialize
643 : * known_attached_workers.
644 : */
645 1150 : if (pcxt->nworkers_launched > 0)
646 : {
647 1130 : pcxt->known_attached_workers =
648 1130 : palloc0(sizeof(bool) * pcxt->nworkers_launched);
649 1130 : pcxt->nknown_attached_workers = 0;
650 : }
651 :
652 : /* Restore previous memory context. */
653 1150 : MemoryContextSwitchTo(oldcontext);
654 : }
655 :
656 : /*
657 : * Wait for all workers to attach to their error queues, and throw an error if
658 : * any worker fails to do this.
659 : *
660 : * Callers can assume that if this function returns successfully, then the
661 : * number of workers given by pcxt->nworkers_launched have initialized and
662 : * attached to their error queues. Whether or not these workers are guaranteed
663 : * to still be running depends on what code the caller asked them to run;
664 : * this function does not guarantee that they have not exited. However, it
665 : * does guarantee that any workers which exited must have done so cleanly and
666 : * after successfully performing the work with which they were tasked.
667 : *
668 : * If this function is not called, then some of the workers that were launched
669 : * may not have been started due to a fork() failure, or may have exited during
670 : * early startup prior to attaching to the error queue, so nworkers_launched
671 : * cannot be viewed as completely reliable. It will never be less than the
672 : * number of workers which actually started, but it might be more. Any workers
673 : * that failed to start will still be discovered by
674 : * WaitForParallelWorkersToFinish and an error will be thrown at that time,
675 : * provided that function is eventually reached.
676 : *
677 : * In general, the leader process should do as much work as possible before
678 : * calling this function. fork() failures and other early-startup failures
679 : * are very uncommon, and having the leader sit idle when it could be doing
680 : * useful work is undesirable. However, if the leader needs to wait for
681 : * all of its workers or for a specific worker, it may want to call this
682 : * function before doing so. If not, it must make some other provision for
683 : * the failure-to-start case, lest it wait forever. On the other hand, a
684 : * leader which never waits for a worker that might not be started yet, or
685 : * at least never does so prior to WaitForParallelWorkersToFinish(), need not
686 : * call this function at all.
687 : */
688 : void
689 156 : WaitForParallelWorkersToAttach(ParallelContext *pcxt)
690 : {
691 : int i;
692 :
693 : /* Skip this if we have no launched workers. */
694 156 : if (pcxt->nworkers_launched == 0)
695 0 : return;
696 :
697 : for (;;)
698 : {
699 : /*
700 : * This will process any parallel messages that are pending and it may
701 : * also throw an error propagated from a worker.
702 : */
703 8911048 : CHECK_FOR_INTERRUPTS();
704 :
705 18166036 : for (i = 0; i < pcxt->nworkers_launched; ++i)
706 : {
707 : BgwHandleStatus status;
708 : shm_mq *mq;
709 : int rc;
710 : pid_t pid;
711 :
712 9254988 : if (pcxt->known_attached_workers[i])
713 343950 : continue;
714 :
715 : /*
716 : * If error_mqh is NULL, then the worker has already exited
717 : * cleanly.
718 : */
719 8911038 : if (pcxt->worker[i].error_mqh == NULL)
720 : {
721 0 : pcxt->known_attached_workers[i] = true;
722 0 : ++pcxt->nknown_attached_workers;
723 0 : continue;
724 : }
725 :
726 8911038 : status = GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle, &pid);
727 8911038 : if (status == BGWH_STARTED)
728 : {
729 : /* Has the worker attached to the error queue? */
730 8910888 : mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
731 8910888 : if (shm_mq_get_sender(mq) != NULL)
732 : {
733 : /* Yes, so it is known to be attached. */
734 140 : pcxt->known_attached_workers[i] = true;
735 140 : ++pcxt->nknown_attached_workers;
736 : }
737 : }
738 150 : else if (status == BGWH_STOPPED)
739 : {
740 : /*
741 : * If the worker stopped without attaching to the error queue,
742 : * throw an error.
743 : */
744 0 : mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
745 0 : if (shm_mq_get_sender(mq) == NULL)
746 0 : ereport(ERROR,
747 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
748 : errmsg("parallel worker failed to initialize"),
749 : errhint("More details may be available in the server log.")));
750 :
751 0 : pcxt->known_attached_workers[i] = true;
752 0 : ++pcxt->nknown_attached_workers;
753 : }
754 : else
755 : {
756 : /*
757 : * Worker not yet started, so we must wait. The postmaster
758 : * will notify us if the worker's state changes. Our latch
759 : * might also get set for some other reason, but if so we'll
760 : * just end up waiting for the same worker again.
761 : */
762 150 : rc = WaitLatch(MyLatch,
763 : WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
764 : -1, WAIT_EVENT_BGWORKER_STARTUP);
765 :
766 150 : if (rc & WL_LATCH_SET)
767 150 : ResetLatch(MyLatch);
768 : }
769 : }
770 :
771 : /* If all workers are known to have started, we're done. */
772 8911048 : if (pcxt->nknown_attached_workers >= pcxt->nworkers_launched)
773 : {
774 : Assert(pcxt->nknown_attached_workers == pcxt->nworkers_launched);
775 156 : break;
776 : }
777 : }
778 : }
779 :
780 : /*
781 : * Wait for all workers to finish computing.
782 : *
783 : * Even if the parallel operation seems to have completed successfully, it's
784 : * important to call this function afterwards. We must not miss any errors
785 : * the workers may have thrown during the parallel operation, or any that they
786 : * may yet throw while shutting down.
787 : *
788 : * Also, we want to update our notion of XactLastRecEnd based on worker
789 : * feedback.
790 : */
791 : void
792 2600 : WaitForParallelWorkersToFinish(ParallelContext *pcxt)
793 : {
794 : for (;;)
795 1204 : {
796 2600 : bool anyone_alive = false;
797 2600 : int nfinished = 0;
798 : int i;
799 :
800 : /*
801 : * This will process any parallel messages that are pending, which may
802 : * change the outcome of the loop that follows. It may also throw an
803 : * error propagated from a worker.
804 : */
805 2600 : CHECK_FOR_INTERRUPTS();
806 :
807 9140 : for (i = 0; i < pcxt->nworkers_launched; ++i)
808 : {
809 : /*
810 : * If error_mqh is NULL, then the worker has already exited
811 : * cleanly. If we have received a message through error_mqh from
812 : * the worker, we know it started up cleanly, and therefore we're
813 : * certain to be notified when it exits.
814 : */
815 6576 : if (pcxt->worker[i].error_mqh == NULL)
816 5302 : ++nfinished;
817 1274 : else if (pcxt->known_attached_workers[i])
818 : {
819 36 : anyone_alive = true;
820 36 : break;
821 : }
822 : }
823 :
824 2600 : if (!anyone_alive)
825 : {
826 : /* If all workers are known to have finished, we're done. */
827 2564 : if (nfinished >= pcxt->nworkers_launched)
828 : {
829 : Assert(nfinished == pcxt->nworkers_launched);
830 1396 : break;
831 : }
832 :
833 : /*
834 : * We didn't detect any living workers, but not all workers are
835 : * known to have exited cleanly. Either not all workers have
836 : * launched yet, or maybe some of them failed to start or
837 : * terminated abnormally.
838 : */
839 4192 : for (i = 0; i < pcxt->nworkers_launched; ++i)
840 : {
841 : pid_t pid;
842 : shm_mq *mq;
843 :
844 : /*
845 : * If the worker is BGWH_NOT_YET_STARTED or BGWH_STARTED, we
846 : * should just keep waiting. If it is BGWH_STOPPED, then
847 : * further investigation is needed.
848 : */
849 3024 : if (pcxt->worker[i].error_mqh == NULL ||
850 2476 : pcxt->worker[i].bgwhandle == NULL ||
851 1238 : GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle,
852 : &pid) != BGWH_STOPPED)
853 3024 : continue;
854 :
855 : /*
856 : * Check whether the worker ended up stopped without ever
857 : * attaching to the error queue. If so, the postmaster was
858 : * unable to fork the worker or it exited without initializing
859 : * properly. We must throw an error, since the caller may
860 : * have been expecting the worker to do some work before
861 : * exiting.
862 : */
863 0 : mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
864 0 : if (shm_mq_get_sender(mq) == NULL)
865 0 : ereport(ERROR,
866 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
867 : errmsg("parallel worker failed to initialize"),
868 : errhint("More details may be available in the server log.")));
869 :
870 : /*
871 : * The worker is stopped, but is attached to the error queue.
872 : * Unless there's a bug somewhere, this will only happen when
873 : * the worker writes messages and terminates after the
874 : * CHECK_FOR_INTERRUPTS() near the top of this function and
875 : * before the call to GetBackgroundWorkerPid(). In that case,
876 : * or latch should have been set as well and the right things
877 : * will happen on the next pass through the loop.
878 : */
879 : }
880 : }
881 :
882 1204 : (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
883 : WAIT_EVENT_PARALLEL_FINISH);
884 1204 : ResetLatch(MyLatch);
885 : }
886 :
887 1396 : if (pcxt->toc != NULL)
888 : {
889 : FixedParallelState *fps;
890 :
891 1396 : fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
892 1396 : if (fps->last_xlog_end > XactLastRecEnd)
893 26 : XactLastRecEnd = fps->last_xlog_end;
894 : }
895 1396 : }
896 :
897 : /*
898 : * Wait for all workers to exit.
899 : *
900 : * This function ensures that workers have been completely shutdown. The
901 : * difference between WaitForParallelWorkersToFinish and this function is
902 : * that the former just ensures that last message sent by a worker backend is
903 : * received by the leader backend whereas this ensures the complete shutdown.
904 : */
905 : static void
906 1150 : WaitForParallelWorkersToExit(ParallelContext *pcxt)
907 : {
908 : int i;
909 :
910 : /* Wait until the workers actually die. */
911 3862 : for (i = 0; i < pcxt->nworkers_launched; ++i)
912 : {
913 : BgwHandleStatus status;
914 :
915 2712 : if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
916 0 : continue;
917 :
918 2712 : status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);
919 :
920 : /*
921 : * If the postmaster kicked the bucket, we have no chance of cleaning
922 : * up safely -- we won't be able to tell when our workers are actually
923 : * dead. This doesn't necessitate a PANIC since they will all abort
924 : * eventually, but we can't safely continue this session.
925 : */
926 2712 : if (status == BGWH_POSTMASTER_DIED)
927 0 : ereport(FATAL,
928 : (errcode(ERRCODE_ADMIN_SHUTDOWN),
929 : errmsg("postmaster exited during a parallel transaction")));
930 :
931 : /* Release memory. */
932 2712 : pfree(pcxt->worker[i].bgwhandle);
933 2712 : pcxt->worker[i].bgwhandle = NULL;
934 : }
935 1150 : }
936 :
937 : /*
938 : * Destroy a parallel context.
939 : *
940 : * If expecting a clean exit, you should use WaitForParallelWorkersToFinish()
941 : * first, before calling this function. When this function is invoked, any
942 : * remaining workers are forcibly killed; the dynamic shared memory segment
943 : * is unmapped; and we then wait (uninterruptibly) for the workers to exit.
944 : */
945 : void
946 892 : DestroyParallelContext(ParallelContext *pcxt)
947 : {
948 : int i;
949 :
950 : /*
951 : * Be careful about order of operations here! We remove the parallel
952 : * context from the list before we do anything else; otherwise, if an
953 : * error occurs during a subsequent step, we might try to nuke it again
954 : * from AtEOXact_Parallel or AtEOSubXact_Parallel.
955 : */
956 892 : dlist_delete(&pcxt->node);
957 :
958 : /* Kill each worker in turn, and forget their error queues. */
959 892 : if (pcxt->worker != NULL)
960 : {
961 2788 : for (i = 0; i < pcxt->nworkers_launched; ++i)
962 : {
963 1896 : if (pcxt->worker[i].error_mqh != NULL)
964 : {
965 12 : TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
966 :
967 12 : shm_mq_detach(pcxt->worker[i].error_mqh);
968 12 : pcxt->worker[i].error_mqh = NULL;
969 : }
970 : }
971 : }
972 :
973 : /*
974 : * If we have allocated a shared memory segment, detach it. This will
975 : * implicitly detach the error queues, and any other shared memory queues,
976 : * stored there.
977 : */
978 892 : if (pcxt->seg != NULL)
979 : {
980 892 : dsm_detach(pcxt->seg);
981 892 : pcxt->seg = NULL;
982 : }
983 :
984 : /*
985 : * If this parallel context is actually in backend-private memory rather
986 : * than shared memory, free that memory instead.
987 : */
988 892 : if (pcxt->private_memory != NULL)
989 : {
990 0 : pfree(pcxt->private_memory);
991 0 : pcxt->private_memory = NULL;
992 : }
993 :
994 : /*
995 : * We can't finish transaction commit or abort until all of the workers
996 : * have exited. This means, in particular, that we can't respond to
997 : * interrupts at this stage.
998 : */
999 892 : HOLD_INTERRUPTS();
1000 892 : WaitForParallelWorkersToExit(pcxt);
1001 892 : RESUME_INTERRUPTS();
1002 :
1003 : /* Free the worker array itself. */
1004 892 : if (pcxt->worker != NULL)
1005 : {
1006 892 : pfree(pcxt->worker);
1007 892 : pcxt->worker = NULL;
1008 : }
1009 :
1010 : /* Free memory. */
1011 892 : pfree(pcxt->library_name);
1012 892 : pfree(pcxt->function_name);
1013 892 : pfree(pcxt);
1014 892 : }
1015 :
1016 : /*
1017 : * Are there any parallel contexts currently active?
1018 : */
1019 : bool
1020 0 : ParallelContextActive(void)
1021 : {
1022 0 : return !dlist_is_empty(&pcxt_list);
1023 : }
1024 :
1025 : /*
1026 : * Handle receipt of an interrupt indicating a parallel worker message.
1027 : *
1028 : * Note: this is called within a signal handler! All we can do is set
1029 : * a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke
1030 : * HandleParallelMessages().
1031 : */
1032 : void
1033 3522 : HandleParallelMessageInterrupt(void)
1034 : {
1035 3522 : InterruptPending = true;
1036 3522 : ParallelMessagePending = true;
1037 3522 : SetLatch(MyLatch);
1038 3522 : }
1039 :
1040 : /*
1041 : * Handle any queued protocol messages received from parallel workers.
1042 : */
1043 : void
1044 3462 : HandleParallelMessages(void)
1045 : {
1046 : dlist_iter iter;
1047 : MemoryContext oldcontext;
1048 :
1049 : static MemoryContext hpm_context = NULL;
1050 :
1051 : /*
1052 : * This is invoked from ProcessInterrupts(), and since some of the
1053 : * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
1054 : * for recursive calls if more signals are received while this runs. It's
1055 : * unclear that recursive entry would be safe, and it doesn't seem useful
1056 : * even if it is safe, so let's block interrupts until done.
1057 : */
1058 3462 : HOLD_INTERRUPTS();
1059 :
1060 : /*
1061 : * Moreover, CurrentMemoryContext might be pointing almost anywhere. We
1062 : * don't want to risk leaking data into long-lived contexts, so let's do
1063 : * our work here in a private context that we can reset on each use.
1064 : */
1065 3462 : if (hpm_context == NULL) /* first time through? */
1066 138 : hpm_context = AllocSetContextCreate(TopMemoryContext,
1067 : "HandleParallelMessages",
1068 : ALLOCSET_DEFAULT_SIZES);
1069 : else
1070 3324 : MemoryContextReset(hpm_context);
1071 :
1072 3462 : oldcontext = MemoryContextSwitchTo(hpm_context);
1073 :
1074 : /* OK to process messages. Reset the flag saying there are more to do. */
1075 3462 : ParallelMessagePending = false;
1076 :
1077 7080 : dlist_foreach(iter, &pcxt_list)
1078 : {
1079 : ParallelContext *pcxt;
1080 : int i;
1081 :
1082 3630 : pcxt = dlist_container(ParallelContext, node, iter.cur);
1083 3630 : if (pcxt->worker == NULL)
1084 0 : continue;
1085 :
1086 14970 : for (i = 0; i < pcxt->nworkers_launched; ++i)
1087 : {
1088 : /*
1089 : * Read as many messages as we can from each worker, but stop when
1090 : * either (1) the worker's error queue goes away, which can happen
1091 : * if we receive a Terminate message from the worker; or (2) no
1092 : * more messages can be read from the worker without blocking.
1093 : */
1094 14052 : while (pcxt->worker[i].error_mqh != NULL)
1095 : {
1096 : shm_mq_result res;
1097 : Size nbytes;
1098 : void *data;
1099 :
1100 6274 : res = shm_mq_receive(pcxt->worker[i].error_mqh, &nbytes,
1101 : &data, true);
1102 6274 : if (res == SHM_MQ_WOULD_BLOCK)
1103 3562 : break;
1104 2712 : else if (res == SHM_MQ_SUCCESS)
1105 : {
1106 : StringInfoData msg;
1107 :
1108 2712 : initStringInfo(&msg);
1109 2712 : appendBinaryStringInfo(&msg, data, nbytes);
1110 2712 : HandleParallelMessage(pcxt, i, &msg);
1111 2700 : pfree(msg.data);
1112 : }
1113 : else
1114 0 : ereport(ERROR,
1115 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1116 : errmsg("lost connection to parallel worker")));
1117 : }
1118 : }
1119 : }
1120 :
1121 3450 : MemoryContextSwitchTo(oldcontext);
1122 :
1123 : /* Might as well clear the context on our way out */
1124 3450 : MemoryContextReset(hpm_context);
1125 :
1126 3450 : RESUME_INTERRUPTS();
1127 3450 : }
1128 :
1129 : /*
1130 : * Handle a single protocol message received from a single parallel worker.
1131 : */
1132 : static void
1133 2712 : HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
1134 : {
1135 : char msgtype;
1136 :
1137 2712 : if (pcxt->known_attached_workers != NULL &&
1138 2712 : !pcxt->known_attached_workers[i])
1139 : {
1140 2572 : pcxt->known_attached_workers[i] = true;
1141 2572 : pcxt->nknown_attached_workers++;
1142 : }
1143 :
1144 2712 : msgtype = pq_getmsgbyte(msg);
1145 :
1146 2712 : switch (msgtype)
1147 : {
1148 12 : case PqMsg_ErrorResponse:
1149 : case PqMsg_NoticeResponse:
1150 : {
1151 : ErrorData edata;
1152 : ErrorContextCallback *save_error_context_stack;
1153 :
1154 : /* Parse ErrorResponse or NoticeResponse. */
1155 12 : pq_parse_errornotice(msg, &edata);
1156 :
1157 : /* Death of a worker isn't enough justification for suicide. */
1158 12 : edata.elevel = Min(edata.elevel, ERROR);
1159 :
1160 : /*
1161 : * If desired, add a context line to show that this is a
1162 : * message propagated from a parallel worker. Otherwise, it
1163 : * can sometimes be confusing to understand what actually
1164 : * happened. (We don't do this in DEBUG_PARALLEL_REGRESS mode
1165 : * because it causes test-result instability depending on
1166 : * whether a parallel worker is actually used or not.)
1167 : */
1168 12 : if (debug_parallel_query != DEBUG_PARALLEL_REGRESS)
1169 : {
1170 12 : if (edata.context)
1171 6 : edata.context = psprintf("%s\n%s", edata.context,
1172 : _("parallel worker"));
1173 : else
1174 6 : edata.context = pstrdup(_("parallel worker"));
1175 : }
1176 :
1177 : /*
1178 : * Context beyond that should use the error context callbacks
1179 : * that were in effect when the ParallelContext was created,
1180 : * not the current ones.
1181 : */
1182 12 : save_error_context_stack = error_context_stack;
1183 12 : error_context_stack = pcxt->error_context_stack;
1184 :
1185 : /* Rethrow error or print notice. */
1186 12 : ThrowErrorData(&edata);
1187 :
1188 : /* Not an error, so restore previous context stack. */
1189 0 : error_context_stack = save_error_context_stack;
1190 :
1191 0 : break;
1192 : }
1193 :
1194 0 : case PqMsg_NotificationResponse:
1195 : {
1196 : /* Propagate NotifyResponse. */
1197 : int32 pid;
1198 : const char *channel;
1199 : const char *payload;
1200 :
1201 0 : pid = pq_getmsgint(msg, 4);
1202 0 : channel = pq_getmsgrawstring(msg);
1203 0 : payload = pq_getmsgrawstring(msg);
1204 0 : pq_endmessage(msg);
1205 :
1206 0 : NotifyMyFrontEnd(channel, payload, pid);
1207 :
1208 0 : break;
1209 : }
1210 :
1211 0 : case 'P': /* Parallel progress reporting */
1212 : {
1213 : /*
1214 : * Only incremental progress reporting is currently supported.
1215 : * However, it's possible to add more fields to the message to
1216 : * allow for handling of other backend progress APIs.
1217 : */
1218 0 : int index = pq_getmsgint(msg, 4);
1219 0 : int64 incr = pq_getmsgint64(msg);
1220 :
1221 0 : pq_getmsgend(msg);
1222 :
1223 0 : pgstat_progress_incr_param(index, incr);
1224 :
1225 0 : break;
1226 : }
1227 :
1228 2700 : case PqMsg_Terminate:
1229 : {
1230 2700 : shm_mq_detach(pcxt->worker[i].error_mqh);
1231 2700 : pcxt->worker[i].error_mqh = NULL;
1232 2700 : break;
1233 : }
1234 :
1235 0 : default:
1236 : {
1237 0 : elog(ERROR, "unrecognized message type received from parallel worker: %c (message length %d bytes)",
1238 : msgtype, msg->len);
1239 : }
1240 : }
1241 2700 : }
1242 :
1243 : /*
1244 : * End-of-subtransaction cleanup for parallel contexts.
1245 : *
1246 : * Here we remove only parallel contexts initiated within the current
1247 : * subtransaction.
1248 : */
1249 : void
1250 20040 : AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId)
1251 : {
1252 20046 : while (!dlist_is_empty(&pcxt_list))
1253 : {
1254 : ParallelContext *pcxt;
1255 :
1256 6 : pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
1257 6 : if (pcxt->subid != mySubId)
1258 0 : break;
1259 6 : if (isCommit)
1260 0 : elog(WARNING, "leaked parallel context");
1261 6 : DestroyParallelContext(pcxt);
1262 : }
1263 20040 : }
1264 :
1265 : /*
1266 : * End-of-transaction cleanup for parallel contexts.
1267 : *
1268 : * We nuke all remaining parallel contexts.
1269 : */
1270 : void
1271 784662 : AtEOXact_Parallel(bool isCommit)
1272 : {
1273 784668 : while (!dlist_is_empty(&pcxt_list))
1274 : {
1275 : ParallelContext *pcxt;
1276 :
1277 6 : pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
1278 6 : if (isCommit)
1279 0 : elog(WARNING, "leaked parallel context");
1280 6 : DestroyParallelContext(pcxt);
1281 : }
1282 784662 : }
1283 :
1284 : /*
1285 : * Main entrypoint for parallel workers.
1286 : */
1287 : void
1288 2712 : ParallelWorkerMain(Datum main_arg)
1289 : {
1290 : dsm_segment *seg;
1291 : shm_toc *toc;
1292 : FixedParallelState *fps;
1293 : char *error_queue_space;
1294 : shm_mq *mq;
1295 : shm_mq_handle *mqh;
1296 : char *libraryspace;
1297 : char *entrypointstate;
1298 : char *library_name;
1299 : char *function_name;
1300 : parallel_worker_main_type entrypt;
1301 : char *gucspace;
1302 : char *combocidspace;
1303 : char *tsnapspace;
1304 : char *asnapspace;
1305 : char *tstatespace;
1306 : char *pendingsyncsspace;
1307 : char *reindexspace;
1308 : char *relmapperspace;
1309 : char *uncommittedenumsspace;
1310 : char *clientconninfospace;
1311 : char *session_dsm_handle_space;
1312 : Snapshot tsnapshot;
1313 : Snapshot asnapshot;
1314 :
1315 : /* Set flag to indicate that we're initializing a parallel worker. */
1316 2712 : InitializingParallelWorker = true;
1317 :
1318 : /* Establish signal handlers. */
1319 2712 : pqsignal(SIGTERM, die);
1320 2712 : BackgroundWorkerUnblockSignals();
1321 :
1322 : /* Determine and set our parallel worker number. */
1323 : Assert(ParallelWorkerNumber == -1);
1324 2712 : memcpy(&ParallelWorkerNumber, MyBgworkerEntry->bgw_extra, sizeof(int));
1325 :
1326 : /* Set up a memory context to work in, just for cleanliness. */
1327 2712 : CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
1328 : "Parallel worker",
1329 : ALLOCSET_DEFAULT_SIZES);
1330 :
1331 : /*
1332 : * Attach to the dynamic shared memory segment for the parallel query, and
1333 : * find its table of contents.
1334 : *
1335 : * Note: at this point, we have not created any ResourceOwner in this
1336 : * process. This will result in our DSM mapping surviving until process
1337 : * exit, which is fine. If there were a ResourceOwner, it would acquire
1338 : * ownership of the mapping, but we have no need for that.
1339 : */
1340 2712 : seg = dsm_attach(DatumGetUInt32(main_arg));
1341 2712 : if (seg == NULL)
1342 0 : ereport(ERROR,
1343 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1344 : errmsg("could not map dynamic shared memory segment")));
1345 2712 : toc = shm_toc_attach(PARALLEL_MAGIC, dsm_segment_address(seg));
1346 2712 : if (toc == NULL)
1347 0 : ereport(ERROR,
1348 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1349 : errmsg("invalid magic number in dynamic shared memory segment")));
1350 :
1351 : /* Look up fixed parallel state. */
1352 2712 : fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED, false);
1353 2712 : MyFixedParallelState = fps;
1354 :
1355 : /* Arrange to signal the leader if we exit. */
1356 2712 : ParallelLeaderPid = fps->parallel_leader_pid;
1357 2712 : ParallelLeaderProcNumber = fps->parallel_leader_proc_number;
1358 2712 : before_shmem_exit(ParallelWorkerShutdown, PointerGetDatum(seg));
1359 :
1360 : /*
1361 : * Now we can find and attach to the error queue provided for us. That's
1362 : * good, because until we do that, any errors that happen here will not be
1363 : * reported back to the process that requested that this worker be
1364 : * launched.
1365 : */
1366 2712 : error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE, false);
1367 2712 : mq = (shm_mq *) (error_queue_space +
1368 2712 : ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE);
1369 2712 : shm_mq_set_sender(mq, MyProc);
1370 2712 : mqh = shm_mq_attach(mq, seg, NULL);
1371 2712 : pq_redirect_to_shm_mq(seg, mqh);
1372 2712 : pq_set_parallel_leader(fps->parallel_leader_pid,
1373 : fps->parallel_leader_proc_number);
1374 :
1375 : /*
1376 : * Hooray! Primary initialization is complete. Now, we need to set up our
1377 : * backend-local state to match the original backend.
1378 : */
1379 :
1380 : /*
1381 : * Join locking group. We must do this before anything that could try to
1382 : * acquire a heavyweight lock, because any heavyweight locks acquired to
1383 : * this point could block either directly against the parallel group
1384 : * leader or against some process which in turn waits for a lock that
1385 : * conflicts with the parallel group leader, causing an undetected
1386 : * deadlock. (If we can't join the lock group, the leader has gone away,
1387 : * so just exit quietly.)
1388 : */
1389 2712 : if (!BecomeLockGroupMember(fps->parallel_leader_pgproc,
1390 : fps->parallel_leader_pid))
1391 0 : return;
1392 :
1393 : /*
1394 : * Restore transaction and statement start-time timestamps. This must
1395 : * happen before anything that would start a transaction, else asserts in
1396 : * xact.c will fire.
1397 : */
1398 2712 : SetParallelStartTimestamps(fps->xact_ts, fps->stmt_ts);
1399 :
1400 : /*
1401 : * Identify the entry point to be called. In theory this could result in
1402 : * loading an additional library, though most likely the entry point is in
1403 : * the core backend or in a library we just loaded.
1404 : */
1405 2712 : entrypointstate = shm_toc_lookup(toc, PARALLEL_KEY_ENTRYPOINT, false);
1406 2712 : library_name = entrypointstate;
1407 2712 : function_name = entrypointstate + strlen(library_name) + 1;
1408 :
1409 2712 : entrypt = LookupParallelWorkerFunction(library_name, function_name);
1410 :
1411 : /*
1412 : * Restore current session authorization and role id. No verification
1413 : * happens here, we just blindly adopt the leader's state. Note that this
1414 : * has to happen before InitPostgres, since InitializeSessionUserId will
1415 : * not set these variables.
1416 : */
1417 2712 : SetAuthenticatedUserId(fps->authenticated_user_id);
1418 2712 : SetSessionAuthorization(fps->session_user_id,
1419 2712 : fps->session_user_is_superuser);
1420 2712 : SetCurrentRoleId(fps->outer_user_id, fps->role_is_superuser);
1421 :
1422 : /*
1423 : * Restore database connection. We skip connection authorization checks,
1424 : * reasoning that (a) the leader checked these things when it started, and
1425 : * (b) we do not want parallel mode to cause these failures, because that
1426 : * would make use of parallel query plans not transparent to applications.
1427 : */
1428 2712 : BackgroundWorkerInitializeConnectionByOid(fps->database_id,
1429 : fps->authenticated_user_id,
1430 : BGWORKER_BYPASS_ALLOWCONN |
1431 : BGWORKER_BYPASS_ROLELOGINCHECK);
1432 :
1433 : /*
1434 : * Set the client encoding to the database encoding, since that is what
1435 : * the leader will expect. (We're cheating a bit by not calling
1436 : * PrepareClientEncoding first. It's okay because this call will always
1437 : * result in installing a no-op conversion. No error should be possible,
1438 : * but check anyway.)
1439 : */
1440 2712 : if (SetClientEncoding(GetDatabaseEncoding()) < 0)
1441 0 : elog(ERROR, "SetClientEncoding(%d) failed", GetDatabaseEncoding());
1442 :
1443 : /*
1444 : * Load libraries that were loaded by original backend. We want to do
1445 : * this before restoring GUCs, because the libraries might define custom
1446 : * variables.
1447 : */
1448 2712 : libraryspace = shm_toc_lookup(toc, PARALLEL_KEY_LIBRARY, false);
1449 2712 : StartTransactionCommand();
1450 2712 : RestoreLibraryState(libraryspace);
1451 2712 : CommitTransactionCommand();
1452 :
1453 : /* Crank up a transaction state appropriate to a parallel worker. */
1454 2712 : tstatespace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_STATE, false);
1455 2712 : StartParallelWorkerTransaction(tstatespace);
1456 :
1457 : /*
1458 : * Restore state that affects catalog access. Ideally we'd do this even
1459 : * before calling InitPostgres, but that has order-of-initialization
1460 : * problems, and also the relmapper would get confused during the
1461 : * CommitTransactionCommand call above.
1462 : */
1463 2712 : pendingsyncsspace = shm_toc_lookup(toc, PARALLEL_KEY_PENDING_SYNCS,
1464 : false);
1465 2712 : RestorePendingSyncs(pendingsyncsspace);
1466 2712 : relmapperspace = shm_toc_lookup(toc, PARALLEL_KEY_RELMAPPER_STATE, false);
1467 2712 : RestoreRelationMap(relmapperspace);
1468 2712 : reindexspace = shm_toc_lookup(toc, PARALLEL_KEY_REINDEX_STATE, false);
1469 2712 : RestoreReindexState(reindexspace);
1470 2712 : combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID, false);
1471 2712 : RestoreComboCIDState(combocidspace);
1472 :
1473 : /* Attach to the per-session DSM segment and contained objects. */
1474 : session_dsm_handle_space =
1475 2712 : shm_toc_lookup(toc, PARALLEL_KEY_SESSION_DSM, false);
1476 2712 : AttachSession(*(dsm_handle *) session_dsm_handle_space);
1477 :
1478 : /*
1479 : * If the transaction isolation level is REPEATABLE READ or SERIALIZABLE,
1480 : * the leader has serialized the transaction snapshot and we must restore
1481 : * it. At lower isolation levels, there is no transaction-lifetime
1482 : * snapshot, but we need TransactionXmin to get set to a value which is
1483 : * less than or equal to the xmin of every snapshot that will be used by
1484 : * this worker. The easiest way to accomplish that is to install the
1485 : * active snapshot as the transaction snapshot. Code running in this
1486 : * parallel worker might take new snapshots via GetTransactionSnapshot()
1487 : * or GetLatestSnapshot(), but it shouldn't have any way of acquiring a
1488 : * snapshot older than the active snapshot.
1489 : */
1490 2712 : asnapspace = shm_toc_lookup(toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, false);
1491 2712 : tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT, true);
1492 2712 : asnapshot = RestoreSnapshot(asnapspace);
1493 2712 : tsnapshot = tsnapspace ? RestoreSnapshot(tsnapspace) : asnapshot;
1494 2712 : RestoreTransactionSnapshot(tsnapshot,
1495 2712 : fps->parallel_leader_pgproc);
1496 2712 : PushActiveSnapshot(asnapshot);
1497 :
1498 : /*
1499 : * We've changed which tuples we can see, and must therefore invalidate
1500 : * system caches.
1501 : */
1502 2712 : InvalidateSystemCaches();
1503 :
1504 : /*
1505 : * Restore GUC values from launching backend. We can't do this earlier,
1506 : * because GUC check hooks that do catalog lookups need to see the same
1507 : * database state as the leader. Also, the check hooks for
1508 : * session_authorization and role assume we already set the correct role
1509 : * OIDs.
1510 : */
1511 2712 : gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC, false);
1512 2712 : RestoreGUCState(gucspace);
1513 :
1514 : /*
1515 : * Restore current user ID and security context. No verification happens
1516 : * here, we just blindly adopt the leader's state. We can't do this till
1517 : * after restoring GUCs, else we'll get complaints about restoring
1518 : * session_authorization and role. (In effect, we're assuming that all
1519 : * the restored values are okay to set, even if we are now inside a
1520 : * restricted context.)
1521 : */
1522 2712 : SetUserIdAndSecContext(fps->current_user_id, fps->sec_context);
1523 :
1524 : /* Restore temp-namespace state to ensure search path matches leader's. */
1525 2712 : SetTempNamespaceState(fps->temp_namespace_id,
1526 : fps->temp_toast_namespace_id);
1527 :
1528 : /* Restore uncommitted enums. */
1529 2712 : uncommittedenumsspace = shm_toc_lookup(toc, PARALLEL_KEY_UNCOMMITTEDENUMS,
1530 : false);
1531 2712 : RestoreUncommittedEnums(uncommittedenumsspace);
1532 :
1533 : /* Restore the ClientConnectionInfo. */
1534 2712 : clientconninfospace = shm_toc_lookup(toc, PARALLEL_KEY_CLIENTCONNINFO,
1535 : false);
1536 2712 : RestoreClientConnectionInfo(clientconninfospace);
1537 :
1538 : /*
1539 : * Initialize SystemUser now that MyClientConnectionInfo is restored. Also
1540 : * ensure that auth_method is actually valid, aka authn_id is not NULL.
1541 : */
1542 2712 : if (MyClientConnectionInfo.authn_id)
1543 4 : InitializeSystemUser(MyClientConnectionInfo.authn_id,
1544 : hba_authname(MyClientConnectionInfo.auth_method));
1545 :
1546 : /* Attach to the leader's serializable transaction, if SERIALIZABLE. */
1547 2712 : AttachSerializableXact(fps->serializable_xact_handle);
1548 :
1549 : /*
1550 : * We've initialized all of our state now; nothing should change
1551 : * hereafter.
1552 : */
1553 2712 : InitializingParallelWorker = false;
1554 2712 : EnterParallelMode();
1555 :
1556 : /*
1557 : * Time to do the real work: invoke the caller-supplied code.
1558 : */
1559 2712 : entrypt(seg, toc);
1560 :
1561 : /* Must exit parallel mode to pop active snapshot. */
1562 2700 : ExitParallelMode();
1563 :
1564 : /* Must pop active snapshot so snapmgr.c doesn't complain. */
1565 2700 : PopActiveSnapshot();
1566 :
1567 : /* Shut down the parallel-worker transaction. */
1568 2700 : EndParallelWorkerTransaction();
1569 :
1570 : /* Detach from the per-session DSM segment. */
1571 2700 : DetachSession();
1572 :
1573 : /* Report success. */
1574 2700 : pq_putmessage(PqMsg_Terminate, NULL, 0);
1575 : }
1576 :
1577 : /*
1578 : * Update shared memory with the ending location of the last WAL record we
1579 : * wrote, if it's greater than the value already stored there.
1580 : */
1581 : void
1582 2700 : ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
1583 : {
1584 2700 : FixedParallelState *fps = MyFixedParallelState;
1585 :
1586 : Assert(fps != NULL);
1587 2700 : SpinLockAcquire(&fps->mutex);
1588 2700 : if (fps->last_xlog_end < last_xlog_end)
1589 134 : fps->last_xlog_end = last_xlog_end;
1590 2700 : SpinLockRelease(&fps->mutex);
1591 2700 : }
1592 :
1593 : /*
1594 : * Make sure the leader tries to read from our error queue one more time.
1595 : * This guards against the case where we exit uncleanly without sending an
1596 : * ErrorResponse to the leader, for example because some code calls proc_exit
1597 : * directly.
1598 : *
1599 : * Also explicitly detach from dsm segment so that subsystems using
1600 : * on_dsm_detach() have a chance to send stats before the stats subsystem is
1601 : * shut down as part of a before_shmem_exit() hook.
1602 : *
1603 : * One might think this could instead be solved by carefully ordering the
1604 : * attaching to dsm segments, so that the pgstats segments get detached from
1605 : * later than the parallel query one. That turns out to not work because the
1606 : * stats hash might need to grow which can cause new segments to be allocated,
1607 : * which then will be detached from earlier.
1608 : */
1609 : static void
1610 2712 : ParallelWorkerShutdown(int code, Datum arg)
1611 : {
1612 2712 : SendProcSignal(ParallelLeaderPid,
1613 : PROCSIG_PARALLEL_MESSAGE,
1614 : ParallelLeaderProcNumber);
1615 :
1616 2712 : dsm_detach((dsm_segment *) DatumGetPointer(arg));
1617 2712 : }
1618 :
1619 : /*
1620 : * Look up (and possibly load) a parallel worker entry point function.
1621 : *
1622 : * For functions contained in the core code, we use library name "postgres"
1623 : * and consult the InternalParallelWorkers array. External functions are
1624 : * looked up, and loaded if necessary, using load_external_function().
1625 : *
1626 : * The point of this is to pass function names as strings across process
1627 : * boundaries. We can't pass actual function addresses because of the
1628 : * possibility that the function has been loaded at a different address
1629 : * in a different process. This is obviously a hazard for functions in
1630 : * loadable libraries, but it can happen even for functions in the core code
1631 : * on platforms using EXEC_BACKEND (e.g., Windows).
1632 : *
1633 : * At some point it might be worthwhile to get rid of InternalParallelWorkers[]
1634 : * in favor of applying load_external_function() for core functions too;
1635 : * but that raises portability issues that are not worth addressing now.
1636 : */
1637 : static parallel_worker_main_type
1638 2712 : LookupParallelWorkerFunction(const char *libraryname, const char *funcname)
1639 : {
1640 : /*
1641 : * If the function is to be loaded from postgres itself, search the
1642 : * InternalParallelWorkers array.
1643 : */
1644 2712 : if (strcmp(libraryname, "postgres") == 0)
1645 : {
1646 : int i;
1647 :
1648 2986 : for (i = 0; i < lengthof(InternalParallelWorkers); i++)
1649 : {
1650 2986 : if (strcmp(InternalParallelWorkers[i].fn_name, funcname) == 0)
1651 2712 : return InternalParallelWorkers[i].fn_addr;
1652 : }
1653 :
1654 : /* We can only reach this by programming error. */
1655 0 : elog(ERROR, "internal function \"%s\" not found", funcname);
1656 : }
1657 :
1658 : /* Otherwise load from external library. */
1659 0 : return (parallel_worker_main_type)
1660 0 : load_external_function(libraryname, funcname, true, NULL);
1661 : }
|