LCOV - code coverage report
Current view: top level - src/backend/access/transam - parallel.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 90.3 % 484 437
Test Date: 2026-03-09 12:15:00 Functions: 94.7 % 19 18
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * parallel.c
       4              :  *    Infrastructure for launching parallel workers
       5              :  *
       6              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7              :  * Portions Copyright (c) 1994, Regents of the University of California
       8              :  *
       9              :  * IDENTIFICATION
      10              :  *    src/backend/access/transam/parallel.c
      11              :  *
      12              :  *-------------------------------------------------------------------------
      13              :  */
      14              : 
      15              : #include "postgres.h"
      16              : 
      17              : #include "access/brin.h"
      18              : #include "access/gin.h"
      19              : #include "access/nbtree.h"
      20              : #include "access/parallel.h"
      21              : #include "access/session.h"
      22              : #include "access/xact.h"
      23              : #include "access/xlog.h"
      24              : #include "catalog/index.h"
      25              : #include "catalog/namespace.h"
      26              : #include "catalog/pg_enum.h"
      27              : #include "catalog/storage.h"
      28              : #include "commands/async.h"
      29              : #include "commands/vacuum.h"
      30              : #include "executor/execParallel.h"
      31              : #include "libpq/libpq.h"
      32              : #include "libpq/pqformat.h"
      33              : #include "libpq/pqmq.h"
      34              : #include "miscadmin.h"
      35              : #include "optimizer/optimizer.h"
      36              : #include "pgstat.h"
      37              : #include "storage/ipc.h"
      38              : #include "storage/predicate.h"
      39              : #include "storage/proc.h"
      40              : #include "storage/spin.h"
      41              : #include "tcop/tcopprot.h"
      42              : #include "utils/combocid.h"
      43              : #include "utils/guc.h"
      44              : #include "utils/inval.h"
      45              : #include "utils/memutils.h"
      46              : #include "utils/relmapper.h"
      47              : #include "utils/snapmgr.h"
      48              : #include "utils/wait_event.h"
      49              : 
      50              : /*
      51              :  * We don't want to waste a lot of memory on an error queue which, most of
      52              :  * the time, will process only a handful of small messages.  However, it is
      53              :  * desirable to make it large enough that a typical ErrorResponse can be sent
      54              :  * without blocking.  That way, a worker that errors out can write the whole
      55              :  * message into the queue and terminate without waiting for the user backend.
      56              :  */
      57              : #define PARALLEL_ERROR_QUEUE_SIZE           16384
      58              : 
      59              : /* Magic number for parallel context TOC. */
      60              : #define PARALLEL_MAGIC                      0x50477c7c
      61              : 
      62              : /*
      63              :  * Magic numbers for per-context parallel state sharing.  Higher-level code
      64              :  * should use smaller values, leaving these very large ones for use by this
      65              :  * module.
      66              :  */
      67              : #define PARALLEL_KEY_FIXED                  UINT64CONST(0xFFFFFFFFFFFF0001)
      68              : #define PARALLEL_KEY_ERROR_QUEUE            UINT64CONST(0xFFFFFFFFFFFF0002)
      69              : #define PARALLEL_KEY_LIBRARY                UINT64CONST(0xFFFFFFFFFFFF0003)
      70              : #define PARALLEL_KEY_GUC                    UINT64CONST(0xFFFFFFFFFFFF0004)
      71              : #define PARALLEL_KEY_COMBO_CID              UINT64CONST(0xFFFFFFFFFFFF0005)
      72              : #define PARALLEL_KEY_TRANSACTION_SNAPSHOT   UINT64CONST(0xFFFFFFFFFFFF0006)
      73              : #define PARALLEL_KEY_ACTIVE_SNAPSHOT        UINT64CONST(0xFFFFFFFFFFFF0007)
      74              : #define PARALLEL_KEY_TRANSACTION_STATE      UINT64CONST(0xFFFFFFFFFFFF0008)
      75              : #define PARALLEL_KEY_ENTRYPOINT             UINT64CONST(0xFFFFFFFFFFFF0009)
      76              : #define PARALLEL_KEY_SESSION_DSM            UINT64CONST(0xFFFFFFFFFFFF000A)
      77              : #define PARALLEL_KEY_PENDING_SYNCS          UINT64CONST(0xFFFFFFFFFFFF000B)
      78              : #define PARALLEL_KEY_REINDEX_STATE          UINT64CONST(0xFFFFFFFFFFFF000C)
      79              : #define PARALLEL_KEY_RELMAPPER_STATE        UINT64CONST(0xFFFFFFFFFFFF000D)
      80              : #define PARALLEL_KEY_UNCOMMITTEDENUMS       UINT64CONST(0xFFFFFFFFFFFF000E)
      81              : #define PARALLEL_KEY_CLIENTCONNINFO         UINT64CONST(0xFFFFFFFFFFFF000F)
      82              : 
      83              : /* Fixed-size parallel state. */
      84              : typedef struct FixedParallelState
      85              : {
      86              :     /* Fixed-size state that workers must restore. */
      87              :     Oid         database_id;
      88              :     Oid         authenticated_user_id;
      89              :     Oid         session_user_id;
      90              :     Oid         outer_user_id;
      91              :     Oid         current_user_id;
      92              :     Oid         temp_namespace_id;
      93              :     Oid         temp_toast_namespace_id;
      94              :     int         sec_context;
      95              :     bool        session_user_is_superuser;
      96              :     bool        role_is_superuser;
      97              :     PGPROC     *parallel_leader_pgproc;
      98              :     pid_t       parallel_leader_pid;
      99              :     ProcNumber  parallel_leader_proc_number;
     100              :     TimestampTz xact_ts;
     101              :     TimestampTz stmt_ts;
     102              :     SerializableXactHandle serializable_xact_handle;
     103              : 
     104              :     /* Mutex protects remaining fields. */
     105              :     slock_t     mutex;
     106              : 
     107              :     /* Maximum XactLastRecEnd of any worker. */
     108              :     XLogRecPtr  last_xlog_end;
     109              : } FixedParallelState;
     110              : 
     111              : /*
     112              :  * Our parallel worker number.  We initialize this to -1, meaning that we are
     113              :  * not a parallel worker.  In parallel workers, it will be set to a value >= 0
     114              :  * and < the number of workers before any user code is invoked; each parallel
     115              :  * worker will get a different parallel worker number.
     116              :  */
     117              : int         ParallelWorkerNumber = -1;
     118              : 
     119              : /* Is there a parallel message pending which we need to receive? */
     120              : volatile sig_atomic_t ParallelMessagePending = false;
     121              : 
     122              : /* Are we initializing a parallel worker? */
     123              : bool        InitializingParallelWorker = false;
     124              : 
     125              : /* Pointer to our fixed parallel state. */
     126              : static FixedParallelState *MyFixedParallelState;
     127              : 
     128              : /* List of active parallel contexts. */
     129              : static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);
     130              : 
     131              : /* Backend-local copy of data from FixedParallelState. */
     132              : static pid_t ParallelLeaderPid;
     133              : 
     134              : /*
     135              :  * List of internal parallel worker entry points.  We need this for
     136              :  * reasons explained in LookupParallelWorkerFunction(), below.
     137              :  */
     138              : static const struct
     139              : {
     140              :     const char *fn_name;
     141              :     parallel_worker_main_type fn_addr;
     142              : }           InternalParallelWorkers[] =
     143              : 
     144              : {
     145              :     {
     146              :         "ParallelQueryMain", ParallelQueryMain
     147              :     },
     148              :     {
     149              :         "_bt_parallel_build_main", _bt_parallel_build_main
     150              :     },
     151              :     {
     152              :         "_brin_parallel_build_main", _brin_parallel_build_main
     153              :     },
     154              :     {
     155              :         "_gin_parallel_build_main", _gin_parallel_build_main
     156              :     },
     157              :     {
     158              :         "parallel_vacuum_main", parallel_vacuum_main
     159              :     }
     160              : };
     161              : 
     162              : /* Private functions. */
     163              : static void ProcessParallelMessage(ParallelContext *pcxt, int i, StringInfo msg);
     164              : static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
     165              : static parallel_worker_main_type LookupParallelWorkerFunction(const char *libraryname, const char *funcname);
     166              : static void ParallelWorkerShutdown(int code, Datum arg);
     167              : 
     168              : 
     169              : /*
     170              :  * Establish a new parallel context.  This should be done after entering
     171              :  * parallel mode, and (unless there is an error) the context should be
     172              :  * destroyed before exiting the current subtransaction.
     173              :  */
     174              : ParallelContext *
     175          498 : CreateParallelContext(const char *library_name, const char *function_name,
     176              :                       int nworkers)
     177              : {
     178              :     MemoryContext oldcontext;
     179              :     ParallelContext *pcxt;
     180              : 
     181              :     /* It is unsafe to create a parallel context if not in parallel mode. */
     182              :     Assert(IsInParallelMode());
     183              : 
     184              :     /* Number of workers should be non-negative. */
     185              :     Assert(nworkers >= 0);
     186              : 
     187              :     /* We might be running in a short-lived memory context. */
     188          498 :     oldcontext = MemoryContextSwitchTo(TopTransactionContext);
     189              : 
     190              :     /* Initialize a new ParallelContext. */
     191          498 :     pcxt = palloc0_object(ParallelContext);
     192          498 :     pcxt->subid = GetCurrentSubTransactionId();
     193          498 :     pcxt->nworkers = nworkers;
     194          498 :     pcxt->nworkers_to_launch = nworkers;
     195          498 :     pcxt->library_name = pstrdup(library_name);
     196          498 :     pcxt->function_name = pstrdup(function_name);
     197          498 :     pcxt->error_context_stack = error_context_stack;
     198          498 :     shm_toc_initialize_estimator(&pcxt->estimator);
     199          498 :     dlist_push_head(&pcxt_list, &pcxt->node);
     200              : 
     201              :     /* Restore previous memory context. */
     202          498 :     MemoryContextSwitchTo(oldcontext);
     203              : 
     204          498 :     return pcxt;
     205              : }
     206              : 
     207              : /*
     208              :  * Establish the dynamic shared memory segment for a parallel context and
     209              :  * copy state and other bookkeeping information that will be needed by
     210              :  * parallel workers into it.
     211              :  */
     212              : void
     213          498 : InitializeParallelDSM(ParallelContext *pcxt)
     214              : {
     215              :     MemoryContext oldcontext;
     216          498 :     Size        library_len = 0;
     217          498 :     Size        guc_len = 0;
     218          498 :     Size        combocidlen = 0;
     219          498 :     Size        tsnaplen = 0;
     220          498 :     Size        asnaplen = 0;
     221          498 :     Size        tstatelen = 0;
     222          498 :     Size        pendingsyncslen = 0;
     223          498 :     Size        reindexlen = 0;
     224          498 :     Size        relmapperlen = 0;
     225          498 :     Size        uncommittedenumslen = 0;
     226          498 :     Size        clientconninfolen = 0;
     227          498 :     Size        segsize = 0;
     228              :     int         i;
     229              :     FixedParallelState *fps;
     230          498 :     dsm_handle  session_dsm_handle = DSM_HANDLE_INVALID;
     231          498 :     Snapshot    transaction_snapshot = GetTransactionSnapshot();
     232          498 :     Snapshot    active_snapshot = GetActiveSnapshot();
     233              : 
     234              :     /* We might be running in a very short-lived memory context. */
     235          498 :     oldcontext = MemoryContextSwitchTo(TopTransactionContext);
     236              : 
     237              :     /* Allow space to store the fixed-size parallel state. */
     238          498 :     shm_toc_estimate_chunk(&pcxt->estimator, sizeof(FixedParallelState));
     239          498 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     240              : 
     241              :     /*
     242              :      * If we manage to reach here while non-interruptible, it's unsafe to
     243              :      * launch any workers: we would fail to process interrupts sent by them.
     244              :      * We can deal with that edge case by pretending no workers were
     245              :      * requested.
     246              :      */
     247          498 :     if (!INTERRUPTS_CAN_BE_PROCESSED())
     248            0 :         pcxt->nworkers = 0;
     249              : 
     250              :     /*
     251              :      * Normally, the user will have requested at least one worker process, but
     252              :      * if by chance they have not, we can skip a bunch of things here.
     253              :      */
     254          498 :     if (pcxt->nworkers > 0)
     255              :     {
     256              :         /* Get (or create) the per-session DSM segment's handle. */
     257          498 :         session_dsm_handle = GetSessionDsmHandle();
     258              : 
     259              :         /*
     260              :          * If we weren't able to create a per-session DSM segment, then we can
     261              :          * continue but we can't safely launch any workers because their
     262              :          * record typmods would be incompatible so they couldn't exchange
     263              :          * tuples.
     264              :          */
     265          498 :         if (session_dsm_handle == DSM_HANDLE_INVALID)
     266            0 :             pcxt->nworkers = 0;
     267              :     }
     268              : 
     269          498 :     if (pcxt->nworkers > 0)
     270              :     {
     271              :         StaticAssertDecl(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
     272              :                          PARALLEL_ERROR_QUEUE_SIZE,
     273              :                          "parallel error queue size not buffer-aligned");
     274              : 
     275              :         /* Estimate space for various kinds of state sharing. */
     276          498 :         library_len = EstimateLibraryStateSpace();
     277          498 :         shm_toc_estimate_chunk(&pcxt->estimator, library_len);
     278          498 :         guc_len = EstimateGUCStateSpace();
     279          498 :         shm_toc_estimate_chunk(&pcxt->estimator, guc_len);
     280          498 :         combocidlen = EstimateComboCIDStateSpace();
     281          498 :         shm_toc_estimate_chunk(&pcxt->estimator, combocidlen);
     282          498 :         if (IsolationUsesXactSnapshot())
     283              :         {
     284           11 :             tsnaplen = EstimateSnapshotSpace(transaction_snapshot);
     285           11 :             shm_toc_estimate_chunk(&pcxt->estimator, tsnaplen);
     286              :         }
     287          498 :         asnaplen = EstimateSnapshotSpace(active_snapshot);
     288          498 :         shm_toc_estimate_chunk(&pcxt->estimator, asnaplen);
     289          498 :         tstatelen = EstimateTransactionStateSpace();
     290          498 :         shm_toc_estimate_chunk(&pcxt->estimator, tstatelen);
     291          498 :         shm_toc_estimate_chunk(&pcxt->estimator, sizeof(dsm_handle));
     292          498 :         pendingsyncslen = EstimatePendingSyncsSpace();
     293          498 :         shm_toc_estimate_chunk(&pcxt->estimator, pendingsyncslen);
     294          498 :         reindexlen = EstimateReindexStateSpace();
     295          498 :         shm_toc_estimate_chunk(&pcxt->estimator, reindexlen);
     296          498 :         relmapperlen = EstimateRelationMapSpace();
     297          498 :         shm_toc_estimate_chunk(&pcxt->estimator, relmapperlen);
     298          498 :         uncommittedenumslen = EstimateUncommittedEnumsSpace();
     299          498 :         shm_toc_estimate_chunk(&pcxt->estimator, uncommittedenumslen);
     300          498 :         clientconninfolen = EstimateClientConnectionInfoSpace();
     301          498 :         shm_toc_estimate_chunk(&pcxt->estimator, clientconninfolen);
     302              :         /* If you add more chunks here, you probably need to add keys. */
     303          498 :         shm_toc_estimate_keys(&pcxt->estimator, 12);
     304              : 
     305              :         /* Estimate space need for error queues. */
     306          498 :         shm_toc_estimate_chunk(&pcxt->estimator,
     307              :                                mul_size(PARALLEL_ERROR_QUEUE_SIZE,
     308              :                                         pcxt->nworkers));
     309          498 :         shm_toc_estimate_keys(&pcxt->estimator, 1);
     310              : 
     311              :         /* Estimate how much we'll need for the entrypoint info. */
     312          498 :         shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name) +
     313              :                                strlen(pcxt->function_name) + 2);
     314          498 :         shm_toc_estimate_keys(&pcxt->estimator, 1);
     315              :     }
     316              : 
     317              :     /*
     318              :      * Create DSM and initialize with new table of contents.  But if the user
     319              :      * didn't request any workers, then don't bother creating a dynamic shared
     320              :      * memory segment; instead, just use backend-private memory.
     321              :      *
     322              :      * Also, if we can't create a dynamic shared memory segment because the
     323              :      * maximum number of segments have already been created, then fall back to
     324              :      * backend-private memory, and plan not to use any workers.  We hope this
     325              :      * won't happen very often, but it's better to abandon the use of
     326              :      * parallelism than to fail outright.
     327              :      */
     328          498 :     segsize = shm_toc_estimate(&pcxt->estimator);
     329          498 :     if (pcxt->nworkers > 0)
     330          498 :         pcxt->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
     331          498 :     if (pcxt->seg != NULL)
     332          498 :         pcxt->toc = shm_toc_create(PARALLEL_MAGIC,
     333              :                                    dsm_segment_address(pcxt->seg),
     334              :                                    segsize);
     335              :     else
     336              :     {
     337            0 :         pcxt->nworkers = 0;
     338            0 :         pcxt->private_memory = MemoryContextAlloc(TopMemoryContext, segsize);
     339            0 :         pcxt->toc = shm_toc_create(PARALLEL_MAGIC, pcxt->private_memory,
     340              :                                    segsize);
     341              :     }
     342              : 
     343              :     /* Initialize fixed-size state in shared memory. */
     344              :     fps = (FixedParallelState *)
     345          498 :         shm_toc_allocate(pcxt->toc, sizeof(FixedParallelState));
     346          498 :     fps->database_id = MyDatabaseId;
     347          498 :     fps->authenticated_user_id = GetAuthenticatedUserId();
     348          498 :     fps->session_user_id = GetSessionUserId();
     349          498 :     fps->outer_user_id = GetCurrentRoleId();
     350          498 :     GetUserIdAndSecContext(&fps->current_user_id, &fps->sec_context);
     351          498 :     fps->session_user_is_superuser = GetSessionUserIsSuperuser();
     352          498 :     fps->role_is_superuser = current_role_is_superuser;
     353          498 :     GetTempNamespaceState(&fps->temp_namespace_id,
     354              :                           &fps->temp_toast_namespace_id);
     355          498 :     fps->parallel_leader_pgproc = MyProc;
     356          498 :     fps->parallel_leader_pid = MyProcPid;
     357          498 :     fps->parallel_leader_proc_number = MyProcNumber;
     358          498 :     fps->xact_ts = GetCurrentTransactionStartTimestamp();
     359          498 :     fps->stmt_ts = GetCurrentStatementStartTimestamp();
     360          498 :     fps->serializable_xact_handle = ShareSerializableXact();
     361          498 :     SpinLockInit(&fps->mutex);
     362          498 :     fps->last_xlog_end = InvalidXLogRecPtr;
     363          498 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);
     364              : 
     365              :     /* We can skip the rest of this if we're not budgeting for any workers. */
     366          498 :     if (pcxt->nworkers > 0)
     367              :     {
     368              :         char       *libraryspace;
     369              :         char       *gucspace;
     370              :         char       *combocidspace;
     371              :         char       *tsnapspace;
     372              :         char       *asnapspace;
     373              :         char       *tstatespace;
     374              :         char       *pendingsyncsspace;
     375              :         char       *reindexspace;
     376              :         char       *relmapperspace;
     377              :         char       *error_queue_space;
     378              :         char       *session_dsm_handle_space;
     379              :         char       *entrypointstate;
     380              :         char       *uncommittedenumsspace;
     381              :         char       *clientconninfospace;
     382              :         Size        lnamelen;
     383              : 
     384              :         /* Serialize shared libraries we have loaded. */
     385          498 :         libraryspace = shm_toc_allocate(pcxt->toc, library_len);
     386          498 :         SerializeLibraryState(library_len, libraryspace);
     387          498 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_LIBRARY, libraryspace);
     388              : 
     389              :         /* Serialize GUC settings. */
     390          498 :         gucspace = shm_toc_allocate(pcxt->toc, guc_len);
     391          498 :         SerializeGUCState(guc_len, gucspace);
     392          498 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_GUC, gucspace);
     393              : 
     394              :         /* Serialize combo CID state. */
     395          498 :         combocidspace = shm_toc_allocate(pcxt->toc, combocidlen);
     396          498 :         SerializeComboCIDState(combocidlen, combocidspace);
     397          498 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_COMBO_CID, combocidspace);
     398              : 
     399              :         /*
     400              :          * Serialize the transaction snapshot if the transaction isolation
     401              :          * level uses a transaction snapshot.
     402              :          */
     403          498 :         if (IsolationUsesXactSnapshot())
     404              :         {
     405           11 :             tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen);
     406           11 :             SerializeSnapshot(transaction_snapshot, tsnapspace);
     407           11 :             shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT,
     408              :                            tsnapspace);
     409              :         }
     410              : 
     411              :         /* Serialize the active snapshot. */
     412          498 :         asnapspace = shm_toc_allocate(pcxt->toc, asnaplen);
     413          498 :         SerializeSnapshot(active_snapshot, asnapspace);
     414          498 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, asnapspace);
     415              : 
     416              :         /* Provide the handle for per-session segment. */
     417          498 :         session_dsm_handle_space = shm_toc_allocate(pcxt->toc,
     418              :                                                     sizeof(dsm_handle));
     419          498 :         *(dsm_handle *) session_dsm_handle_space = session_dsm_handle;
     420          498 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_SESSION_DSM,
     421              :                        session_dsm_handle_space);
     422              : 
     423              :         /* Serialize transaction state. */
     424          498 :         tstatespace = shm_toc_allocate(pcxt->toc, tstatelen);
     425          498 :         SerializeTransactionState(tstatelen, tstatespace);
     426          498 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_STATE, tstatespace);
     427              : 
     428              :         /* Serialize pending syncs. */
     429          498 :         pendingsyncsspace = shm_toc_allocate(pcxt->toc, pendingsyncslen);
     430          498 :         SerializePendingSyncs(pendingsyncslen, pendingsyncsspace);
     431          498 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_PENDING_SYNCS,
     432              :                        pendingsyncsspace);
     433              : 
     434              :         /* Serialize reindex state. */
     435          498 :         reindexspace = shm_toc_allocate(pcxt->toc, reindexlen);
     436          498 :         SerializeReindexState(reindexlen, reindexspace);
     437          498 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_REINDEX_STATE, reindexspace);
     438              : 
     439              :         /* Serialize relmapper state. */
     440          498 :         relmapperspace = shm_toc_allocate(pcxt->toc, relmapperlen);
     441          498 :         SerializeRelationMap(relmapperlen, relmapperspace);
     442          498 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_RELMAPPER_STATE,
     443              :                        relmapperspace);
     444              : 
     445              :         /* Serialize uncommitted enum state. */
     446          498 :         uncommittedenumsspace = shm_toc_allocate(pcxt->toc,
     447              :                                                  uncommittedenumslen);
     448          498 :         SerializeUncommittedEnums(uncommittedenumsspace, uncommittedenumslen);
     449          498 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_UNCOMMITTEDENUMS,
     450              :                        uncommittedenumsspace);
     451              : 
     452              :         /* Serialize our ClientConnectionInfo. */
     453          498 :         clientconninfospace = shm_toc_allocate(pcxt->toc, clientconninfolen);
     454          498 :         SerializeClientConnectionInfo(clientconninfolen, clientconninfospace);
     455          498 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_CLIENTCONNINFO,
     456              :                        clientconninfospace);
     457              : 
     458              :         /* Allocate space for worker information. */
     459          498 :         pcxt->worker = palloc0_array(ParallelWorkerInfo, pcxt->nworkers);
     460              : 
     461              :         /*
     462              :          * Establish error queues in dynamic shared memory.
     463              :          *
     464              :          * These queues should be used only for transmitting ErrorResponse,
     465              :          * NoticeResponse, and NotifyResponse protocol messages.  Tuple data
     466              :          * should be transmitted via separate (possibly larger?) queues.
     467              :          */
     468              :         error_queue_space =
     469          498 :             shm_toc_allocate(pcxt->toc,
     470              :                              mul_size(PARALLEL_ERROR_QUEUE_SIZE,
     471          498 :                                       pcxt->nworkers));
     472         1611 :         for (i = 0; i < pcxt->nworkers; ++i)
     473              :         {
     474              :             char       *start;
     475              :             shm_mq     *mq;
     476              : 
     477         1113 :             start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
     478         1113 :             mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
     479         1113 :             shm_mq_set_receiver(mq, MyProc);
     480         1113 :             pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
     481              :         }
     482          498 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, error_queue_space);
     483              : 
     484              :         /*
     485              :          * Serialize entrypoint information.  It's unsafe to pass function
     486              :          * pointers across processes, as the function pointer may be different
     487              :          * in each process in EXEC_BACKEND builds, so we always pass library
     488              :          * and function name.  (We use library name "postgres" for functions
     489              :          * in the core backend.)
     490              :          */
     491          498 :         lnamelen = strlen(pcxt->library_name);
     492          498 :         entrypointstate = shm_toc_allocate(pcxt->toc, lnamelen +
     493          498 :                                            strlen(pcxt->function_name) + 2);
     494          498 :         strcpy(entrypointstate, pcxt->library_name);
     495          498 :         strcpy(entrypointstate + lnamelen + 1, pcxt->function_name);
     496          498 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_ENTRYPOINT, entrypointstate);
     497              :     }
     498              : 
     499              :     /* Update nworkers_to_launch, in case we changed nworkers above. */
     500          498 :     pcxt->nworkers_to_launch = pcxt->nworkers;
     501              : 
     502              :     /* Restore previous memory context. */
     503          498 :     MemoryContextSwitchTo(oldcontext);
     504          498 : }
     505              : 
     506              : /*
     507              :  * Reinitialize the dynamic shared memory segment for a parallel context such
     508              :  * that we could launch workers for it again.
     509              :  */
     510              : void
     511          143 : ReinitializeParallelDSM(ParallelContext *pcxt)
     512              : {
     513              :     MemoryContext oldcontext;
     514              :     FixedParallelState *fps;
     515              : 
     516              :     /* We might be running in a very short-lived memory context. */
     517          143 :     oldcontext = MemoryContextSwitchTo(TopTransactionContext);
     518              : 
     519              :     /* Wait for any old workers to exit. */
     520          143 :     if (pcxt->nworkers_launched > 0)
     521              :     {
     522          143 :         WaitForParallelWorkersToFinish(pcxt);
     523          143 :         WaitForParallelWorkersToExit(pcxt);
     524          143 :         pcxt->nworkers_launched = 0;
     525          143 :         if (pcxt->known_attached_workers)
     526              :         {
     527          143 :             pfree(pcxt->known_attached_workers);
     528          143 :             pcxt->known_attached_workers = NULL;
     529          143 :             pcxt->nknown_attached_workers = 0;
     530              :         }
     531              :     }
     532              : 
     533              :     /* Reset a few bits of fixed parallel state to a clean state. */
     534          143 :     fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
     535          143 :     fps->last_xlog_end = InvalidXLogRecPtr;
     536              : 
     537              :     /* Recreate error queues (if they exist). */
     538          143 :     if (pcxt->nworkers > 0)
     539              :     {
     540              :         char       *error_queue_space;
     541              :         int         i;
     542              : 
     543              :         error_queue_space =
     544          143 :             shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, false);
     545          570 :         for (i = 0; i < pcxt->nworkers; ++i)
     546              :         {
     547              :             char       *start;
     548              :             shm_mq     *mq;
     549              : 
     550          427 :             start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
     551          427 :             mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
     552          427 :             shm_mq_set_receiver(mq, MyProc);
     553          427 :             pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
     554              :         }
     555              :     }
     556              : 
     557              :     /* Restore previous memory context. */
     558          143 :     MemoryContextSwitchTo(oldcontext);
     559          143 : }
     560              : 
     561              : /*
     562              :  * Reinitialize parallel workers for a parallel context such that we could
     563              :  * launch a different number of workers.  This is required for cases where
     564              :  * we need to reuse the same DSM segment, but the number of workers can
     565              :  * vary from run-to-run.
     566              :  */
     567              : void
     568           31 : ReinitializeParallelWorkers(ParallelContext *pcxt, int nworkers_to_launch)
     569              : {
     570              :     /*
     571              :      * The number of workers that need to be launched must be less than the
     572              :      * number of workers with which the parallel context is initialized.  But
     573              :      * the caller might not know that InitializeParallelDSM reduced nworkers,
     574              :      * so just silently trim the request.
     575              :      */
     576           31 :     pcxt->nworkers_to_launch = Min(pcxt->nworkers, nworkers_to_launch);
     577           31 : }
     578              : 
     579              : /*
     580              :  * Launch parallel workers.
     581              :  */
     582              : void
     583          641 : LaunchParallelWorkers(ParallelContext *pcxt)
     584              : {
     585              :     MemoryContext oldcontext;
     586              :     BackgroundWorker worker;
     587              :     int         i;
     588          641 :     bool        any_registrations_failed = false;
     589              : 
     590              :     /* Skip this if we have no workers. */
     591          641 :     if (pcxt->nworkers == 0 || pcxt->nworkers_to_launch == 0)
     592            0 :         return;
     593              : 
     594              :     /* We need to be a lock group leader. */
     595          641 :     BecomeLockGroupLeader();
     596              : 
     597              :     /* If we do have workers, we'd better have a DSM segment. */
     598              :     Assert(pcxt->seg != NULL);
     599              : 
     600              :     /* We might be running in a short-lived memory context. */
     601          641 :     oldcontext = MemoryContextSwitchTo(TopTransactionContext);
     602              : 
     603              :     /* Configure a worker. */
     604          641 :     memset(&worker, 0, sizeof(worker));
     605          641 :     snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
     606              :              MyProcPid);
     607          641 :     snprintf(worker.bgw_type, BGW_MAXLEN, "parallel worker");
     608          641 :     worker.bgw_flags =
     609              :         BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION
     610              :         | BGWORKER_CLASS_PARALLEL;
     611          641 :     worker.bgw_start_time = BgWorkerStart_ConsistentState;
     612          641 :     worker.bgw_restart_time = BGW_NEVER_RESTART;
     613          641 :     sprintf(worker.bgw_library_name, "postgres");
     614          641 :     sprintf(worker.bgw_function_name, "ParallelWorkerMain");
     615          641 :     worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(pcxt->seg));
     616          641 :     worker.bgw_notify_pid = MyProcPid;
     617              : 
     618              :     /*
     619              :      * Start workers.
     620              :      *
     621              :      * The caller must be able to tolerate ending up with fewer workers than
     622              :      * expected, so there is no need to throw an error here if registration
     623              :      * fails.  It wouldn't help much anyway, because registering the worker in
     624              :      * no way guarantees that it will start up and initialize successfully.
     625              :      */
     626         2179 :     for (i = 0; i < pcxt->nworkers_to_launch; ++i)
     627              :     {
     628         1538 :         memcpy(worker.bgw_extra, &i, sizeof(int));
     629         3047 :         if (!any_registrations_failed &&
     630         1509 :             RegisterDynamicBackgroundWorker(&worker,
     631         1509 :                                             &pcxt->worker[i].bgwhandle))
     632              :         {
     633         1493 :             shm_mq_set_handle(pcxt->worker[i].error_mqh,
     634         1493 :                               pcxt->worker[i].bgwhandle);
     635         1493 :             pcxt->nworkers_launched++;
     636              :         }
     637              :         else
     638              :         {
     639              :             /*
     640              :              * If we weren't able to register the worker, then we've bumped up
     641              :              * against the max_worker_processes limit, and future
     642              :              * registrations will probably fail too, so arrange to skip them.
     643              :              * But we still have to execute this code for the remaining slots
     644              :              * to make sure that we forget about the error queues we budgeted
     645              :              * for those workers.  Otherwise, we'll wait for them to start,
     646              :              * but they never will.
     647              :              */
     648           45 :             any_registrations_failed = true;
     649           45 :             pcxt->worker[i].bgwhandle = NULL;
     650           45 :             shm_mq_detach(pcxt->worker[i].error_mqh);
     651           45 :             pcxt->worker[i].error_mqh = NULL;
     652              :         }
     653              :     }
     654              : 
     655              :     /*
     656              :      * Now that nworkers_launched has taken its final value, we can initialize
     657              :      * known_attached_workers.
     658              :      */
     659          641 :     if (pcxt->nworkers_launched > 0)
     660              :     {
     661          631 :         pcxt->known_attached_workers = palloc0_array(bool, pcxt->nworkers_launched);
     662          631 :         pcxt->nknown_attached_workers = 0;
     663              :     }
     664              : 
     665              :     /* Restore previous memory context. */
     666          641 :     MemoryContextSwitchTo(oldcontext);
     667              : }
     668              : 
     669              : /*
     670              :  * Wait for all workers to attach to their error queues, and throw an error if
     671              :  * any worker fails to do this.
     672              :  *
     673              :  * Callers can assume that if this function returns successfully, then the
     674              :  * number of workers given by pcxt->nworkers_launched have initialized and
     675              :  * attached to their error queues.  Whether or not these workers are guaranteed
     676              :  * to still be running depends on what code the caller asked them to run;
     677              :  * this function does not guarantee that they have not exited.  However, it
     678              :  * does guarantee that any workers which exited must have done so cleanly and
     679              :  * after successfully performing the work with which they were tasked.
     680              :  *
     681              :  * If this function is not called, then some of the workers that were launched
     682              :  * may not have been started due to a fork() failure, or may have exited during
     683              :  * early startup prior to attaching to the error queue, so nworkers_launched
     684              :  * cannot be viewed as completely reliable.  It will never be less than the
     685              :  * number of workers which actually started, but it might be more.  Any workers
     686              :  * that failed to start will still be discovered by
     687              :  * WaitForParallelWorkersToFinish and an error will be thrown at that time,
     688              :  * provided that function is eventually reached.
     689              :  *
     690              :  * In general, the leader process should do as much work as possible before
     691              :  * calling this function.  fork() failures and other early-startup failures
     692              :  * are very uncommon, and having the leader sit idle when it could be doing
     693              :  * useful work is undesirable.  However, if the leader needs to wait for
     694              :  * all of its workers or for a specific worker, it may want to call this
     695              :  * function before doing so.  If not, it must make some other provision for
     696              :  * the failure-to-start case, lest it wait forever.  On the other hand, a
     697              :  * leader which never waits for a worker that might not be started yet, or
     698              :  * at least never does so prior to WaitForParallelWorkersToFinish(), need not
     699              :  * call this function at all.
     700              :  */
     701              : void
     702           97 : WaitForParallelWorkersToAttach(ParallelContext *pcxt)
     703              : {
     704              :     int         i;
     705              : 
     706              :     /* Skip this if we have no launched workers. */
     707           97 :     if (pcxt->nworkers_launched == 0)
     708            0 :         return;
     709              : 
     710              :     for (;;)
     711              :     {
     712              :         /*
     713              :          * This will process any parallel messages that are pending and it may
     714              :          * also throw an error propagated from a worker.
     715              :          */
     716      7372161 :         CHECK_FOR_INTERRUPTS();
     717              : 
     718     15327293 :         for (i = 0; i < pcxt->nworkers_launched; ++i)
     719              :         {
     720              :             BgwHandleStatus status;
     721              :             shm_mq     *mq;
     722              :             int         rc;
     723              :             pid_t       pid;
     724              : 
     725      7955132 :             if (pcxt->known_attached_workers[i])
     726       282961 :                 continue;
     727              : 
     728              :             /*
     729              :              * If error_mqh is NULL, then the worker has already exited
     730              :              * cleanly.
     731              :              */
     732      7672171 :             if (pcxt->worker[i].error_mqh == NULL)
     733              :             {
     734            0 :                 pcxt->known_attached_workers[i] = true;
     735            0 :                 ++pcxt->nknown_attached_workers;
     736            0 :                 continue;
     737              :             }
     738              : 
     739      7672171 :             status = GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle, &pid);
     740      7672171 :             if (status == BGWH_STARTED)
     741              :             {
     742              :                 /* Has the worker attached to the error queue? */
     743      7672102 :                 mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
     744      7672102 :                 if (shm_mq_get_sender(mq) != NULL)
     745              :                 {
     746              :                     /* Yes, so it is known to be attached. */
     747           97 :                     pcxt->known_attached_workers[i] = true;
     748           97 :                     ++pcxt->nknown_attached_workers;
     749              :                 }
     750              :             }
     751           69 :             else if (status == BGWH_STOPPED)
     752              :             {
     753              :                 /*
     754              :                  * If the worker stopped without attaching to the error queue,
     755              :                  * throw an error.
     756              :                  */
     757            0 :                 mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
     758            0 :                 if (shm_mq_get_sender(mq) == NULL)
     759            0 :                     ereport(ERROR,
     760              :                             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     761              :                              errmsg("parallel worker failed to initialize"),
     762              :                              errhint("More details may be available in the server log.")));
     763              : 
     764            0 :                 pcxt->known_attached_workers[i] = true;
     765            0 :                 ++pcxt->nknown_attached_workers;
     766              :             }
     767              :             else
     768              :             {
     769              :                 /*
     770              :                  * Worker not yet started, so we must wait.  The postmaster
     771              :                  * will notify us if the worker's state changes.  Our latch
     772              :                  * might also get set for some other reason, but if so we'll
     773              :                  * just end up waiting for the same worker again.
     774              :                  */
     775           69 :                 rc = WaitLatch(MyLatch,
     776              :                                WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
     777              :                                -1, WAIT_EVENT_BGWORKER_STARTUP);
     778              : 
     779           69 :                 if (rc & WL_LATCH_SET)
     780           69 :                     ResetLatch(MyLatch);
     781              :             }
     782              :         }
     783              : 
     784              :         /* If all workers are known to have started, we're done. */
     785      7372161 :         if (pcxt->nknown_attached_workers >= pcxt->nworkers_launched)
     786              :         {
     787              :             Assert(pcxt->nknown_attached_workers == pcxt->nworkers_launched);
     788           97 :             break;
     789              :         }
     790              :     }
     791              : }
     792              : 
     793              : /*
     794              :  * Wait for all workers to finish computing.
     795              :  *
     796              :  * Even if the parallel operation seems to have completed successfully, it's
     797              :  * important to call this function afterwards.  We must not miss any errors
     798              :  * the workers may have thrown during the parallel operation, or any that they
     799              :  * may yet throw while shutting down.
     800              :  *
     801              :  * Also, we want to update our notion of XactLastRecEnd based on worker
     802              :  * feedback.
     803              :  */
     804              : void
     805          778 : WaitForParallelWorkersToFinish(ParallelContext *pcxt)
     806              : {
     807              :     for (;;)
     808          514 :     {
     809         1292 :         bool        anyone_alive = false;
     810         1292 :         int         nfinished = 0;
     811              :         int         i;
     812              : 
     813              :         /*
     814              :          * This will process any parallel messages that are pending, which may
     815              :          * change the outcome of the loop that follows.  It may also throw an
     816              :          * error propagated from a worker.
     817              :          */
     818         1292 :         CHECK_FOR_INTERRUPTS();
     819              : 
     820         4451 :         for (i = 0; i < pcxt->nworkers_launched; ++i)
     821              :         {
     822              :             /*
     823              :              * If error_mqh is NULL, then the worker has already exited
     824              :              * cleanly.  If we have received a message through error_mqh from
     825              :              * the worker, we know it started up cleanly, and therefore we're
     826              :              * certain to be notified when it exits.
     827              :              */
     828         3176 :             if (pcxt->worker[i].error_mqh == NULL)
     829         2629 :                 ++nfinished;
     830          547 :             else if (pcxt->known_attached_workers[i])
     831              :             {
     832           17 :                 anyone_alive = true;
     833           17 :                 break;
     834              :             }
     835              :         }
     836              : 
     837         1292 :         if (!anyone_alive)
     838              :         {
     839              :             /* If all workers are known to have finished, we're done. */
     840         1275 :             if (nfinished >= pcxt->nworkers_launched)
     841              :             {
     842              :                 Assert(nfinished == pcxt->nworkers_launched);
     843          778 :                 break;
     844              :             }
     845              : 
     846              :             /*
     847              :              * We didn't detect any living workers, but not all workers are
     848              :              * known to have exited cleanly.  Either not all workers have
     849              :              * launched yet, or maybe some of them failed to start or
     850              :              * terminated abnormally.
     851              :              */
     852         1745 :             for (i = 0; i < pcxt->nworkers_launched; ++i)
     853              :             {
     854              :                 pid_t       pid;
     855              :                 shm_mq     *mq;
     856              : 
     857              :                 /*
     858              :                  * If the worker is BGWH_NOT_YET_STARTED or BGWH_STARTED, we
     859              :                  * should just keep waiting.  If it is BGWH_STOPPED, then
     860              :                  * further investigation is needed.
     861              :                  */
     862         1248 :                 if (pcxt->worker[i].error_mqh == NULL ||
     863         1060 :                     pcxt->worker[i].bgwhandle == NULL ||
     864          530 :                     GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle,
     865              :                                            &pid) != BGWH_STOPPED)
     866         1248 :                     continue;
     867              : 
     868              :                 /*
     869              :                  * Check whether the worker ended up stopped without ever
     870              :                  * attaching to the error queue.  If so, the postmaster was
     871              :                  * unable to fork the worker or it exited without initializing
     872              :                  * properly.  We must throw an error, since the caller may
     873              :                  * have been expecting the worker to do some work before
     874              :                  * exiting.
     875              :                  */
     876            0 :                 mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
     877            0 :                 if (shm_mq_get_sender(mq) == NULL)
     878            0 :                     ereport(ERROR,
     879              :                             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     880              :                              errmsg("parallel worker failed to initialize"),
     881              :                              errhint("More details may be available in the server log.")));
     882              : 
     883              :                 /*
     884              :                  * The worker is stopped, but is attached to the error queue.
     885              :                  * Unless there's a bug somewhere, this will only happen when
     886              :                  * the worker writes messages and terminates after the
     887              :                  * CHECK_FOR_INTERRUPTS() near the top of this function and
     888              :                  * before the call to GetBackgroundWorkerPid().  In that case,
     889              :                  * our latch should have been set as well and the right things
     890              :                  * will happen on the next pass through the loop.
     891              :                  */
     892              :             }
     893              :         }
     894              : 
     895          514 :         (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
     896              :                          WAIT_EVENT_PARALLEL_FINISH);
     897          514 :         ResetLatch(MyLatch);
     898              :     }
     899              : 
     900          778 :     if (pcxt->toc != NULL)
     901              :     {
     902              :         FixedParallelState *fps;
     903              : 
     904          778 :         fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
     905          778 :         if (fps->last_xlog_end > XactLastRecEnd)
     906           19 :             XactLastRecEnd = fps->last_xlog_end;
     907              :     }
     908          778 : }
     909              : 
     910              : /*
     911              :  * Wait for all workers to exit.
     912              :  *
     913              :  * This function ensures that workers have been completely shutdown.  The
     914              :  * difference between WaitForParallelWorkersToFinish and this function is
     915              :  * that the former just ensures that last message sent by a worker backend is
     916              :  * received by the leader backend whereas this ensures the complete shutdown.
     917              :  */
     918              : static void
     919          641 : WaitForParallelWorkersToExit(ParallelContext *pcxt)
     920              : {
     921              :     int         i;
     922              : 
     923              :     /* Wait until the workers actually die. */
     924         2134 :     for (i = 0; i < pcxt->nworkers_launched; ++i)
     925              :     {
     926              :         BgwHandleStatus status;
     927              : 
     928         1493 :         if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
     929            0 :             continue;
     930              : 
     931         1493 :         status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);
     932              : 
     933              :         /*
     934              :          * If the postmaster kicked the bucket, we have no chance of cleaning
     935              :          * up safely -- we won't be able to tell when our workers are actually
     936              :          * dead.  This doesn't necessitate a PANIC since they will all abort
     937              :          * eventually, but we can't safely continue this session.
     938              :          */
     939         1493 :         if (status == BGWH_POSTMASTER_DIED)
     940            0 :             ereport(FATAL,
     941              :                     (errcode(ERRCODE_ADMIN_SHUTDOWN),
     942              :                      errmsg("postmaster exited during a parallel transaction")));
     943              : 
     944              :         /* Release memory. */
     945         1493 :         pfree(pcxt->worker[i].bgwhandle);
     946         1493 :         pcxt->worker[i].bgwhandle = NULL;
     947              :     }
     948          641 : }
     949              : 
     950              : /*
     951              :  * Destroy a parallel context.
     952              :  *
     953              :  * If expecting a clean exit, you should use WaitForParallelWorkersToFinish()
     954              :  * first, before calling this function.  When this function is invoked, any
     955              :  * remaining workers are forcibly killed; the dynamic shared memory segment
     956              :  * is unmapped; and we then wait (uninterruptibly) for the workers to exit.
     957              :  */
     958              : void
     959          498 : DestroyParallelContext(ParallelContext *pcxt)
     960              : {
     961              :     int         i;
     962              : 
     963              :     /*
     964              :      * Be careful about order of operations here!  We remove the parallel
     965              :      * context from the list before we do anything else; otherwise, if an
     966              :      * error occurs during a subsequent step, we might try to nuke it again
     967              :      * from AtEOXact_Parallel or AtEOSubXact_Parallel.
     968              :      */
     969          498 :     dlist_delete(&pcxt->node);
     970              : 
     971              :     /* Kill each worker in turn, and forget their error queues. */
     972          498 :     if (pcxt->worker != NULL)
     973              :     {
     974         1567 :         for (i = 0; i < pcxt->nworkers_launched; ++i)
     975              :         {
     976         1069 :             if (pcxt->worker[i].error_mqh != NULL)
     977              :             {
     978            6 :                 TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
     979              : 
     980            6 :                 shm_mq_detach(pcxt->worker[i].error_mqh);
     981            6 :                 pcxt->worker[i].error_mqh = NULL;
     982              :             }
     983              :         }
     984              :     }
     985              : 
     986              :     /*
     987              :      * If we have allocated a shared memory segment, detach it.  This will
     988              :      * implicitly detach the error queues, and any other shared memory queues,
     989              :      * stored there.
     990              :      */
     991          498 :     if (pcxt->seg != NULL)
     992              :     {
     993          498 :         dsm_detach(pcxt->seg);
     994          498 :         pcxt->seg = NULL;
     995              :     }
     996              : 
     997              :     /*
     998              :      * If this parallel context is actually in backend-private memory rather
     999              :      * than shared memory, free that memory instead.
    1000              :      */
    1001          498 :     if (pcxt->private_memory != NULL)
    1002              :     {
    1003            0 :         pfree(pcxt->private_memory);
    1004            0 :         pcxt->private_memory = NULL;
    1005              :     }
    1006              : 
    1007              :     /*
    1008              :      * We can't finish transaction commit or abort until all of the workers
    1009              :      * have exited.  This means, in particular, that we can't respond to
    1010              :      * interrupts at this stage.
    1011              :      */
    1012          498 :     HOLD_INTERRUPTS();
    1013          498 :     WaitForParallelWorkersToExit(pcxt);
    1014          498 :     RESUME_INTERRUPTS();
    1015              : 
    1016              :     /* Free the worker array itself. */
    1017          498 :     if (pcxt->worker != NULL)
    1018              :     {
    1019          498 :         pfree(pcxt->worker);
    1020          498 :         pcxt->worker = NULL;
    1021              :     }
    1022              : 
    1023              :     /* Free memory. */
    1024          498 :     pfree(pcxt->library_name);
    1025          498 :     pfree(pcxt->function_name);
    1026          498 :     pfree(pcxt);
    1027          498 : }
    1028              : 
    1029              : /*
    1030              :  * Are there any parallel contexts currently active?
    1031              :  */
    1032              : bool
    1033            0 : ParallelContextActive(void)
    1034              : {
    1035            0 :     return !dlist_is_empty(&pcxt_list);
    1036              : }
    1037              : 
    1038              : /*
    1039              :  * Handle receipt of an interrupt indicating a parallel worker message.
    1040              :  *
    1041              :  * Note: this is called within a signal handler!  All we can do is set
    1042              :  * a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke
    1043              :  * ProcessParallelMessages().
    1044              :  */
    1045              : void
    1046         1629 : HandleParallelMessageInterrupt(void)
    1047              : {
    1048         1629 :     InterruptPending = true;
    1049         1629 :     ParallelMessagePending = true;
    1050         1629 :     SetLatch(MyLatch);
    1051         1629 : }
    1052              : 
    1053              : /*
    1054              :  * Process any queued protocol messages received from parallel workers.
    1055              :  */
    1056              : void
    1057         1599 : ProcessParallelMessages(void)
    1058              : {
    1059              :     dlist_iter  iter;
    1060              :     MemoryContext oldcontext;
    1061              : 
    1062              :     static MemoryContext hpm_context = NULL;
    1063              : 
    1064              :     /*
    1065              :      * This is invoked from ProcessInterrupts(), and since some of the
    1066              :      * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
    1067              :      * for recursive calls if more signals are received while this runs.  It's
    1068              :      * unclear that recursive entry would be safe, and it doesn't seem useful
    1069              :      * even if it is safe, so let's block interrupts until done.
    1070              :      */
    1071         1599 :     HOLD_INTERRUPTS();
    1072              : 
    1073              :     /*
    1074              :      * Moreover, CurrentMemoryContext might be pointing almost anywhere.  We
    1075              :      * don't want to risk leaking data into long-lived contexts, so let's do
    1076              :      * our work here in a private context that we can reset on each use.
    1077              :      */
    1078         1599 :     if (hpm_context == NULL)    /* first time through? */
    1079           82 :         hpm_context = AllocSetContextCreate(TopMemoryContext,
    1080              :                                             "ProcessParallelMessages",
    1081              :                                             ALLOCSET_DEFAULT_SIZES);
    1082              :     else
    1083         1517 :         MemoryContextReset(hpm_context);
    1084              : 
    1085         1599 :     oldcontext = MemoryContextSwitchTo(hpm_context);
    1086              : 
    1087              :     /* OK to process messages.  Reset the flag saying there are more to do. */
    1088         1599 :     ParallelMessagePending = false;
    1089              : 
    1090         3313 :     dlist_foreach(iter, &pcxt_list)
    1091              :     {
    1092              :         ParallelContext *pcxt;
    1093              :         int         i;
    1094              : 
    1095         1720 :         pcxt = dlist_container(ParallelContext, node, iter.cur);
    1096         1720 :         if (pcxt->worker == NULL)
    1097            0 :             continue;
    1098              : 
    1099         6956 :         for (i = 0; i < pcxt->nworkers_launched; ++i)
    1100              :         {
    1101              :             /*
    1102              :              * Read as many messages as we can from each worker, but stop when
    1103              :              * either (1) the worker's error queue goes away, which can happen
    1104              :              * if we receive a Terminate message from the worker; or (2) no
    1105              :              * more messages can be read from the worker without blocking.
    1106              :              */
    1107         6731 :             while (pcxt->worker[i].error_mqh != NULL)
    1108              :             {
    1109              :                 shm_mq_result res;
    1110              :                 Size        nbytes;
    1111              :                 void       *data;
    1112              : 
    1113         3066 :                 res = shm_mq_receive(pcxt->worker[i].error_mqh, &nbytes,
    1114              :                                      &data, true);
    1115         3066 :                 if (res == SHM_MQ_WOULD_BLOCK)
    1116         1571 :                     break;
    1117         1495 :                 else if (res == SHM_MQ_SUCCESS)
    1118              :                 {
    1119              :                     StringInfoData msg;
    1120              : 
    1121         1495 :                     initStringInfo(&msg);
    1122         1495 :                     appendBinaryStringInfo(&msg, data, nbytes);
    1123         1495 :                     ProcessParallelMessage(pcxt, i, &msg);
    1124         1489 :                     pfree(msg.data);
    1125              :                 }
    1126              :                 else
    1127            0 :                     ereport(ERROR,
    1128              :                             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1129              :                              errmsg("lost connection to parallel worker")));
    1130              :             }
    1131              :         }
    1132              :     }
    1133              : 
    1134         1593 :     MemoryContextSwitchTo(oldcontext);
    1135              : 
    1136              :     /* Might as well clear the context on our way out */
    1137         1593 :     MemoryContextReset(hpm_context);
    1138              : 
    1139         1593 :     RESUME_INTERRUPTS();
    1140         1593 : }
    1141              : 
    1142              : /*
    1143              :  * Process a single protocol message received from a single parallel worker.
    1144              :  */
    1145              : static void
    1146         1495 : ProcessParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
    1147              : {
    1148              :     char        msgtype;
    1149              : 
    1150         1495 :     if (pcxt->known_attached_workers != NULL &&
    1151         1495 :         !pcxt->known_attached_workers[i])
    1152              :     {
    1153         1396 :         pcxt->known_attached_workers[i] = true;
    1154         1396 :         pcxt->nknown_attached_workers++;
    1155              :     }
    1156              : 
    1157         1495 :     msgtype = pq_getmsgbyte(msg);
    1158              : 
    1159         1495 :     switch (msgtype)
    1160              :     {
    1161            6 :         case PqMsg_ErrorResponse:
    1162              :         case PqMsg_NoticeResponse:
    1163              :             {
    1164              :                 ErrorData   edata;
    1165              :                 ErrorContextCallback *save_error_context_stack;
    1166              : 
    1167              :                 /* Parse ErrorResponse or NoticeResponse. */
    1168            6 :                 pq_parse_errornotice(msg, &edata);
    1169              : 
    1170              :                 /* Death of a worker isn't enough justification for suicide. */
    1171            6 :                 edata.elevel = Min(edata.elevel, ERROR);
    1172              : 
    1173              :                 /*
    1174              :                  * If desired, add a context line to show that this is a
    1175              :                  * message propagated from a parallel worker.  Otherwise, it
    1176              :                  * can sometimes be confusing to understand what actually
    1177              :                  * happened.  (We don't do this in DEBUG_PARALLEL_REGRESS mode
    1178              :                  * because it causes test-result instability depending on
    1179              :                  * whether a parallel worker is actually used or not.)
    1180              :                  */
    1181            6 :                 if (debug_parallel_query != DEBUG_PARALLEL_REGRESS)
    1182              :                 {
    1183            6 :                     if (edata.context)
    1184            3 :                         edata.context = psprintf("%s\n%s", edata.context,
    1185              :                                                  _("parallel worker"));
    1186              :                     else
    1187            3 :                         edata.context = pstrdup(_("parallel worker"));
    1188              :                 }
    1189              : 
    1190              :                 /*
    1191              :                  * Context beyond that should use the error context callbacks
    1192              :                  * that were in effect when the ParallelContext was created,
    1193              :                  * not the current ones.
    1194              :                  */
    1195            6 :                 save_error_context_stack = error_context_stack;
    1196            6 :                 error_context_stack = pcxt->error_context_stack;
    1197              : 
    1198              :                 /* Rethrow error or print notice. */
    1199            6 :                 ThrowErrorData(&edata);
    1200              : 
    1201              :                 /* Not an error, so restore previous context stack. */
    1202            0 :                 error_context_stack = save_error_context_stack;
    1203              : 
    1204            0 :                 break;
    1205              :             }
    1206              : 
    1207            0 :         case PqMsg_NotificationResponse:
    1208              :             {
    1209              :                 /* Propagate NotifyResponse. */
    1210              :                 int32       pid;
    1211              :                 const char *channel;
    1212              :                 const char *payload;
    1213              : 
    1214            0 :                 pid = pq_getmsgint(msg, 4);
    1215            0 :                 channel = pq_getmsgrawstring(msg);
    1216            0 :                 payload = pq_getmsgrawstring(msg);
    1217            0 :                 pq_endmessage(msg);
    1218              : 
    1219            0 :                 NotifyMyFrontEnd(channel, payload, pid);
    1220              : 
    1221            0 :                 break;
    1222              :             }
    1223              : 
    1224            2 :         case PqMsg_Progress:
    1225              :             {
    1226              :                 /*
    1227              :                  * Only incremental progress reporting is currently supported.
    1228              :                  * However, it's possible to add more fields to the message to
    1229              :                  * allow for handling of other backend progress APIs.
    1230              :                  */
    1231            2 :                 int         index = pq_getmsgint(msg, 4);
    1232            2 :                 int64       incr = pq_getmsgint64(msg);
    1233              : 
    1234            2 :                 pq_getmsgend(msg);
    1235              : 
    1236            2 :                 pgstat_progress_incr_param(index, incr);
    1237              : 
    1238            2 :                 break;
    1239              :             }
    1240              : 
    1241         1487 :         case PqMsg_Terminate:
    1242              :             {
    1243         1487 :                 shm_mq_detach(pcxt->worker[i].error_mqh);
    1244         1487 :                 pcxt->worker[i].error_mqh = NULL;
    1245         1487 :                 break;
    1246              :             }
    1247              : 
    1248            0 :         default:
    1249              :             {
    1250            0 :                 elog(ERROR, "unrecognized message type received from parallel worker: %c (message length %d bytes)",
    1251              :                      msgtype, msg->len);
    1252              :             }
    1253              :     }
    1254         1489 : }
    1255              : 
    1256              : /*
    1257              :  * End-of-subtransaction cleanup for parallel contexts.
    1258              :  *
    1259              :  * Here we remove only parallel contexts initiated within the current
    1260              :  * subtransaction.
    1261              :  */
    1262              : void
    1263        10084 : AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId)
    1264              : {
    1265        10087 :     while (!dlist_is_empty(&pcxt_list))
    1266              :     {
    1267              :         ParallelContext *pcxt;
    1268              : 
    1269            3 :         pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
    1270            3 :         if (pcxt->subid != mySubId)
    1271            0 :             break;
    1272            3 :         if (isCommit)
    1273            0 :             elog(WARNING, "leaked parallel context");
    1274            3 :         DestroyParallelContext(pcxt);
    1275              :     }
    1276        10084 : }
    1277              : 
    1278              : /*
    1279              :  * End-of-transaction cleanup for parallel contexts.
    1280              :  *
    1281              :  * We nuke all remaining parallel contexts.
    1282              :  */
    1283              : void
    1284       546622 : AtEOXact_Parallel(bool isCommit)
    1285              : {
    1286       546625 :     while (!dlist_is_empty(&pcxt_list))
    1287              :     {
    1288              :         ParallelContext *pcxt;
    1289              : 
    1290            3 :         pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
    1291            3 :         if (isCommit)
    1292            0 :             elog(WARNING, "leaked parallel context");
    1293            3 :         DestroyParallelContext(pcxt);
    1294              :     }
    1295       546622 : }
    1296              : 
    1297              : /*
    1298              :  * Main entrypoint for parallel workers.
    1299              :  */
    1300              : void
    1301         1493 : ParallelWorkerMain(Datum main_arg)
    1302              : {
    1303              :     dsm_segment *seg;
    1304              :     shm_toc    *toc;
    1305              :     FixedParallelState *fps;
    1306              :     char       *error_queue_space;
    1307              :     shm_mq     *mq;
    1308              :     shm_mq_handle *mqh;
    1309              :     char       *libraryspace;
    1310              :     char       *entrypointstate;
    1311              :     char       *library_name;
    1312              :     char       *function_name;
    1313              :     parallel_worker_main_type entrypt;
    1314              :     char       *gucspace;
    1315              :     char       *combocidspace;
    1316              :     char       *tsnapspace;
    1317              :     char       *asnapspace;
    1318              :     char       *tstatespace;
    1319              :     char       *pendingsyncsspace;
    1320              :     char       *reindexspace;
    1321              :     char       *relmapperspace;
    1322              :     char       *uncommittedenumsspace;
    1323              :     char       *clientconninfospace;
    1324              :     char       *session_dsm_handle_space;
    1325              :     Snapshot    tsnapshot;
    1326              :     Snapshot    asnapshot;
    1327              : 
    1328              :     /* Set flag to indicate that we're initializing a parallel worker. */
    1329         1493 :     InitializingParallelWorker = true;
    1330              : 
    1331              :     /* Establish signal handlers. */
    1332         1493 :     BackgroundWorkerUnblockSignals();
    1333              : 
    1334              :     /* Determine and set our parallel worker number. */
    1335              :     Assert(ParallelWorkerNumber == -1);
    1336         1493 :     memcpy(&ParallelWorkerNumber, MyBgworkerEntry->bgw_extra, sizeof(int));
    1337              : 
    1338              :     /* Set up a memory context to work in, just for cleanliness. */
    1339         1493 :     CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
    1340              :                                                  "Parallel worker",
    1341              :                                                  ALLOCSET_DEFAULT_SIZES);
    1342              : 
    1343              :     /*
    1344              :      * Attach to the dynamic shared memory segment for the parallel query, and
    1345              :      * find its table of contents.
    1346              :      *
    1347              :      * Note: at this point, we have not created any ResourceOwner in this
    1348              :      * process.  This will result in our DSM mapping surviving until process
    1349              :      * exit, which is fine.  If there were a ResourceOwner, it would acquire
    1350              :      * ownership of the mapping, but we have no need for that.
    1351              :      */
    1352         1493 :     seg = dsm_attach(DatumGetUInt32(main_arg));
    1353         1493 :     if (seg == NULL)
    1354            0 :         ereport(ERROR,
    1355              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1356              :                  errmsg("could not map dynamic shared memory segment")));
    1357         1493 :     toc = shm_toc_attach(PARALLEL_MAGIC, dsm_segment_address(seg));
    1358         1493 :     if (toc == NULL)
    1359            0 :         ereport(ERROR,
    1360              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1361              :                  errmsg("invalid magic number in dynamic shared memory segment")));
    1362              : 
    1363              :     /* Look up fixed parallel state. */
    1364         1493 :     fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED, false);
    1365         1493 :     MyFixedParallelState = fps;
    1366              : 
    1367              :     /* Arrange to signal the leader if we exit. */
    1368         1493 :     ParallelLeaderPid = fps->parallel_leader_pid;
    1369         1493 :     ParallelLeaderProcNumber = fps->parallel_leader_proc_number;
    1370         1493 :     before_shmem_exit(ParallelWorkerShutdown, PointerGetDatum(seg));
    1371              : 
    1372              :     /*
    1373              :      * Now we can find and attach to the error queue provided for us.  That's
    1374              :      * good, because until we do that, any errors that happen here will not be
    1375              :      * reported back to the process that requested that this worker be
    1376              :      * launched.
    1377              :      */
    1378         1493 :     error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE, false);
    1379         1493 :     mq = (shm_mq *) (error_queue_space +
    1380         1493 :                      ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE);
    1381         1493 :     shm_mq_set_sender(mq, MyProc);
    1382         1493 :     mqh = shm_mq_attach(mq, seg, NULL);
    1383         1493 :     pq_redirect_to_shm_mq(seg, mqh);
    1384         1493 :     pq_set_parallel_leader(fps->parallel_leader_pid,
    1385              :                            fps->parallel_leader_proc_number);
    1386              : 
    1387              :     /*
    1388              :      * Hooray! Primary initialization is complete.  Now, we need to set up our
    1389              :      * backend-local state to match the original backend.
    1390              :      */
    1391              : 
    1392              :     /*
    1393              :      * Join locking group.  We must do this before anything that could try to
    1394              :      * acquire a heavyweight lock, because any heavyweight locks acquired to
    1395              :      * this point could block either directly against the parallel group
    1396              :      * leader or against some process which in turn waits for a lock that
    1397              :      * conflicts with the parallel group leader, causing an undetected
    1398              :      * deadlock.  (If we can't join the lock group, the leader has gone away,
    1399              :      * so just exit quietly.)
    1400              :      */
    1401         1493 :     if (!BecomeLockGroupMember(fps->parallel_leader_pgproc,
    1402              :                                fps->parallel_leader_pid))
    1403            0 :         return;
    1404              : 
    1405              :     /*
    1406              :      * Restore transaction and statement start-time timestamps.  This must
    1407              :      * happen before anything that would start a transaction, else asserts in
    1408              :      * xact.c will fire.
    1409              :      */
    1410         1493 :     SetParallelStartTimestamps(fps->xact_ts, fps->stmt_ts);
    1411              : 
    1412              :     /*
    1413              :      * Identify the entry point to be called.  In theory this could result in
    1414              :      * loading an additional library, though most likely the entry point is in
    1415              :      * the core backend or in a library we just loaded.
    1416              :      */
    1417         1493 :     entrypointstate = shm_toc_lookup(toc, PARALLEL_KEY_ENTRYPOINT, false);
    1418         1493 :     library_name = entrypointstate;
    1419         1493 :     function_name = entrypointstate + strlen(library_name) + 1;
    1420              : 
    1421         1493 :     entrypt = LookupParallelWorkerFunction(library_name, function_name);
    1422              : 
    1423              :     /*
    1424              :      * Restore current session authorization and role id.  No verification
    1425              :      * happens here, we just blindly adopt the leader's state.  Note that this
    1426              :      * has to happen before InitPostgres, since InitializeSessionUserId will
    1427              :      * not set these variables.
    1428              :      */
    1429         1493 :     SetAuthenticatedUserId(fps->authenticated_user_id);
    1430         1493 :     SetSessionAuthorization(fps->session_user_id,
    1431         1493 :                             fps->session_user_is_superuser);
    1432         1493 :     SetCurrentRoleId(fps->outer_user_id, fps->role_is_superuser);
    1433              : 
    1434              :     /*
    1435              :      * Restore database connection.  We skip connection authorization checks,
    1436              :      * reasoning that (a) the leader checked these things when it started, and
    1437              :      * (b) we do not want parallel mode to cause these failures, because that
    1438              :      * would make use of parallel query plans not transparent to applications.
    1439              :      */
    1440         1493 :     BackgroundWorkerInitializeConnectionByOid(fps->database_id,
    1441              :                                               fps->authenticated_user_id,
    1442              :                                               BGWORKER_BYPASS_ALLOWCONN |
    1443              :                                               BGWORKER_BYPASS_ROLELOGINCHECK);
    1444              : 
    1445              :     /*
    1446              :      * Set the client encoding to the database encoding, since that is what
    1447              :      * the leader will expect.  (We're cheating a bit by not calling
    1448              :      * PrepareClientEncoding first.  It's okay because this call will always
    1449              :      * result in installing a no-op conversion.  No error should be possible,
    1450              :      * but check anyway.)
    1451              :      */
    1452         1493 :     if (SetClientEncoding(GetDatabaseEncoding()) < 0)
    1453            0 :         elog(ERROR, "SetClientEncoding(%d) failed", GetDatabaseEncoding());
    1454              : 
    1455              :     /*
    1456              :      * Load libraries that were loaded by original backend.  We want to do
    1457              :      * this before restoring GUCs, because the libraries might define custom
    1458              :      * variables.
    1459              :      */
    1460         1493 :     libraryspace = shm_toc_lookup(toc, PARALLEL_KEY_LIBRARY, false);
    1461         1493 :     StartTransactionCommand();
    1462         1493 :     RestoreLibraryState(libraryspace);
    1463         1493 :     CommitTransactionCommand();
    1464              : 
    1465              :     /* Crank up a transaction state appropriate to a parallel worker. */
    1466         1493 :     tstatespace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_STATE, false);
    1467         1493 :     StartParallelWorkerTransaction(tstatespace);
    1468              : 
    1469              :     /*
    1470              :      * Restore state that affects catalog access.  Ideally we'd do this even
    1471              :      * before calling InitPostgres, but that has order-of-initialization
    1472              :      * problems, and also the relmapper would get confused during the
    1473              :      * CommitTransactionCommand call above.
    1474              :      */
    1475         1493 :     pendingsyncsspace = shm_toc_lookup(toc, PARALLEL_KEY_PENDING_SYNCS,
    1476              :                                        false);
    1477         1493 :     RestorePendingSyncs(pendingsyncsspace);
    1478         1493 :     relmapperspace = shm_toc_lookup(toc, PARALLEL_KEY_RELMAPPER_STATE, false);
    1479         1493 :     RestoreRelationMap(relmapperspace);
    1480         1493 :     reindexspace = shm_toc_lookup(toc, PARALLEL_KEY_REINDEX_STATE, false);
    1481         1493 :     RestoreReindexState(reindexspace);
    1482         1493 :     combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID, false);
    1483         1493 :     RestoreComboCIDState(combocidspace);
    1484              : 
    1485              :     /* Attach to the per-session DSM segment and contained objects. */
    1486              :     session_dsm_handle_space =
    1487         1493 :         shm_toc_lookup(toc, PARALLEL_KEY_SESSION_DSM, false);
    1488         1493 :     AttachSession(*(dsm_handle *) session_dsm_handle_space);
    1489              : 
    1490              :     /*
    1491              :      * If the transaction isolation level is REPEATABLE READ or SERIALIZABLE,
    1492              :      * the leader has serialized the transaction snapshot and we must restore
    1493              :      * it. At lower isolation levels, there is no transaction-lifetime
    1494              :      * snapshot, but we need TransactionXmin to get set to a value which is
    1495              :      * less than or equal to the xmin of every snapshot that will be used by
    1496              :      * this worker. The easiest way to accomplish that is to install the
    1497              :      * active snapshot as the transaction snapshot. Code running in this
    1498              :      * parallel worker might take new snapshots via GetTransactionSnapshot()
    1499              :      * or GetLatestSnapshot(), but it shouldn't have any way of acquiring a
    1500              :      * snapshot older than the active snapshot.
    1501              :      */
    1502         1493 :     asnapspace = shm_toc_lookup(toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, false);
    1503         1493 :     tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT, true);
    1504         1493 :     asnapshot = RestoreSnapshot(asnapspace);
    1505         1493 :     tsnapshot = tsnapspace ? RestoreSnapshot(tsnapspace) : asnapshot;
    1506         1493 :     RestoreTransactionSnapshot(tsnapshot,
    1507         1493 :                                fps->parallel_leader_pgproc);
    1508         1493 :     PushActiveSnapshot(asnapshot);
    1509              : 
    1510              :     /*
    1511              :      * We've changed which tuples we can see, and must therefore invalidate
    1512              :      * system caches.
    1513              :      */
    1514         1493 :     InvalidateSystemCaches();
    1515              : 
    1516              :     /*
    1517              :      * Restore GUC values from launching backend.  We can't do this earlier,
    1518              :      * because GUC check hooks that do catalog lookups need to see the same
    1519              :      * database state as the leader.  Also, the check hooks for
    1520              :      * session_authorization and role assume we already set the correct role
    1521              :      * OIDs.
    1522              :      */
    1523         1493 :     gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC, false);
    1524         1493 :     RestoreGUCState(gucspace);
    1525              : 
    1526              :     /*
    1527              :      * Restore current user ID and security context.  No verification happens
    1528              :      * here, we just blindly adopt the leader's state.  We can't do this till
    1529              :      * after restoring GUCs, else we'll get complaints about restoring
    1530              :      * session_authorization and role.  (In effect, we're assuming that all
    1531              :      * the restored values are okay to set, even if we are now inside a
    1532              :      * restricted context.)
    1533              :      */
    1534         1493 :     SetUserIdAndSecContext(fps->current_user_id, fps->sec_context);
    1535              : 
    1536              :     /* Restore temp-namespace state to ensure search path matches leader's. */
    1537         1493 :     SetTempNamespaceState(fps->temp_namespace_id,
    1538              :                           fps->temp_toast_namespace_id);
    1539              : 
    1540              :     /* Restore uncommitted enums. */
    1541         1493 :     uncommittedenumsspace = shm_toc_lookup(toc, PARALLEL_KEY_UNCOMMITTEDENUMS,
    1542              :                                            false);
    1543         1493 :     RestoreUncommittedEnums(uncommittedenumsspace);
    1544              : 
    1545              :     /* Restore the ClientConnectionInfo. */
    1546         1493 :     clientconninfospace = shm_toc_lookup(toc, PARALLEL_KEY_CLIENTCONNINFO,
    1547              :                                          false);
    1548         1493 :     RestoreClientConnectionInfo(clientconninfospace);
    1549              : 
    1550              :     /*
    1551              :      * Initialize SystemUser now that MyClientConnectionInfo is restored. Also
    1552              :      * ensure that auth_method is actually valid, aka authn_id is not NULL.
    1553              :      */
    1554         1493 :     if (MyClientConnectionInfo.authn_id)
    1555            2 :         InitializeSystemUser(MyClientConnectionInfo.authn_id,
    1556              :                              hba_authname(MyClientConnectionInfo.auth_method));
    1557              : 
    1558              :     /* Attach to the leader's serializable transaction, if SERIALIZABLE. */
    1559         1493 :     AttachSerializableXact(fps->serializable_xact_handle);
    1560              : 
    1561              :     /*
    1562              :      * We've initialized all of our state now; nothing should change
    1563              :      * hereafter.
    1564              :      */
    1565         1493 :     InitializingParallelWorker = false;
    1566         1493 :     EnterParallelMode();
    1567              : 
    1568              :     /*
    1569              :      * Time to do the real work: invoke the caller-supplied code.
    1570              :      */
    1571         1493 :     entrypt(seg, toc);
    1572              : 
    1573              :     /* Must exit parallel mode to pop active snapshot. */
    1574         1487 :     ExitParallelMode();
    1575              : 
    1576              :     /* Must pop active snapshot so snapmgr.c doesn't complain. */
    1577         1487 :     PopActiveSnapshot();
    1578              : 
    1579              :     /* Shut down the parallel-worker transaction. */
    1580         1487 :     EndParallelWorkerTransaction();
    1581              : 
    1582              :     /* Detach from the per-session DSM segment. */
    1583         1487 :     DetachSession();
    1584              : 
    1585              :     /* Report success. */
    1586         1487 :     pq_putmessage(PqMsg_Terminate, NULL, 0);
    1587              : }
    1588              : 
    1589              : /*
    1590              :  * Update shared memory with the ending location of the last WAL record we
    1591              :  * wrote, if it's greater than the value already stored there.
    1592              :  */
    1593              : void
    1594         1487 : ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
    1595              : {
    1596         1487 :     FixedParallelState *fps = MyFixedParallelState;
    1597              : 
    1598              :     Assert(fps != NULL);
    1599         1487 :     SpinLockAcquire(&fps->mutex);
    1600         1487 :     if (fps->last_xlog_end < last_xlog_end)
    1601          113 :         fps->last_xlog_end = last_xlog_end;
    1602         1487 :     SpinLockRelease(&fps->mutex);
    1603         1487 : }
    1604              : 
    1605              : /*
    1606              :  * Make sure the leader tries to read from our error queue one more time.
    1607              :  * This guards against the case where we exit uncleanly without sending an
    1608              :  * ErrorResponse to the leader, for example because some code calls proc_exit
    1609              :  * directly.
    1610              :  *
    1611              :  * Also explicitly detach from dsm segment so that subsystems using
    1612              :  * on_dsm_detach() have a chance to send stats before the stats subsystem is
    1613              :  * shut down as part of a before_shmem_exit() hook.
    1614              :  *
    1615              :  * One might think this could instead be solved by carefully ordering the
    1616              :  * attaching to dsm segments, so that the pgstats segments get detached from
    1617              :  * later than the parallel query one. That turns out to not work because the
    1618              :  * stats hash might need to grow which can cause new segments to be allocated,
    1619              :  * which then will be detached from earlier.
    1620              :  */
    1621              : static void
    1622         1493 : ParallelWorkerShutdown(int code, Datum arg)
    1623              : {
    1624         1493 :     SendProcSignal(ParallelLeaderPid,
    1625              :                    PROCSIG_PARALLEL_MESSAGE,
    1626              :                    ParallelLeaderProcNumber);
    1627              : 
    1628         1493 :     dsm_detach((dsm_segment *) DatumGetPointer(arg));
    1629         1493 : }
    1630              : 
    1631              : /*
    1632              :  * Look up (and possibly load) a parallel worker entry point function.
    1633              :  *
    1634              :  * For functions contained in the core code, we use library name "postgres"
    1635              :  * and consult the InternalParallelWorkers array.  External functions are
    1636              :  * looked up, and loaded if necessary, using load_external_function().
    1637              :  *
    1638              :  * The point of this is to pass function names as strings across process
    1639              :  * boundaries.  We can't pass actual function addresses because of the
    1640              :  * possibility that the function has been loaded at a different address
    1641              :  * in a different process.  This is obviously a hazard for functions in
    1642              :  * loadable libraries, but it can happen even for functions in the core code
    1643              :  * on platforms using EXEC_BACKEND (e.g., Windows).
    1644              :  *
    1645              :  * At some point it might be worthwhile to get rid of InternalParallelWorkers[]
    1646              :  * in favor of applying load_external_function() for core functions too;
    1647              :  * but that raises portability issues that are not worth addressing now.
    1648              :  */
    1649              : static parallel_worker_main_type
    1650         1493 : LookupParallelWorkerFunction(const char *libraryname, const char *funcname)
    1651              : {
    1652              :     /*
    1653              :      * If the function is to be loaded from postgres itself, search the
    1654              :      * InternalParallelWorkers array.
    1655              :      */
    1656         1493 :     if (strcmp(libraryname, "postgres") == 0)
    1657              :     {
    1658              :         int         i;
    1659              : 
    1660         1814 :         for (i = 0; i < lengthof(InternalParallelWorkers); i++)
    1661              :         {
    1662         1814 :             if (strcmp(InternalParallelWorkers[i].fn_name, funcname) == 0)
    1663         1493 :                 return InternalParallelWorkers[i].fn_addr;
    1664              :         }
    1665              : 
    1666              :         /* We can only reach this by programming error. */
    1667            0 :         elog(ERROR, "internal function \"%s\" not found", funcname);
    1668              :     }
    1669              : 
    1670              :     /* Otherwise load from external library. */
    1671            0 :     return (parallel_worker_main_type)
    1672            0 :         load_external_function(libraryname, funcname, true, NULL);
    1673              : }
        

Generated by: LCOV version 2.0-1