LCOV - code coverage report
Current view: top level - src/backend/access/transam - parallel.c
Test: PostgreSQL 19devel
Date: 2025-12-24 10:18:01
Coverage: Lines: 432 of 485 (89.1 %)   Functions: 18 of 19 (94.7 %)

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * parallel.c
       4             :  *    Infrastructure for launching parallel workers
       5             :  *
       6             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *    src/backend/access/transam/parallel.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/brin.h"
      18             : #include "access/gin.h"
      19             : #include "access/nbtree.h"
      20             : #include "access/parallel.h"
      21             : #include "access/session.h"
      22             : #include "access/xact.h"
      23             : #include "access/xlog.h"
      24             : #include "catalog/index.h"
      25             : #include "catalog/namespace.h"
      26             : #include "catalog/pg_enum.h"
      27             : #include "catalog/storage.h"
      28             : #include "commands/async.h"
      29             : #include "commands/vacuum.h"
      30             : #include "executor/execParallel.h"
      31             : #include "libpq/libpq.h"
      32             : #include "libpq/pqformat.h"
      33             : #include "libpq/pqmq.h"
      34             : #include "miscadmin.h"
      35             : #include "optimizer/optimizer.h"
      36             : #include "pgstat.h"
      37             : #include "storage/ipc.h"
      38             : #include "storage/predicate.h"
      39             : #include "storage/spin.h"
      40             : #include "tcop/tcopprot.h"
      41             : #include "utils/combocid.h"
      42             : #include "utils/guc.h"
      43             : #include "utils/inval.h"
      44             : #include "utils/memutils.h"
      45             : #include "utils/relmapper.h"
      46             : #include "utils/snapmgr.h"
      47             : 
      48             : /*
      49             :  * We don't want to waste a lot of memory on an error queue which, most of
      50             :  * the time, will process only a handful of small messages.  However, it is
      51             :  * desirable to make it large enough that a typical ErrorResponse can be sent
      52             :  * without blocking.  That way, a worker that errors out can write the whole
      53             :  * message into the queue and terminate without waiting for the user backend.
      54             :  */
      55             : #define PARALLEL_ERROR_QUEUE_SIZE           16384
      56             : 
      57             : /* Magic number for parallel context TOC. */
      58             : #define PARALLEL_MAGIC                      0x50477c7c
      59             : 
      60             : /*
      61             :  * Magic numbers for per-context parallel state sharing.  Higher-level code
      62             :  * should use smaller values, leaving these very large ones for use by this
      63             :  * module.
      64             :  */
      65             : #define PARALLEL_KEY_FIXED                  UINT64CONST(0xFFFFFFFFFFFF0001)
      66             : #define PARALLEL_KEY_ERROR_QUEUE            UINT64CONST(0xFFFFFFFFFFFF0002)
      67             : #define PARALLEL_KEY_LIBRARY                UINT64CONST(0xFFFFFFFFFFFF0003)
      68             : #define PARALLEL_KEY_GUC                    UINT64CONST(0xFFFFFFFFFFFF0004)
      69             : #define PARALLEL_KEY_COMBO_CID              UINT64CONST(0xFFFFFFFFFFFF0005)
      70             : #define PARALLEL_KEY_TRANSACTION_SNAPSHOT   UINT64CONST(0xFFFFFFFFFFFF0006)
      71             : #define PARALLEL_KEY_ACTIVE_SNAPSHOT        UINT64CONST(0xFFFFFFFFFFFF0007)
      72             : #define PARALLEL_KEY_TRANSACTION_STATE      UINT64CONST(0xFFFFFFFFFFFF0008)
      73             : #define PARALLEL_KEY_ENTRYPOINT             UINT64CONST(0xFFFFFFFFFFFF0009)
      74             : #define PARALLEL_KEY_SESSION_DSM            UINT64CONST(0xFFFFFFFFFFFF000A)
      75             : #define PARALLEL_KEY_PENDING_SYNCS          UINT64CONST(0xFFFFFFFFFFFF000B)
      76             : #define PARALLEL_KEY_REINDEX_STATE          UINT64CONST(0xFFFFFFFFFFFF000C)
      77             : #define PARALLEL_KEY_RELMAPPER_STATE        UINT64CONST(0xFFFFFFFFFFFF000D)
      78             : #define PARALLEL_KEY_UNCOMMITTEDENUMS       UINT64CONST(0xFFFFFFFFFFFF000E)
      79             : #define PARALLEL_KEY_CLIENTCONNINFO         UINT64CONST(0xFFFFFFFFFFFF000F)
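
/*
 * Illustrative sketch, not part of the measured source: code layered on
 * top of this module picks small TOC key values of its own, per the
 * comment above, so they can never collide with the very large
 * PARALLEL_KEY_* values.  The key names below are hypothetical.
 */
#define MY_KEY_SHARED_STATE     UINT64CONST(1)
#define MY_KEY_TUPLE_QUEUE      UINT64CONST(2)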
      80             : 
      81             : /* Fixed-size parallel state. */
      82             : typedef struct FixedParallelState
      83             : {
      84             :     /* Fixed-size state that workers must restore. */
      85             :     Oid         database_id;
      86             :     Oid         authenticated_user_id;
      87             :     Oid         session_user_id;
      88             :     Oid         outer_user_id;
      89             :     Oid         current_user_id;
      90             :     Oid         temp_namespace_id;
      91             :     Oid         temp_toast_namespace_id;
      92             :     int         sec_context;
      93             :     bool        session_user_is_superuser;
      94             :     bool        role_is_superuser;
      95             :     PGPROC     *parallel_leader_pgproc;
      96             :     pid_t       parallel_leader_pid;
      97             :     ProcNumber  parallel_leader_proc_number;
      98             :     TimestampTz xact_ts;
      99             :     TimestampTz stmt_ts;
     100             :     SerializableXactHandle serializable_xact_handle;
     101             : 
     102             :     /* Mutex protects remaining fields. */
     103             :     slock_t     mutex;
     104             : 
     105             :     /* Maximum XactLastRecEnd of any worker. */
     106             :     XLogRecPtr  last_xlog_end;
     107             : } FixedParallelState;
     108             : 
     109             : /*
     110             :  * Our parallel worker number.  We initialize this to -1, meaning that we are
     111             :  * not a parallel worker.  In parallel workers, it will be set to a value >= 0
     112             :  * and < the number of workers before any user code is invoked; each parallel
     113             :  * worker will get a different parallel worker number.
     114             :  */
     115             : int         ParallelWorkerNumber = -1;
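
/*
 * Illustrative sketch, not part of the measured source: worker code
 * typically uses ParallelWorkerNumber to claim its own slot in some
 * shared array.  SharedCounters is a hypothetical type; atomics come
 * from port/atomics.h.
 */
typedef struct SharedCounters
{
    pg_atomic_uint64 ntuples[FLEXIBLE_ARRAY_MEMBER];
} SharedCounters;

static void
example_bump_my_counter(SharedCounters *counters)
{
    /* each worker has a distinct, stable slot for the whole run */
    pg_atomic_add_fetch_u64(&counters->ntuples[ParallelWorkerNumber], 1);
}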
     116             : 
     117             : /* Is there a parallel message pending which we need to receive? */
     118             : volatile sig_atomic_t ParallelMessagePending = false;
     119             : 
     120             : /* Are we initializing a parallel worker? */
     121             : bool        InitializingParallelWorker = false;
     122             : 
     123             : /* Pointer to our fixed parallel state. */
     124             : static FixedParallelState *MyFixedParallelState;
     125             : 
     126             : /* List of active parallel contexts. */
     127             : static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);
     128             : 
     129             : /* Backend-local copy of data from FixedParallelState. */
     130             : static pid_t ParallelLeaderPid;
     131             : 
     132             : /*
     133             :  * List of internal parallel worker entry points.  We need this for
     134             :  * reasons explained in LookupParallelWorkerFunction(), below.
     135             :  */
     136             : static const struct
     137             : {
     138             :     const char *fn_name;
     139             :     parallel_worker_main_type fn_addr;
     140             : }           InternalParallelWorkers[] =
     141             : 
     142             : {
     143             :     {
     144             :         "ParallelQueryMain", ParallelQueryMain
     145             :     },
     146             :     {
     147             :         "_bt_parallel_build_main", _bt_parallel_build_main
     148             :     },
     149             :     {
     150             :         "_brin_parallel_build_main", _brin_parallel_build_main
     151             :     },
     152             :     {
     153             :         "_gin_parallel_build_main", _gin_parallel_build_main
     154             :     },
     155             :     {
     156             :         "parallel_vacuum_main", parallel_vacuum_main
     157             :     }
     158             : };
     159             : 
     160             : /* Private functions. */
     161             : static void ProcessParallelMessage(ParallelContext *pcxt, int i, StringInfo msg);
     162             : static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
     163             : static parallel_worker_main_type LookupParallelWorkerFunction(const char *libraryname, const char *funcname);
     164             : static void ParallelWorkerShutdown(int code, Datum arg);
     165             : 
     166             : 
     167             : /*
     168             :  * Establish a new parallel context.  This should be done after entering
     169             :  * parallel mode, and (unless there is an error) the context should be
     170             :  * destroyed before exiting the current subtransaction.
     171             :  */
     172             : ParallelContext *
     173         952 : CreateParallelContext(const char *library_name, const char *function_name,
     174             :                       int nworkers)
     175             : {
     176             :     MemoryContext oldcontext;
     177             :     ParallelContext *pcxt;
     178             : 
     179             :     /* It is unsafe to create a parallel context if not in parallel mode. */
     180             :     Assert(IsInParallelMode());
     181             : 
     182             :     /* Number of workers should be non-negative. */
     183             :     Assert(nworkers >= 0);
     184             : 
     185             :     /* We might be running in a short-lived memory context. */
     186         952 :     oldcontext = MemoryContextSwitchTo(TopTransactionContext);
     187             : 
     188             :     /* Initialize a new ParallelContext. */
     189         952 :     pcxt = palloc0_object(ParallelContext);
     190         952 :     pcxt->subid = GetCurrentSubTransactionId();
     191         952 :     pcxt->nworkers = nworkers;
     192         952 :     pcxt->nworkers_to_launch = nworkers;
     193         952 :     pcxt->library_name = pstrdup(library_name);
     194         952 :     pcxt->function_name = pstrdup(function_name);
     195         952 :     pcxt->error_context_stack = error_context_stack;
     196         952 :     shm_toc_initialize_estimator(&pcxt->estimator);
     197         952 :     dlist_push_head(&pcxt_list, &pcxt->node);
     198             : 
     199             :     /* Restore previous memory context. */
     200         952 :     MemoryContextSwitchTo(oldcontext);
     201             : 
     202         952 :     return pcxt;
     203             : }
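
/*
 * Illustrative sketch, not part of the measured source: the typical life
 * cycle of a parallel context, using the functions defined in this file.
 * MyWorkerMain is a hypothetical entry point; "postgres" is the library
 * name used for functions in the core backend.
 */
static void
example_parallel_lifecycle(void)
{
    ParallelContext *pcxt;

    EnterParallelMode();
    pcxt = CreateParallelContext("postgres", "MyWorkerMain", 2);
    /* ... caller's shm_toc_estimate_chunk()/_keys() calls go here ... */
    InitializeParallelDSM(pcxt);
    /* ... caller's shm_toc_allocate()/shm_toc_insert() calls go here ... */
    LaunchParallelWorkers(pcxt);
    /* ... the leader can participate in the computation itself ... */
    WaitForParallelWorkersToFinish(pcxt);
    DestroyParallelContext(pcxt);
    ExitParallelMode();
}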
     204             : 
     205             : /*
     206             :  * Establish the dynamic shared memory segment for a parallel context and
     207             :  * copy state and other bookkeeping information that will be needed by
     208             :  * parallel workers into it.
     209             :  */
     210             : void
     211         952 : InitializeParallelDSM(ParallelContext *pcxt)
     212             : {
     213             :     MemoryContext oldcontext;
     214         952 :     Size        library_len = 0;
     215         952 :     Size        guc_len = 0;
     216         952 :     Size        combocidlen = 0;
     217         952 :     Size        tsnaplen = 0;
     218         952 :     Size        asnaplen = 0;
     219         952 :     Size        tstatelen = 0;
     220         952 :     Size        pendingsyncslen = 0;
     221         952 :     Size        reindexlen = 0;
     222         952 :     Size        relmapperlen = 0;
     223         952 :     Size        uncommittedenumslen = 0;
     224         952 :     Size        clientconninfolen = 0;
     225         952 :     Size        segsize = 0;
     226             :     int         i;
     227             :     FixedParallelState *fps;
     228         952 :     dsm_handle  session_dsm_handle = DSM_HANDLE_INVALID;
     229         952 :     Snapshot    transaction_snapshot = GetTransactionSnapshot();
     230         952 :     Snapshot    active_snapshot = GetActiveSnapshot();
     231             : 
     232             :     /* We might be running in a very short-lived memory context. */
     233         952 :     oldcontext = MemoryContextSwitchTo(TopTransactionContext);
     234             : 
     235             :     /* Allow space to store the fixed-size parallel state. */
     236         952 :     shm_toc_estimate_chunk(&pcxt->estimator, sizeof(FixedParallelState));
     237         952 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     238             : 
     239             :     /*
     240             :      * If we manage to reach here while non-interruptible, it's unsafe to
     241             :      * launch any workers: we would fail to process interrupts sent by them.
     242             :      * We can deal with that edge case by pretending no workers were
     243             :      * requested.
     244             :      */
     245         952 :     if (!INTERRUPTS_CAN_BE_PROCESSED())
     246           0 :         pcxt->nworkers = 0;
     247             : 
     248             :     /*
     249             :      * Normally, the user will have requested at least one worker process, but
     250             :      * if by chance they have not, we can skip a bunch of things here.
     251             :      */
     252         952 :     if (pcxt->nworkers > 0)
     253             :     {
     254             :         /* Get (or create) the per-session DSM segment's handle. */
     255         952 :         session_dsm_handle = GetSessionDsmHandle();
     256             : 
     257             :         /*
     258             :          * If we weren't able to create a per-session DSM segment, then we can
     259             :          * continue but we can't safely launch any workers because their
     260             :          * record typmods would be incompatible so they couldn't exchange
     261             :          * tuples.
     262             :          */
     263         952 :         if (session_dsm_handle == DSM_HANDLE_INVALID)
     264           0 :             pcxt->nworkers = 0;
     265             :     }
     266             : 
     267         952 :     if (pcxt->nworkers > 0)
     268             :     {
     269             :         StaticAssertDecl(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
     270             :                          PARALLEL_ERROR_QUEUE_SIZE,
     271             :                          "parallel error queue size not buffer-aligned");
     272             : 
     273             :         /* Estimate space for various kinds of state sharing. */
     274         952 :         library_len = EstimateLibraryStateSpace();
     275         952 :         shm_toc_estimate_chunk(&pcxt->estimator, library_len);
     276         952 :         guc_len = EstimateGUCStateSpace();
     277         952 :         shm_toc_estimate_chunk(&pcxt->estimator, guc_len);
     278         952 :         combocidlen = EstimateComboCIDStateSpace();
     279         952 :         shm_toc_estimate_chunk(&pcxt->estimator, combocidlen);
     280         952 :         if (IsolationUsesXactSnapshot())
     281             :         {
     282          22 :             tsnaplen = EstimateSnapshotSpace(transaction_snapshot);
     283          22 :             shm_toc_estimate_chunk(&pcxt->estimator, tsnaplen);
     284             :         }
     285         952 :         asnaplen = EstimateSnapshotSpace(active_snapshot);
     286         952 :         shm_toc_estimate_chunk(&pcxt->estimator, asnaplen);
     287         952 :         tstatelen = EstimateTransactionStateSpace();
     288         952 :         shm_toc_estimate_chunk(&pcxt->estimator, tstatelen);
     289         952 :         shm_toc_estimate_chunk(&pcxt->estimator, sizeof(dsm_handle));
     290         952 :         pendingsyncslen = EstimatePendingSyncsSpace();
     291         952 :         shm_toc_estimate_chunk(&pcxt->estimator, pendingsyncslen);
     292         952 :         reindexlen = EstimateReindexStateSpace();
     293         952 :         shm_toc_estimate_chunk(&pcxt->estimator, reindexlen);
     294         952 :         relmapperlen = EstimateRelationMapSpace();
     295         952 :         shm_toc_estimate_chunk(&pcxt->estimator, relmapperlen);
     296         952 :         uncommittedenumslen = EstimateUncommittedEnumsSpace();
     297         952 :         shm_toc_estimate_chunk(&pcxt->estimator, uncommittedenumslen);
     298         952 :         clientconninfolen = EstimateClientConnectionInfoSpace();
     299         952 :         shm_toc_estimate_chunk(&pcxt->estimator, clientconninfolen);
     300             :         /* If you add more chunks here, you probably need to add keys. */
     301         952 :         shm_toc_estimate_keys(&pcxt->estimator, 12);
     302             : 
      303             :         /* Estimate space needed for error queues. */
     304         952 :         shm_toc_estimate_chunk(&pcxt->estimator,
     305             :                                mul_size(PARALLEL_ERROR_QUEUE_SIZE,
     306             :                                         pcxt->nworkers));
     307         952 :         shm_toc_estimate_keys(&pcxt->estimator, 1);
     308             : 
     309             :         /* Estimate how much we'll need for the entrypoint info. */
     310         952 :         shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name) +
     311             :                                strlen(pcxt->function_name) + 2);
     312         952 :         shm_toc_estimate_keys(&pcxt->estimator, 1);
     313             :     }
     314             : 
     315             :     /*
     316             :      * Create DSM and initialize with new table of contents.  But if the user
     317             :      * didn't request any workers, then don't bother creating a dynamic shared
     318             :      * memory segment; instead, just use backend-private memory.
     319             :      *
     320             :      * Also, if we can't create a dynamic shared memory segment because the
      321             :      * maximum number of segments has already been created, then fall back to
     322             :      * backend-private memory, and plan not to use any workers.  We hope this
     323             :      * won't happen very often, but it's better to abandon the use of
     324             :      * parallelism than to fail outright.
     325             :      */
     326         952 :     segsize = shm_toc_estimate(&pcxt->estimator);
     327         952 :     if (pcxt->nworkers > 0)
     328         952 :         pcxt->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
     329         952 :     if (pcxt->seg != NULL)
     330         952 :         pcxt->toc = shm_toc_create(PARALLEL_MAGIC,
     331             :                                    dsm_segment_address(pcxt->seg),
     332             :                                    segsize);
     333             :     else
     334             :     {
     335           0 :         pcxt->nworkers = 0;
     336           0 :         pcxt->private_memory = MemoryContextAlloc(TopMemoryContext, segsize);
     337           0 :         pcxt->toc = shm_toc_create(PARALLEL_MAGIC, pcxt->private_memory,
     338             :                                    segsize);
     339             :     }
     340             : 
     341             :     /* Initialize fixed-size state in shared memory. */
     342             :     fps = (FixedParallelState *)
     343         952 :         shm_toc_allocate(pcxt->toc, sizeof(FixedParallelState));
     344         952 :     fps->database_id = MyDatabaseId;
     345         952 :     fps->authenticated_user_id = GetAuthenticatedUserId();
     346         952 :     fps->session_user_id = GetSessionUserId();
     347         952 :     fps->outer_user_id = GetCurrentRoleId();
     348         952 :     GetUserIdAndSecContext(&fps->current_user_id, &fps->sec_context);
     349         952 :     fps->session_user_is_superuser = GetSessionUserIsSuperuser();
     350         952 :     fps->role_is_superuser = current_role_is_superuser;
     351         952 :     GetTempNamespaceState(&fps->temp_namespace_id,
     352             :                           &fps->temp_toast_namespace_id);
     353         952 :     fps->parallel_leader_pgproc = MyProc;
     354         952 :     fps->parallel_leader_pid = MyProcPid;
     355         952 :     fps->parallel_leader_proc_number = MyProcNumber;
     356         952 :     fps->xact_ts = GetCurrentTransactionStartTimestamp();
     357         952 :     fps->stmt_ts = GetCurrentStatementStartTimestamp();
     358         952 :     fps->serializable_xact_handle = ShareSerializableXact();
     359         952 :     SpinLockInit(&fps->mutex);
     360         952 :     fps->last_xlog_end = 0;
     361         952 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);
     362             : 
     363             :     /* We can skip the rest of this if we're not budgeting for any workers. */
     364         952 :     if (pcxt->nworkers > 0)
     365             :     {
     366             :         char       *libraryspace;
     367             :         char       *gucspace;
     368             :         char       *combocidspace;
     369             :         char       *tsnapspace;
     370             :         char       *asnapspace;
     371             :         char       *tstatespace;
     372             :         char       *pendingsyncsspace;
     373             :         char       *reindexspace;
     374             :         char       *relmapperspace;
     375             :         char       *error_queue_space;
     376             :         char       *session_dsm_handle_space;
     377             :         char       *entrypointstate;
     378             :         char       *uncommittedenumsspace;
     379             :         char       *clientconninfospace;
     380             :         Size        lnamelen;
     381             : 
     382             :         /* Serialize shared libraries we have loaded. */
     383         952 :         libraryspace = shm_toc_allocate(pcxt->toc, library_len);
     384         952 :         SerializeLibraryState(library_len, libraryspace);
     385         952 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_LIBRARY, libraryspace);
     386             : 
     387             :         /* Serialize GUC settings. */
     388         952 :         gucspace = shm_toc_allocate(pcxt->toc, guc_len);
     389         952 :         SerializeGUCState(guc_len, gucspace);
     390         952 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_GUC, gucspace);
     391             : 
     392             :         /* Serialize combo CID state. */
     393         952 :         combocidspace = shm_toc_allocate(pcxt->toc, combocidlen);
     394         952 :         SerializeComboCIDState(combocidlen, combocidspace);
     395         952 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_COMBO_CID, combocidspace);
     396             : 
     397             :         /*
     398             :          * Serialize the transaction snapshot if the transaction isolation
     399             :          * level uses a transaction snapshot.
     400             :          */
     401         952 :         if (IsolationUsesXactSnapshot())
     402             :         {
     403          22 :             tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen);
     404          22 :             SerializeSnapshot(transaction_snapshot, tsnapspace);
     405          22 :             shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT,
     406             :                            tsnapspace);
     407             :         }
     408             : 
     409             :         /* Serialize the active snapshot. */
     410         952 :         asnapspace = shm_toc_allocate(pcxt->toc, asnaplen);
     411         952 :         SerializeSnapshot(active_snapshot, asnapspace);
     412         952 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, asnapspace);
     413             : 
     414             :         /* Provide the handle for per-session segment. */
     415         952 :         session_dsm_handle_space = shm_toc_allocate(pcxt->toc,
     416             :                                                     sizeof(dsm_handle));
     417         952 :         *(dsm_handle *) session_dsm_handle_space = session_dsm_handle;
     418         952 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_SESSION_DSM,
     419             :                        session_dsm_handle_space);
     420             : 
     421             :         /* Serialize transaction state. */
     422         952 :         tstatespace = shm_toc_allocate(pcxt->toc, tstatelen);
     423         952 :         SerializeTransactionState(tstatelen, tstatespace);
     424         952 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_STATE, tstatespace);
     425             : 
     426             :         /* Serialize pending syncs. */
     427         952 :         pendingsyncsspace = shm_toc_allocate(pcxt->toc, pendingsyncslen);
     428         952 :         SerializePendingSyncs(pendingsyncslen, pendingsyncsspace);
     429         952 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_PENDING_SYNCS,
     430             :                        pendingsyncsspace);
     431             : 
     432             :         /* Serialize reindex state. */
     433         952 :         reindexspace = shm_toc_allocate(pcxt->toc, reindexlen);
     434         952 :         SerializeReindexState(reindexlen, reindexspace);
     435         952 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_REINDEX_STATE, reindexspace);
     436             : 
     437             :         /* Serialize relmapper state. */
     438         952 :         relmapperspace = shm_toc_allocate(pcxt->toc, relmapperlen);
     439         952 :         SerializeRelationMap(relmapperlen, relmapperspace);
     440         952 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_RELMAPPER_STATE,
     441             :                        relmapperspace);
     442             : 
     443             :         /* Serialize uncommitted enum state. */
     444         952 :         uncommittedenumsspace = shm_toc_allocate(pcxt->toc,
     445             :                                                  uncommittedenumslen);
     446         952 :         SerializeUncommittedEnums(uncommittedenumsspace, uncommittedenumslen);
     447         952 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_UNCOMMITTEDENUMS,
     448             :                        uncommittedenumsspace);
     449             : 
     450             :         /* Serialize our ClientConnectionInfo. */
     451         952 :         clientconninfospace = shm_toc_allocate(pcxt->toc, clientconninfolen);
     452         952 :         SerializeClientConnectionInfo(clientconninfolen, clientconninfospace);
     453         952 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_CLIENTCONNINFO,
     454             :                        clientconninfospace);
     455             : 
     456             :         /* Allocate space for worker information. */
     457         952 :         pcxt->worker = palloc0_array(ParallelWorkerInfo, pcxt->nworkers);
     458             : 
     459             :         /*
     460             :          * Establish error queues in dynamic shared memory.
     461             :          *
     462             :          * These queues should be used only for transmitting ErrorResponse,
     463             :          * NoticeResponse, and NotifyResponse protocol messages.  Tuple data
     464             :          * should be transmitted via separate (possibly larger?) queues.
     465             :          */
     466             :         error_queue_space =
     467         952 :             shm_toc_allocate(pcxt->toc,
     468             :                              mul_size(PARALLEL_ERROR_QUEUE_SIZE,
     469         952 :                                       pcxt->nworkers));
     470        3090 :         for (i = 0; i < pcxt->nworkers; ++i)
     471             :         {
     472             :             char       *start;
     473             :             shm_mq     *mq;
     474             : 
     475        2138 :             start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
     476        2138 :             mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
     477        2138 :             shm_mq_set_receiver(mq, MyProc);
     478        2138 :             pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
     479             :         }
     480         952 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, error_queue_space);
     481             : 
     482             :         /*
     483             :          * Serialize entrypoint information.  It's unsafe to pass function
     484             :          * pointers across processes, as the function pointer may be different
     485             :          * in each process in EXEC_BACKEND builds, so we always pass library
     486             :          * and function name.  (We use library name "postgres" for functions
     487             :          * in the core backend.)
     488             :          */
     489         952 :         lnamelen = strlen(pcxt->library_name);
     490         952 :         entrypointstate = shm_toc_allocate(pcxt->toc, lnamelen +
     491         952 :                                            strlen(pcxt->function_name) + 2);
     492         952 :         strcpy(entrypointstate, pcxt->library_name);
     493         952 :         strcpy(entrypointstate + lnamelen + 1, pcxt->function_name);
     494         952 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_ENTRYPOINT, entrypointstate);
     495             :     }
     496             : 
     497             :     /* Update nworkers_to_launch, in case we changed nworkers above. */
     498         952 :     pcxt->nworkers_to_launch = pcxt->nworkers;
     499             : 
     500             :     /* Restore previous memory context. */
     501         952 :     MemoryContextSwitchTo(oldcontext);
     502         952 : }
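
/*
 * Illustrative sketch, not part of the measured source: a caller sharing
 * one chunk of its own state follows the same estimate/allocate/insert
 * dance as the module-private state above.  MY_KEY_SHARED_STATE and the
 * payload are hypothetical.
 */
static void
example_share_chunk(ParallelContext *pcxt, const char *buf, Size len)
{
    char       *space;

    /* before InitializeParallelDSM(): budget the space and one TOC key */
    shm_toc_estimate_chunk(&pcxt->estimator, len);
    shm_toc_estimate_keys(&pcxt->estimator, 1);
    InitializeParallelDSM(pcxt);

    /* afterwards: carve the chunk out of the segment and publish it */
    space = shm_toc_allocate(pcxt->toc, len);
    memcpy(space, buf, len);
    shm_toc_insert(pcxt->toc, MY_KEY_SHARED_STATE, space);
}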
     503             : 
     504             : /*
     505             :  * Reinitialize the dynamic shared memory segment for a parallel context such
     506             :  * that we could launch workers for it again.
     507             :  */
     508             : void
     509         260 : ReinitializeParallelDSM(ParallelContext *pcxt)
     510             : {
     511             :     MemoryContext oldcontext;
     512             :     FixedParallelState *fps;
     513             : 
     514             :     /* We might be running in a very short-lived memory context. */
     515         260 :     oldcontext = MemoryContextSwitchTo(TopTransactionContext);
     516             : 
     517             :     /* Wait for any old workers to exit. */
     518         260 :     if (pcxt->nworkers_launched > 0)
     519             :     {
     520         260 :         WaitForParallelWorkersToFinish(pcxt);
     521         260 :         WaitForParallelWorkersToExit(pcxt);
     522         260 :         pcxt->nworkers_launched = 0;
     523         260 :         if (pcxt->known_attached_workers)
     524             :         {
     525         260 :             pfree(pcxt->known_attached_workers);
     526         260 :             pcxt->known_attached_workers = NULL;
     527         260 :             pcxt->nknown_attached_workers = 0;
     528             :         }
     529             :     }
     530             : 
     531             :     /* Reset a few bits of fixed parallel state to a clean state. */
     532         260 :     fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
     533         260 :     fps->last_xlog_end = 0;
     534             : 
     535             :     /* Recreate error queues (if they exist). */
     536         260 :     if (pcxt->nworkers > 0)
     537             :     {
     538             :         char       *error_queue_space;
     539             :         int         i;
     540             : 
     541             :         error_queue_space =
     542         260 :             shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, false);
     543        1086 :         for (i = 0; i < pcxt->nworkers; ++i)
     544             :         {
     545             :             char       *start;
     546             :             shm_mq     *mq;
     547             : 
     548         826 :             start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
     549         826 :             mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
     550         826 :             shm_mq_set_receiver(mq, MyProc);
     551         826 :             pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
     552             :         }
     553             :     }
     554             : 
     555             :     /* Restore previous memory context. */
     556         260 :     MemoryContextSwitchTo(oldcontext);
     557         260 : }
     558             : 
     559             : /*
     560             :  * Reinitialize parallel workers for a parallel context such that we could
     561             :  * launch a different number of workers.  This is required for cases where
     562             :  * we need to reuse the same DSM segment, but the number of workers can
     563             :  * vary from run-to-run.
     564             :  */
     565             : void
     566          36 : ReinitializeParallelWorkers(ParallelContext *pcxt, int nworkers_to_launch)
     567             : {
     568             :     /*
      569             :      * The number of workers to be launched must not exceed the number of
      570             :      * workers with which the parallel context was initialized.  But
     571             :      * the caller might not know that InitializeParallelDSM reduced nworkers,
     572             :      * so just silently trim the request.
     573             :      */
     574          36 :     pcxt->nworkers_to_launch = Min(pcxt->nworkers, nworkers_to_launch);
     575          36 : }
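
/*
 * Illustrative sketch, not part of the measured source: relaunching
 * workers for another pass (e.g. a rescan), reusing the existing DSM
 * segment but with a possibly different worker count.
 */
static void
example_relaunch(ParallelContext *pcxt, int nworkers_to_launch)
{
    ReinitializeParallelDSM(pcxt);
    ReinitializeParallelWorkers(pcxt, nworkers_to_launch);
    LaunchParallelWorkers(pcxt);
}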
     576             : 
     577             : /*
     578             :  * Launch parallel workers.
     579             :  */
     580             : void
     581        1212 : LaunchParallelWorkers(ParallelContext *pcxt)
     582             : {
     583             :     MemoryContext oldcontext;
     584             :     BackgroundWorker worker;
     585             :     int         i;
     586        1212 :     bool        any_registrations_failed = false;
     587             : 
     588             :     /* Skip this if we have no workers. */
     589        1212 :     if (pcxt->nworkers == 0 || pcxt->nworkers_to_launch == 0)
     590           0 :         return;
     591             : 
     592             :     /* We need to be a lock group leader. */
     593        1212 :     BecomeLockGroupLeader();
     594             : 
     595             :     /* If we do have workers, we'd better have a DSM segment. */
     596             :     Assert(pcxt->seg != NULL);
     597             : 
     598             :     /* We might be running in a short-lived memory context. */
     599        1212 :     oldcontext = MemoryContextSwitchTo(TopTransactionContext);
     600             : 
     601             :     /* Configure a worker. */
     602        1212 :     memset(&worker, 0, sizeof(worker));
     603        1212 :     snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
     604             :              MyProcPid);
     605        1212 :     snprintf(worker.bgw_type, BGW_MAXLEN, "parallel worker");
     606        1212 :     worker.bgw_flags =
     607             :         BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION
     608             :         | BGWORKER_CLASS_PARALLEL;
     609        1212 :     worker.bgw_start_time = BgWorkerStart_ConsistentState;
     610        1212 :     worker.bgw_restart_time = BGW_NEVER_RESTART;
     611        1212 :     sprintf(worker.bgw_library_name, "postgres");
     612        1212 :     sprintf(worker.bgw_function_name, "ParallelWorkerMain");
     613        1212 :     worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(pcxt->seg));
     614        1212 :     worker.bgw_notify_pid = MyProcPid;
     615             : 
     616             :     /*
     617             :      * Start workers.
     618             :      *
     619             :      * The caller must be able to tolerate ending up with fewer workers than
     620             :      * expected, so there is no need to throw an error here if registration
     621             :      * fails.  It wouldn't help much anyway, because registering the worker in
     622             :      * no way guarantees that it will start up and initialize successfully.
     623             :      */
     624        4174 :     for (i = 0; i < pcxt->nworkers_to_launch; ++i)
     625             :     {
     626        2962 :         memcpy(worker.bgw_extra, &i, sizeof(int));
     627        5866 :         if (!any_registrations_failed &&
     628        2904 :             RegisterDynamicBackgroundWorker(&worker,
     629        2904 :                                             &pcxt->worker[i].bgwhandle))
     630             :         {
     631        2876 :             shm_mq_set_handle(pcxt->worker[i].error_mqh,
     632        2876 :                               pcxt->worker[i].bgwhandle);
     633        2876 :             pcxt->nworkers_launched++;
     634             :         }
     635             :         else
     636             :         {
     637             :             /*
     638             :              * If we weren't able to register the worker, then we've bumped up
     639             :              * against the max_worker_processes limit, and future
     640             :              * registrations will probably fail too, so arrange to skip them.
     641             :              * But we still have to execute this code for the remaining slots
     642             :              * to make sure that we forget about the error queues we budgeted
     643             :              * for those workers.  Otherwise, we'll wait for them to start,
     644             :              * but they never will.
     645             :              */
     646          86 :             any_registrations_failed = true;
     647          86 :             pcxt->worker[i].bgwhandle = NULL;
     648          86 :             shm_mq_detach(pcxt->worker[i].error_mqh);
     649          86 :             pcxt->worker[i].error_mqh = NULL;
     650             :         }
     651             :     }
     652             : 
     653             :     /*
     654             :      * Now that nworkers_launched has taken its final value, we can initialize
     655             :      * known_attached_workers.
     656             :      */
     657        1212 :     if (pcxt->nworkers_launched > 0)
     658             :     {
     659        1192 :         pcxt->known_attached_workers = palloc0_array(bool, pcxt->nworkers_launched);
     660        1192 :         pcxt->nknown_attached_workers = 0;
     661             :     }
     662             : 
     663             :     /* Restore previous memory context. */
     664        1212 :     MemoryContextSwitchTo(oldcontext);
     665             : }
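
/*
 * Illustrative sketch, not part of the measured source: because worker
 * registration can fail, as noted above, callers must cope with ending
 * up with fewer workers than requested, possibly none.
 */
static void
example_launch_with_fallback(ParallelContext *pcxt)
{
    LaunchParallelWorkers(pcxt);
    if (pcxt->nworkers_launched == 0)
    {
        /* no workers could be started; do all of the work in the leader */
    }
}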
     666             : 
     667             : /*
     668             :  * Wait for all workers to attach to their error queues, and throw an error if
     669             :  * any worker fails to do this.
     670             :  *
     671             :  * Callers can assume that if this function returns successfully, then the
     672             :  * number of workers given by pcxt->nworkers_launched have initialized and
     673             :  * attached to their error queues.  Whether or not these workers are guaranteed
     674             :  * to still be running depends on what code the caller asked them to run;
     675             :  * this function does not guarantee that they have not exited.  However, it
     676             :  * does guarantee that any workers which exited must have done so cleanly and
     677             :  * after successfully performing the work with which they were tasked.
     678             :  *
     679             :  * If this function is not called, then some of the workers that were launched
     680             :  * may not have been started due to a fork() failure, or may have exited during
     681             :  * early startup prior to attaching to the error queue, so nworkers_launched
     682             :  * cannot be viewed as completely reliable.  It will never be less than the
     683             :  * number of workers which actually started, but it might be more.  Any workers
     684             :  * that failed to start will still be discovered by
     685             :  * WaitForParallelWorkersToFinish and an error will be thrown at that time,
     686             :  * provided that function is eventually reached.
     687             :  *
     688             :  * In general, the leader process should do as much work as possible before
     689             :  * calling this function.  fork() failures and other early-startup failures
     690             :  * are very uncommon, and having the leader sit idle when it could be doing
     691             :  * useful work is undesirable.  However, if the leader needs to wait for
     692             :  * all of its workers or for a specific worker, it may want to call this
     693             :  * function before doing so.  If not, it must make some other provision for
     694             :  * the failure-to-start case, lest it wait forever.  On the other hand, a
     695             :  * leader which never waits for a worker that might not be started yet, or
     696             :  * at least never does so prior to WaitForParallelWorkersToFinish(), need not
     697             :  * call this function at all.
     698             :  */
     699             : void
     700         162 : WaitForParallelWorkersToAttach(ParallelContext *pcxt)
     701             : {
     702             :     int         i;
     703             : 
     704             :     /* Skip this if we have no launched workers. */
     705         162 :     if (pcxt->nworkers_launched == 0)
     706           0 :         return;
     707             : 
     708             :     for (;;)
     709             :     {
     710             :         /*
     711             :          * This will process any parallel messages that are pending and it may
     712             :          * also throw an error propagated from a worker.
     713             :          */
     714    14452892 :         CHECK_FOR_INTERRUPTS();
     715             : 
     716    29250232 :         for (i = 0; i < pcxt->nworkers_launched; ++i)
     717             :         {
     718             :             BgwHandleStatus status;
     719             :             shm_mq     *mq;
     720             :             int         rc;
     721             :             pid_t       pid;
     722             : 
     723    14797340 :             if (pcxt->known_attached_workers[i])
     724      334218 :                 continue;
     725             : 
     726             :             /*
     727             :              * If error_mqh is NULL, then the worker has already exited
     728             :              * cleanly.
     729             :              */
     730    14463122 :             if (pcxt->worker[i].error_mqh == NULL)
     731             :             {
     732           0 :                 pcxt->known_attached_workers[i] = true;
     733           0 :                 ++pcxt->nknown_attached_workers;
     734           0 :                 continue;
     735             :             }
     736             : 
     737    14463122 :             status = GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle, &pid);
     738    14463122 :             if (status == BGWH_STARTED)
     739             :             {
     740             :                 /* Has the worker attached to the error queue? */
     741    14463006 :                 mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
     742    14463006 :                 if (shm_mq_get_sender(mq) != NULL)
     743             :                 {
     744             :                     /* Yes, so it is known to be attached. */
     745         146 :                     pcxt->known_attached_workers[i] = true;
     746         146 :                     ++pcxt->nknown_attached_workers;
     747             :                 }
     748             :             }
     749         116 :             else if (status == BGWH_STOPPED)
     750             :             {
     751             :                 /*
     752             :                  * If the worker stopped without attaching to the error queue,
     753             :                  * throw an error.
     754             :                  */
     755           0 :                 mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
     756           0 :                 if (shm_mq_get_sender(mq) == NULL)
     757           0 :                     ereport(ERROR,
     758             :                             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     759             :                              errmsg("parallel worker failed to initialize"),
     760             :                              errhint("More details may be available in the server log.")));
     761             : 
     762           0 :                 pcxt->known_attached_workers[i] = true;
     763           0 :                 ++pcxt->nknown_attached_workers;
     764             :             }
     765             :             else
     766             :             {
     767             :                 /*
     768             :                  * Worker not yet started, so we must wait.  The postmaster
     769             :                  * will notify us if the worker's state changes.  Our latch
     770             :                  * might also get set for some other reason, but if so we'll
     771             :                  * just end up waiting for the same worker again.
     772             :                  */
     773         116 :                 rc = WaitLatch(MyLatch,
     774             :                                WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
     775             :                                -1, WAIT_EVENT_BGWORKER_STARTUP);
     776             : 
     777         116 :                 if (rc & WL_LATCH_SET)
     778         116 :                     ResetLatch(MyLatch);
     779             :             }
     780             :         }
     781             : 
     782             :         /* If all workers are known to have started, we're done. */
     783    14452892 :         if (pcxt->nknown_attached_workers >= pcxt->nworkers_launched)
     784             :         {
     785             :             Assert(pcxt->nknown_attached_workers == pcxt->nworkers_launched);
     786         162 :             break;
     787             :         }
     788             :     }
     789             : }
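
/*
 * Illustrative sketch, not part of the measured source: a leader that is
 * about to block waiting on per-worker progress first rules out workers
 * that silently failed to start.
 */
static void
example_wait_on_workers(ParallelContext *pcxt)
{
    /*
     * Throws an error if any launched worker stopped without attaching;
     * afterwards it is safe to wait without risk of waiting forever.
     */
    WaitForParallelWorkersToAttach(pcxt);
    /* ... e.g. block on a barrier or shm_mq fed by the workers ... */
}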
     790             : 
     791             : /*
     792             :  * Wait for all workers to finish computing.
     793             :  *
     794             :  * Even if the parallel operation seems to have completed successfully, it's
     795             :  * important to call this function afterwards.  We must not miss any errors
     796             :  * the workers may have thrown during the parallel operation, or any that they
     797             :  * may yet throw while shutting down.
     798             :  *
     799             :  * Also, we want to update our notion of XactLastRecEnd based on worker
     800             :  * feedback.
     801             :  */
     802             : void
     803        1460 : WaitForParallelWorkersToFinish(ParallelContext *pcxt)
     804             : {
     805             :     for (;;)
     806        1258 :     {
     807        2718 :         bool        anyone_alive = false;
     808        2718 :         int         nfinished = 0;
     809             :         int         i;
     810             : 
     811             :         /*
     812             :          * This will process any parallel messages that are pending, which may
     813             :          * change the outcome of the loop that follows.  It may also throw an
     814             :          * error propagated from a worker.
     815             :          */
     816        2718 :         CHECK_FOR_INTERRUPTS();
     817             : 
     818        9536 :         for (i = 0; i < pcxt->nworkers_launched; ++i)
     819             :         {
     820             :             /*
     821             :              * If error_mqh is NULL, then the worker has already exited
     822             :              * cleanly.  If we have received a message through error_mqh from
     823             :              * the worker, we know it started up cleanly, and therefore we're
     824             :              * certain to be notified when it exits.
     825             :              */
     826        6852 :             if (pcxt->worker[i].error_mqh == NULL)
     827        5416 :                 ++nfinished;
     828        1436 :             else if (pcxt->known_attached_workers[i])
     829             :             {
     830          34 :                 anyone_alive = true;
     831          34 :                 break;
     832             :             }
     833             :         }
     834             : 
     835        2718 :         if (!anyone_alive)
     836             :         {
     837             :             /* If all workers are known to have finished, we're done. */
     838        2684 :             if (nfinished >= pcxt->nworkers_launched)
     839             :             {
     840             :                 Assert(nfinished == pcxt->nworkers_launched);
     841        1460 :                 break;
     842             :             }
     843             : 
     844             :             /*
     845             :              * We didn't detect any living workers, but not all workers are
     846             :              * known to have exited cleanly.  Either not all workers have
     847             :              * launched yet, or maybe some of them failed to start or
     848             :              * terminated abnormally.
     849             :              */
     850        4358 :             for (i = 0; i < pcxt->nworkers_launched; ++i)
     851             :             {
     852             :                 pid_t       pid;
     853             :                 shm_mq     *mq;
     854             : 
     855             :                 /*
     856             :                  * If the worker is BGWH_NOT_YET_STARTED or BGWH_STARTED, we
     857             :                  * should just keep waiting.  If it is BGWH_STOPPED, then
     858             :                  * further investigation is needed.
     859             :                  */
     860        3134 :                 if (pcxt->worker[i].error_mqh == NULL ||
     861        2804 :                     pcxt->worker[i].bgwhandle == NULL ||
     862        1402 :                     GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle,
     863             :                                            &pid) != BGWH_STOPPED)
     864        3134 :                     continue;
     865             : 
     866             :                 /*
     867             :                  * Check whether the worker ended up stopped without ever
     868             :                  * attaching to the error queue.  If so, the postmaster was
     869             :                  * unable to fork the worker or it exited without initializing
     870             :                  * properly.  We must throw an error, since the caller may
     871             :                  * have been expecting the worker to do some work before
     872             :                  * exiting.
     873             :                  */
     874           0 :                 mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
     875           0 :                 if (shm_mq_get_sender(mq) == NULL)
     876           0 :                     ereport(ERROR,
     877             :                             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     878             :                              errmsg("parallel worker failed to initialize"),
     879             :                              errhint("More details may be available in the server log.")));
     880             : 
     881             :                 /*
     882             :                  * The worker is stopped, but is attached to the error queue.
     883             :                  * Unless there's a bug somewhere, this will only happen when
     884             :                  * the worker writes messages and terminates after the
     885             :                  * CHECK_FOR_INTERRUPTS() near the top of this function and
     886             :                  * before the call to GetBackgroundWorkerPid().  In that case,
      887             :                  * our latch should have been set as well and the right things
     888             :                  * will happen on the next pass through the loop.
     889             :                  */
     890             :             }
     891             :         }
     892             : 
     893        1258 :         (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
     894             :                          WAIT_EVENT_PARALLEL_FINISH);
     895        1258 :         ResetLatch(MyLatch);
     896             :     }
     897             : 
     898        1460 :     if (pcxt->toc != NULL)
     899             :     {
     900             :         FixedParallelState *fps;
     901             : 
     902        1460 :         fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
     903        1460 :         if (fps->last_xlog_end > XactLastRecEnd)
     904          18 :             XactLastRecEnd = fps->last_xlog_end;
     905             :     }
     906        1460 : }
     907             : 
     908             : /*
     909             :  * Wait for all workers to exit.
     910             :  *
      911             :  * This function ensures that the workers have completely shut down.  The
      912             :  * difference from WaitForParallelWorkersToFinish is that the latter only
      913             :  * ensures that the last message sent by a worker backend has been received
      914             :  * by the leader, whereas this function waits for complete worker shutdown.
     915             :  */
     916             : static void
     917        1212 : WaitForParallelWorkersToExit(ParallelContext *pcxt)
     918             : {
     919             :     int         i;
     920             : 
     921             :     /* Wait until the workers actually die. */
     922        4088 :     for (i = 0; i < pcxt->nworkers_launched; ++i)
     923             :     {
     924             :         BgwHandleStatus status;
     925             : 
     926        2876 :         if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
     927           0 :             continue;
     928             : 
     929        2876 :         status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);
     930             : 
     931             :         /*
     932             :          * If the postmaster kicked the bucket, we have no chance of cleaning
     933             :          * up safely -- we won't be able to tell when our workers are actually
     934             :          * dead.  This doesn't necessitate a PANIC since they will all abort
     935             :          * eventually, but we can't safely continue this session.
     936             :          */
     937        2876 :         if (status == BGWH_POSTMASTER_DIED)
     938           0 :             ereport(FATAL,
     939             :                     (errcode(ERRCODE_ADMIN_SHUTDOWN),
     940             :                      errmsg("postmaster exited during a parallel transaction")));
     941             : 
     942             :         /* Release memory. */
     943        2876 :         pfree(pcxt->worker[i].bgwhandle);
     944        2876 :         pcxt->worker[i].bgwhandle = NULL;
     945             :     }
     946        1212 : }
     947             : 
     948             : /*
     949             :  * Destroy a parallel context.
     950             :  *
     951             :  * If expecting a clean exit, you should use WaitForParallelWorkersToFinish()
     952             :  * first, before calling this function.  When this function is invoked, any
     953             :  * remaining workers are forcibly killed; the dynamic shared memory segment
     954             :  * is unmapped; and we then wait (uninterruptibly) for the workers to exit.
     955             :  */
     956             : void
     957         952 : DestroyParallelContext(ParallelContext *pcxt)
     958             : {
     959             :     int         i;
     960             : 
     961             :     /*
     962             :      * Be careful about order of operations here!  We remove the parallel
     963             :      * context from the list before we do anything else; otherwise, if an
     964             :      * error occurs during a subsequent step, we might try to nuke it again
     965             :      * from AtEOXact_Parallel or AtEOSubXact_Parallel.
     966             :      */
     967         952 :     dlist_delete(&pcxt->node);
     968             : 
     969             :     /* Kill each worker in turn, and forget their error queues. */
     970         952 :     if (pcxt->worker != NULL)
     971             :     {
     972        3008 :         for (i = 0; i < pcxt->nworkers_launched; ++i)
     973             :         {
     974        2056 :             if (pcxt->worker[i].error_mqh != NULL)
     975             :             {
     976          12 :                 TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
     977             : 
     978          12 :                 shm_mq_detach(pcxt->worker[i].error_mqh);
     979          12 :                 pcxt->worker[i].error_mqh = NULL;
     980             :             }
     981             :         }
     982             :     }
     983             : 
     984             :     /*
     985             :      * If we have allocated a shared memory segment, detach it.  This will
      986             :      * implicitly detach the error queues and any other shared memory queues
     987             :      * stored there.
     988             :      */
     989         952 :     if (pcxt->seg != NULL)
     990             :     {
     991         952 :         dsm_detach(pcxt->seg);
     992         952 :         pcxt->seg = NULL;
     993             :     }
     994             : 
     995             :     /*
     996             :      * If this parallel context is actually in backend-private memory rather
     997             :      * than shared memory, free that memory instead.
     998             :      */
     999         952 :     if (pcxt->private_memory != NULL)
    1000             :     {
    1001           0 :         pfree(pcxt->private_memory);
    1002           0 :         pcxt->private_memory = NULL;
    1003             :     }
    1004             : 
    1005             :     /*
    1006             :      * We can't finish transaction commit or abort until all of the workers
    1007             :      * have exited.  This means, in particular, that we can't respond to
    1008             :      * interrupts at this stage.
    1009             :      */
    1010         952 :     HOLD_INTERRUPTS();
    1011         952 :     WaitForParallelWorkersToExit(pcxt);
    1012         952 :     RESUME_INTERRUPTS();
    1013             : 
    1014             :     /* Free the worker array itself. */
    1015         952 :     if (pcxt->worker != NULL)
    1016             :     {
    1017         952 :         pfree(pcxt->worker);
    1018         952 :         pcxt->worker = NULL;
    1019             :     }
    1020             : 
    1021             :     /* Free memory. */
    1022         952 :     pfree(pcxt->library_name);
    1023         952 :     pfree(pcxt->function_name);
    1024         952 :     pfree(pcxt);
    1025         952 : }
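
        As context for the destroy step above, here is a minimal leader-side sketch of
        the full parallel-context lifecycle, based on the call order this file's API
        implies (error handling is omitted, and real callers also size and fill their
        own shared-memory TOC entries around these calls):

            ParallelContext *pcxt;
            int         nworkers = 2;   /* however many workers are wanted */

            EnterParallelMode();        /* prohibit unsafe state changes */
            pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
            InitializeParallelDSM(pcxt);    /* create DSM, copy fixed state into it */
            LaunchParallelWorkers(pcxt);
            /* ... communicate with the workers through dynamic shared memory ... */
            WaitForParallelWorkersToFinish(pcxt);   /* expect a clean exit */
            DestroyParallelContext(pcxt);   /* kill stragglers, unmap the segment */
            ExitParallelMode();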
    1026             : 
    1027             : /*
    1028             :  * Are there any parallel contexts currently active?
    1029             :  */
    1030             : bool
    1031           0 : ParallelContextActive(void)
    1032             : {
    1033           0 :     return !dlist_is_empty(&pcxt_list);
    1034             : }
    1035             : 
    1036             : /*
    1037             :  * Handle receipt of an interrupt indicating a parallel worker message.
    1038             :  *
    1039             :  * Note: this is called within a signal handler!  All we can do is set
    1040             :  * a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke
    1041             :  * ProcessParallelMessages().
    1042             :  */
    1043             : void
    1044        3458 : HandleParallelMessageInterrupt(void)
    1045             : {
    1046        3458 :     InterruptPending = true;
    1047        3458 :     ParallelMessagePending = true;
    1048        3458 :     SetLatch(MyLatch);
    1049        3458 : }
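
        The flags set here are consumed from the interrupt-processing path.  A sketch
        of the consuming side (assuming the usual shape of ProcessInterrupts(); that
        code is not part of this file):

            /* In ProcessInterrupts(), reached via CHECK_FOR_INTERRUPTS(): */
            if (ParallelMessagePending)
                ProcessParallelMessages();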
    1050             : 
    1051             : /*
    1052             :  * Process any queued protocol messages received from parallel workers.
    1053             :  */
    1054             : void
    1055        3340 : ProcessParallelMessages(void)
    1056             : {
    1057             :     dlist_iter  iter;
    1058             :     MemoryContext oldcontext;
    1059             : 
    1060             :     static MemoryContext hpm_context = NULL;
    1061             : 
    1062             :     /*
    1063             :      * This is invoked from ProcessInterrupts(), and since some of the
    1064             :      * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
    1065             :      * for recursive calls if more signals are received while this runs.  It's
    1066             :      * unclear that recursive entry would be safe, and it doesn't seem useful
    1067             :      * even if it is safe, so let's block interrupts until done.
    1068             :      */
    1069        3340 :     HOLD_INTERRUPTS();
    1070             : 
    1071             :     /*
    1072             :      * Moreover, CurrentMemoryContext might be pointing almost anywhere.  We
    1073             :      * don't want to risk leaking data into long-lived contexts, so let's do
    1074             :      * our work here in a private context that we can reset on each use.
    1075             :      */
    1076        3340 :     if (hpm_context == NULL)    /* first time through? */
    1077         150 :         hpm_context = AllocSetContextCreate(TopMemoryContext,
    1078             :                                             "ProcessParallelMessages",
    1079             :                                             ALLOCSET_DEFAULT_SIZES);
    1080             :     else
    1081        3190 :         MemoryContextReset(hpm_context);
    1082             : 
    1083        3340 :     oldcontext = MemoryContextSwitchTo(hpm_context);
    1084             : 
    1085             :     /* OK to process messages.  Reset the flag saying there are more to do. */
    1086        3340 :     ParallelMessagePending = false;
    1087             : 
    1088        6790 :     dlist_foreach(iter, &pcxt_list)
    1089             :     {
    1090             :         ParallelContext *pcxt;
    1091             :         int         i;
    1092             : 
    1093        3462 :         pcxt = dlist_container(ParallelContext, node, iter.cur);
    1094        3462 :         if (pcxt->worker == NULL)
    1095           0 :             continue;
    1096             : 
    1097       14166 :         for (i = 0; i < pcxt->nworkers_launched; ++i)
    1098             :         {
    1099             :             /*
    1100             :              * Read as many messages as we can from each worker, but stop when
    1101             :              * either (1) the worker's error queue goes away, which can happen
    1102             :              * if we receive a Terminate message from the worker; or (2) no
    1103             :              * more messages can be read from the worker without blocking.
    1104             :              */
    1105       13580 :             while (pcxt->worker[i].error_mqh != NULL)
    1106             :             {
    1107             :                 shm_mq_result res;
    1108             :                 Size        nbytes;
    1109             :                 void       *data;
    1110             : 
    1111        6130 :                 res = shm_mq_receive(pcxt->worker[i].error_mqh, &nbytes,
    1112             :                                      &data, true);
    1113        6130 :                 if (res == SHM_MQ_WOULD_BLOCK)
    1114        3254 :                     break;
    1115        2876 :                 else if (res == SHM_MQ_SUCCESS)
    1116             :                 {
    1117             :                     StringInfoData msg;
    1118             : 
    1119        2876 :                     initStringInfo(&msg);
    1120        2876 :                     appendBinaryStringInfo(&msg, data, nbytes);
    1121        2876 :                     ProcessParallelMessage(pcxt, i, &msg);
    1122        2864 :                     pfree(msg.data);
    1123             :                 }
    1124             :                 else
    1125           0 :                     ereport(ERROR,
    1126             :                             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1127             :                              errmsg("lost connection to parallel worker")));
    1128             :             }
    1129             :         }
    1130             :     }
    1131             : 
    1132        3328 :     MemoryContextSwitchTo(oldcontext);
    1133             : 
    1134             :     /* Might as well clear the context on our way out */
    1135        3328 :     MemoryContextReset(hpm_context);
    1136             : 
    1137        3328 :     RESUME_INTERRUPTS();
    1138        3328 : }
    1139             : 
    1140             : /*
    1141             :  * Process a single protocol message received from a single parallel worker.
    1142             :  */
    1143             : static void
    1144        2876 : ProcessParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
    1145             : {
    1146             :     char        msgtype;
    1147             : 
    1148        2876 :     if (pcxt->known_attached_workers != NULL &&
    1149        2876 :         !pcxt->known_attached_workers[i])
    1150             :     {
    1151        2730 :         pcxt->known_attached_workers[i] = true;
    1152        2730 :         pcxt->nknown_attached_workers++;
    1153             :     }
    1154             : 
    1155        2876 :     msgtype = pq_getmsgbyte(msg);
    1156             : 
    1157        2876 :     switch (msgtype)
    1158             :     {
    1159          12 :         case PqMsg_ErrorResponse:
    1160             :         case PqMsg_NoticeResponse:
    1161             :             {
    1162             :                 ErrorData   edata;
    1163             :                 ErrorContextCallback *save_error_context_stack;
    1164             : 
    1165             :                 /* Parse ErrorResponse or NoticeResponse. */
    1166          12 :                 pq_parse_errornotice(msg, &edata);
    1167             : 
    1168             :                 /* Death of a worker isn't enough justification for suicide. */
    1169          12 :                 edata.elevel = Min(edata.elevel, ERROR);
    1170             : 
    1171             :                 /*
    1172             :                  * If desired, add a context line to show that this is a
    1173             :                  * message propagated from a parallel worker.  Otherwise, it
    1174             :                  * can sometimes be confusing to understand what actually
    1175             :                  * happened.  (We don't do this in DEBUG_PARALLEL_REGRESS mode
    1176             :                  * because it causes test-result instability depending on
    1177             :                  * whether a parallel worker is actually used or not.)
    1178             :                  */
    1179          12 :                 if (debug_parallel_query != DEBUG_PARALLEL_REGRESS)
    1180             :                 {
    1181          12 :                     if (edata.context)
    1182           6 :                         edata.context = psprintf("%s\n%s", edata.context,
    1183             :                                                  _("parallel worker"));
    1184             :                     else
    1185           6 :                         edata.context = pstrdup(_("parallel worker"));
    1186             :                 }
    1187             : 
    1188             :                 /*
    1189             :                  * Context beyond that should use the error context callbacks
    1190             :                  * that were in effect when the ParallelContext was created,
    1191             :                  * not the current ones.
    1192             :                  */
    1193          12 :                 save_error_context_stack = error_context_stack;
    1194          12 :                 error_context_stack = pcxt->error_context_stack;
    1195             : 
    1196             :                 /* Rethrow error or print notice. */
    1197          12 :                 ThrowErrorData(&edata);
    1198             : 
    1199             :                 /* Not an error, so restore previous context stack. */
    1200           0 :                 error_context_stack = save_error_context_stack;
    1201             : 
    1202           0 :                 break;
    1203             :             }
    1204             : 
    1205           0 :         case PqMsg_NotificationResponse:
    1206             :             {
    1207             :                 /* Propagate NotifyResponse. */
    1208             :                 int32       pid;
    1209             :                 const char *channel;
    1210             :                 const char *payload;
    1211             : 
    1212           0 :                 pid = pq_getmsgint(msg, 4);
    1213           0 :                 channel = pq_getmsgrawstring(msg);
    1214           0 :                 payload = pq_getmsgrawstring(msg);
    1215           0 :                 pq_endmessage(msg);
    1216             : 
    1217           0 :                 NotifyMyFrontEnd(channel, payload, pid);
    1218             : 
    1219           0 :                 break;
    1220             :             }
    1221             : 
    1222           0 :         case PqMsg_Progress:
    1223             :             {
    1224             :                 /*
    1225             :                  * Only incremental progress reporting is currently supported.
    1226             :                  * However, it's possible to add more fields to the message to
    1227             :                  * allow for handling of other backend progress APIs.
    1228             :                  */
    1229           0 :                 int         index = pq_getmsgint(msg, 4);
    1230           0 :                 int64       incr = pq_getmsgint64(msg);
    1231             : 
    1232           0 :                 pq_getmsgend(msg);
    1233             : 
    1234           0 :                 pgstat_progress_incr_param(index, incr);
    1235             : 
    1236           0 :                 break;
    1237             :             }
    1238             : 
    1239        2864 :         case PqMsg_Terminate:
    1240             :             {
    1241        2864 :                 shm_mq_detach(pcxt->worker[i].error_mqh);
    1242        2864 :                 pcxt->worker[i].error_mqh = NULL;
    1243        2864 :                 break;
    1244             :             }
    1245             : 
    1246           0 :         default:
    1247             :             {
    1248           0 :                 elog(ERROR, "unrecognized message type received from parallel worker: %c (message length %d bytes)",
    1249             :                      msgtype, msg->len);
    1250             :             }
    1251             :     }
    1252        2864 : }
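
        The PqMsg_Progress case above implies the wire format for progress updates:
        one 4-byte progress-parameter index followed by one 8-byte increment.  A
        hedged sketch of the matching worker-side sender, using only pqformat
        routines already included by this file (the actual sender lives elsewhere
        in the backend):

            StringInfoData msgbuf;

            /* Worker side: report an increment for one progress parameter. */
            pq_beginmessage(&msgbuf, PqMsg_Progress);
            pq_sendint32(&msgbuf, index);   /* progress-parameter slot */
            pq_sendint64(&msgbuf, incr);    /* amount to add to that slot */
            pq_endmessage(&msgbuf);         /* delivered via the redirected shm_mq */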
    1253             : 
    1254             : /*
    1255             :  * End-of-subtransaction cleanup for parallel contexts.
    1256             :  *
    1257             :  * Here we remove only parallel contexts initiated within the current
    1258             :  * subtransaction.
    1259             :  */
    1260             : void
    1261       20154 : AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId)
    1262             : {
    1263       20160 :     while (!dlist_is_empty(&pcxt_list))
    1264             :     {
    1265             :         ParallelContext *pcxt;
    1266             : 
    1267           6 :         pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
    1268           6 :         if (pcxt->subid != mySubId)
    1269           0 :             break;
    1270           6 :         if (isCommit)
    1271           0 :             elog(WARNING, "leaked parallel context");
    1272           6 :         DestroyParallelContext(pcxt);
    1273             :     }
    1274       20154 : }
    1275             : 
    1276             : /*
    1277             :  * End-of-transaction cleanup for parallel contexts.
    1278             :  *
    1279             :  * We nuke all remaining parallel contexts.
    1280             :  */
    1281             : void
    1282     1054458 : AtEOXact_Parallel(bool isCommit)
    1283             : {
    1284     1054464 :     while (!dlist_is_empty(&pcxt_list))
    1285             :     {
    1286             :         ParallelContext *pcxt;
    1287             : 
    1288           6 :         pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
    1289           6 :         if (isCommit)
    1290           0 :             elog(WARNING, "leaked parallel context");
    1291           6 :         DestroyParallelContext(pcxt);
    1292             :     }
    1293     1054458 : }
    1294             : 
    1295             : /*
    1296             :  * Main entrypoint for parallel workers.
    1297             :  */
    1298             : void
    1299        2876 : ParallelWorkerMain(Datum main_arg)
    1300             : {
    1301             :     dsm_segment *seg;
    1302             :     shm_toc    *toc;
    1303             :     FixedParallelState *fps;
    1304             :     char       *error_queue_space;
    1305             :     shm_mq     *mq;
    1306             :     shm_mq_handle *mqh;
    1307             :     char       *libraryspace;
    1308             :     char       *entrypointstate;
    1309             :     char       *library_name;
    1310             :     char       *function_name;
    1311             :     parallel_worker_main_type entrypt;
    1312             :     char       *gucspace;
    1313             :     char       *combocidspace;
    1314             :     char       *tsnapspace;
    1315             :     char       *asnapspace;
    1316             :     char       *tstatespace;
    1317             :     char       *pendingsyncsspace;
    1318             :     char       *reindexspace;
    1319             :     char       *relmapperspace;
    1320             :     char       *uncommittedenumsspace;
    1321             :     char       *clientconninfospace;
    1322             :     char       *session_dsm_handle_space;
    1323             :     Snapshot    tsnapshot;
    1324             :     Snapshot    asnapshot;
    1325             : 
    1326             :     /* Set flag to indicate that we're initializing a parallel worker. */
    1327        2876 :     InitializingParallelWorker = true;
    1328             : 
    1329             :     /* Establish signal handlers. */
    1330        2876 :     pqsignal(SIGTERM, die);
    1331        2876 :     BackgroundWorkerUnblockSignals();
    1332             : 
    1333             :     /* Determine and set our parallel worker number. */
    1334             :     Assert(ParallelWorkerNumber == -1);
    1335        2876 :     memcpy(&ParallelWorkerNumber, MyBgworkerEntry->bgw_extra, sizeof(int));
    1336             : 
    1337             :     /* Set up a memory context to work in, just for cleanliness. */
    1338        2876 :     CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
    1339             :                                                  "Parallel worker",
    1340             :                                                  ALLOCSET_DEFAULT_SIZES);
    1341             : 
    1342             :     /*
    1343             :      * Attach to the dynamic shared memory segment for the parallel query, and
    1344             :      * find its table of contents.
    1345             :      *
    1346             :      * Note: at this point, we have not created any ResourceOwner in this
    1347             :      * process.  This will result in our DSM mapping surviving until process
    1348             :      * exit, which is fine.  If there were a ResourceOwner, it would acquire
    1349             :      * ownership of the mapping, but we have no need for that.
    1350             :      */
    1351        2876 :     seg = dsm_attach(DatumGetUInt32(main_arg));
    1352        2876 :     if (seg == NULL)
    1353           0 :         ereport(ERROR,
    1354             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1355             :                  errmsg("could not map dynamic shared memory segment")));
    1356        2876 :     toc = shm_toc_attach(PARALLEL_MAGIC, dsm_segment_address(seg));
    1357        2876 :     if (toc == NULL)
    1358           0 :         ereport(ERROR,
    1359             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1360             :                  errmsg("invalid magic number in dynamic shared memory segment")));
    1361             : 
    1362             :     /* Look up fixed parallel state. */
    1363        2876 :     fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED, false);
    1364        2876 :     MyFixedParallelState = fps;
    1365             : 
    1366             :     /* Arrange to signal the leader if we exit. */
    1367        2876 :     ParallelLeaderPid = fps->parallel_leader_pid;
    1368        2876 :     ParallelLeaderProcNumber = fps->parallel_leader_proc_number;
    1369        2876 :     before_shmem_exit(ParallelWorkerShutdown, PointerGetDatum(seg));
    1370             : 
    1371             :     /*
    1372             :      * Now we can find and attach to the error queue provided for us.  That's
    1373             :      * good, because until we do that, any errors that happen here will not be
    1374             :      * reported back to the process that requested that this worker be
    1375             :      * launched.
    1376             :      */
    1377        2876 :     error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE, false);
    1378        2876 :     mq = (shm_mq *) (error_queue_space +
    1379        2876 :                      ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE);
    1380        2876 :     shm_mq_set_sender(mq, MyProc);
    1381        2876 :     mqh = shm_mq_attach(mq, seg, NULL);
    1382        2876 :     pq_redirect_to_shm_mq(seg, mqh);
    1383        2876 :     pq_set_parallel_leader(fps->parallel_leader_pid,
    1384             :                            fps->parallel_leader_proc_number);
    1385             : 
    1386             :     /*
    1387             :      * Hooray! Primary initialization is complete.  Now, we need to set up our
    1388             :      * backend-local state to match the original backend.
    1389             :      */
    1390             : 
    1391             :     /*
    1392             :      * Join locking group.  We must do this before anything that could try to
    1393             :      * acquire a heavyweight lock, because any heavyweight locks acquired to
    1394             :      * this point could block either directly against the parallel group
    1395             :      * leader or against some process which in turn waits for a lock that
    1396             :      * conflicts with the parallel group leader, causing an undetected
    1397             :      * deadlock.  (If we can't join the lock group, the leader has gone away,
    1398             :      * so just exit quietly.)
    1399             :      */
    1400        2876 :     if (!BecomeLockGroupMember(fps->parallel_leader_pgproc,
    1401             :                                fps->parallel_leader_pid))
    1402           0 :         return;
    1403             : 
    1404             :     /*
    1405             :      * Restore transaction and statement start-time timestamps.  This must
    1406             :      * happen before anything that would start a transaction, else asserts in
    1407             :      * xact.c will fire.
    1408             :      */
    1409        2876 :     SetParallelStartTimestamps(fps->xact_ts, fps->stmt_ts);
    1410             : 
    1411             :     /*
    1412             :      * Identify the entry point to be called.  In theory this could result in
    1413             :      * loading an additional library, though most likely the entry point is in
    1414             :      * the core backend or in a library we just loaded.
    1415             :      */
    1416        2876 :     entrypointstate = shm_toc_lookup(toc, PARALLEL_KEY_ENTRYPOINT, false);
    1417        2876 :     library_name = entrypointstate;
    1418        2876 :     function_name = entrypointstate + strlen(library_name) + 1;
    1419             : 
    1420        2876 :     entrypt = LookupParallelWorkerFunction(library_name, function_name);
    1421             : 
    1422             :     /*
    1423             :      * Restore current session authorization and role id.  No verification
    1424             :      * happens here, we just blindly adopt the leader's state.  Note that this
    1425             :      * has to happen before InitPostgres, since InitializeSessionUserId will
    1426             :      * not set these variables.
    1427             :      */
    1428        2876 :     SetAuthenticatedUserId(fps->authenticated_user_id);
    1429        2876 :     SetSessionAuthorization(fps->session_user_id,
    1430        2876 :                             fps->session_user_is_superuser);
    1431        2876 :     SetCurrentRoleId(fps->outer_user_id, fps->role_is_superuser);
    1432             : 
    1433             :     /*
    1434             :      * Restore database connection.  We skip connection authorization checks,
    1435             :      * reasoning that (a) the leader checked these things when it started, and
    1436             :      * (b) we do not want parallel mode to cause these failures, because that
     1437             :      * would make the use of parallel query plans non-transparent to applications.
    1438             :      */
    1439        2876 :     BackgroundWorkerInitializeConnectionByOid(fps->database_id,
    1440             :                                               fps->authenticated_user_id,
    1441             :                                               BGWORKER_BYPASS_ALLOWCONN |
    1442             :                                               BGWORKER_BYPASS_ROLELOGINCHECK);
    1443             : 
    1444             :     /*
    1445             :      * Set the client encoding to the database encoding, since that is what
    1446             :      * the leader will expect.  (We're cheating a bit by not calling
    1447             :      * PrepareClientEncoding first.  It's okay because this call will always
    1448             :      * result in installing a no-op conversion.  No error should be possible,
    1449             :      * but check anyway.)
    1450             :      */
    1451        2876 :     if (SetClientEncoding(GetDatabaseEncoding()) < 0)
    1452           0 :         elog(ERROR, "SetClientEncoding(%d) failed", GetDatabaseEncoding());
    1453             : 
    1454             :     /*
    1455             :      * Load libraries that were loaded by original backend.  We want to do
    1456             :      * this before restoring GUCs, because the libraries might define custom
    1457             :      * variables.
    1458             :      */
    1459        2876 :     libraryspace = shm_toc_lookup(toc, PARALLEL_KEY_LIBRARY, false);
    1460        2876 :     StartTransactionCommand();
    1461        2876 :     RestoreLibraryState(libraryspace);
    1462        2876 :     CommitTransactionCommand();
    1463             : 
    1464             :     /* Crank up a transaction state appropriate to a parallel worker. */
    1465        2876 :     tstatespace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_STATE, false);
    1466        2876 :     StartParallelWorkerTransaction(tstatespace);
    1467             : 
    1468             :     /*
    1469             :      * Restore state that affects catalog access.  Ideally we'd do this even
    1470             :      * before calling InitPostgres, but that has order-of-initialization
    1471             :      * problems, and also the relmapper would get confused during the
    1472             :      * CommitTransactionCommand call above.
    1473             :      */
    1474        2876 :     pendingsyncsspace = shm_toc_lookup(toc, PARALLEL_KEY_PENDING_SYNCS,
    1475             :                                        false);
    1476        2876 :     RestorePendingSyncs(pendingsyncsspace);
    1477        2876 :     relmapperspace = shm_toc_lookup(toc, PARALLEL_KEY_RELMAPPER_STATE, false);
    1478        2876 :     RestoreRelationMap(relmapperspace);
    1479        2876 :     reindexspace = shm_toc_lookup(toc, PARALLEL_KEY_REINDEX_STATE, false);
    1480        2876 :     RestoreReindexState(reindexspace);
    1481        2876 :     combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID, false);
    1482        2876 :     RestoreComboCIDState(combocidspace);
    1483             : 
    1484             :     /* Attach to the per-session DSM segment and contained objects. */
    1485             :     session_dsm_handle_space =
    1486        2876 :         shm_toc_lookup(toc, PARALLEL_KEY_SESSION_DSM, false);
    1487        2876 :     AttachSession(*(dsm_handle *) session_dsm_handle_space);
    1488             : 
    1489             :     /*
    1490             :      * If the transaction isolation level is REPEATABLE READ or SERIALIZABLE,
    1491             :      * the leader has serialized the transaction snapshot and we must restore
    1492             :      * it. At lower isolation levels, there is no transaction-lifetime
    1493             :      * snapshot, but we need TransactionXmin to get set to a value which is
    1494             :      * less than or equal to the xmin of every snapshot that will be used by
    1495             :      * this worker. The easiest way to accomplish that is to install the
    1496             :      * active snapshot as the transaction snapshot. Code running in this
    1497             :      * parallel worker might take new snapshots via GetTransactionSnapshot()
    1498             :      * or GetLatestSnapshot(), but it shouldn't have any way of acquiring a
    1499             :      * snapshot older than the active snapshot.
    1500             :      */
    1501        2876 :     asnapspace = shm_toc_lookup(toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, false);
    1502        2876 :     tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT, true);
    1503        2876 :     asnapshot = RestoreSnapshot(asnapspace);
    1504        2876 :     tsnapshot = tsnapspace ? RestoreSnapshot(tsnapspace) : asnapshot;
    1505        2876 :     RestoreTransactionSnapshot(tsnapshot,
    1506        2876 :                                fps->parallel_leader_pgproc);
    1507        2876 :     PushActiveSnapshot(asnapshot);
    1508             : 
    1509             :     /*
    1510             :      * We've changed which tuples we can see, and must therefore invalidate
    1511             :      * system caches.
    1512             :      */
    1513        2876 :     InvalidateSystemCaches();
    1514             : 
    1515             :     /*
    1516             :      * Restore GUC values from launching backend.  We can't do this earlier,
    1517             :      * because GUC check hooks that do catalog lookups need to see the same
    1518             :      * database state as the leader.  Also, the check hooks for
    1519             :      * session_authorization and role assume we already set the correct role
    1520             :      * OIDs.
    1521             :      */
    1522        2876 :     gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC, false);
    1523        2876 :     RestoreGUCState(gucspace);
    1524             : 
    1525             :     /*
    1526             :      * Restore current user ID and security context.  No verification happens
    1527             :      * here, we just blindly adopt the leader's state.  We can't do this till
    1528             :      * after restoring GUCs, else we'll get complaints about restoring
    1529             :      * session_authorization and role.  (In effect, we're assuming that all
    1530             :      * the restored values are okay to set, even if we are now inside a
    1531             :      * restricted context.)
    1532             :      */
    1533        2876 :     SetUserIdAndSecContext(fps->current_user_id, fps->sec_context);
    1534             : 
    1535             :     /* Restore temp-namespace state to ensure search path matches leader's. */
    1536        2876 :     SetTempNamespaceState(fps->temp_namespace_id,
    1537             :                           fps->temp_toast_namespace_id);
    1538             : 
    1539             :     /* Restore uncommitted enums. */
    1540        2876 :     uncommittedenumsspace = shm_toc_lookup(toc, PARALLEL_KEY_UNCOMMITTEDENUMS,
    1541             :                                            false);
    1542        2876 :     RestoreUncommittedEnums(uncommittedenumsspace);
    1543             : 
    1544             :     /* Restore the ClientConnectionInfo. */
    1545        2876 :     clientconninfospace = shm_toc_lookup(toc, PARALLEL_KEY_CLIENTCONNINFO,
    1546             :                                          false);
    1547        2876 :     RestoreClientConnectionInfo(clientconninfospace);
    1548             : 
    1549             :     /*
    1550             :      * Initialize SystemUser now that MyClientConnectionInfo is restored. Also
     1551             :      * ensure that auth_method is actually valid, i.e., authn_id is not NULL.
    1552             :      */
    1553        2876 :     if (MyClientConnectionInfo.authn_id)
    1554           4 :         InitializeSystemUser(MyClientConnectionInfo.authn_id,
    1555             :                              hba_authname(MyClientConnectionInfo.auth_method));
    1556             : 
    1557             :     /* Attach to the leader's serializable transaction, if SERIALIZABLE. */
    1558        2876 :     AttachSerializableXact(fps->serializable_xact_handle);
    1559             : 
    1560             :     /*
    1561             :      * We've initialized all of our state now; nothing should change
    1562             :      * hereafter.
    1563             :      */
    1564        2876 :     InitializingParallelWorker = false;
    1565        2876 :     EnterParallelMode();
    1566             : 
    1567             :     /*
    1568             :      * Time to do the real work: invoke the caller-supplied code.
    1569             :      */
    1570        2876 :     entrypt(seg, toc);
    1571             : 
    1572             :     /* Must exit parallel mode to pop active snapshot. */
    1573        2864 :     ExitParallelMode();
    1574             : 
    1575             :     /* Must pop active snapshot so snapmgr.c doesn't complain. */
    1576        2864 :     PopActiveSnapshot();
    1577             : 
    1578             :     /* Shut down the parallel-worker transaction. */
    1579        2864 :     EndParallelWorkerTransaction();
    1580             : 
    1581             :     /* Detach from the per-session DSM segment. */
    1582        2864 :     DetachSession();
    1583             : 
    1584             :     /* Report success. */
    1585        2864 :     pq_putmessage(PqMsg_Terminate, NULL, 0);
    1586             : }
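
        Everything up to the entrypt(seg, toc) call above is generic; the entry point
        itself is caller-supplied.  For illustration, a hypothetical extension entry
        point (the names mylib_worker_main, MylibShared, and MYLIB_KEY_SHARED are
        invented for this sketch) would have this shape:

            void
            mylib_worker_main(dsm_segment *seg, shm_toc *toc)
            {
                MylibShared *shared;

                /* Locate the state the leader stored under the extension's TOC key. */
                shared = (MylibShared *) shm_toc_lookup(toc, MYLIB_KEY_SHARED, false);

                /* ... perform this worker's share of the work using 'shared' ... */
            }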
    1587             : 
    1588             : /*
    1589             :  * Update shared memory with the ending location of the last WAL record we
    1590             :  * wrote, if it's greater than the value already stored there.
    1591             :  */
    1592             : void
    1593        2864 : ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
    1594             : {
    1595        2864 :     FixedParallelState *fps = MyFixedParallelState;
    1596             : 
    1597             :     Assert(fps != NULL);
    1598        2864 :     SpinLockAcquire(&fps->mutex);
    1599        2864 :     if (fps->last_xlog_end < last_xlog_end)
    1600         184 :         fps->last_xlog_end = last_xlog_end;
    1601        2864 :     SpinLockRelease(&fps->mutex);
    1602        2864 : }
    1603             : 
    1604             : /*
    1605             :  * Make sure the leader tries to read from our error queue one more time.
    1606             :  * This guards against the case where we exit uncleanly without sending an
    1607             :  * ErrorResponse to the leader, for example because some code calls proc_exit
    1608             :  * directly.
    1609             :  *
    1610             :  * Also explicitly detach from dsm segment so that subsystems using
    1611             :  * on_dsm_detach() have a chance to send stats before the stats subsystem is
    1612             :  * shut down as part of a before_shmem_exit() hook.
    1613             :  *
     1614             :  * One might think this could instead be solved by carefully ordering the
     1615             :  * attaching to dsm segments, so that the pgstats segments are detached
     1616             :  * later than the parallel query segment.  That turns out not to work because
     1617             :  * the stats hash might need to grow, which can cause new segments to be
     1618             :  * allocated that are then detached earlier.
    1619             :  */
    1620             : static void
    1621        2876 : ParallelWorkerShutdown(int code, Datum arg)
    1622             : {
    1623        2876 :     SendProcSignal(ParallelLeaderPid,
    1624             :                    PROCSIG_PARALLEL_MESSAGE,
    1625             :                    ParallelLeaderProcNumber);
    1626             : 
    1627        2876 :     dsm_detach((dsm_segment *) DatumGetPointer(arg));
    1628        2876 : }
    1629             : 
    1630             : /*
    1631             :  * Look up (and possibly load) a parallel worker entry point function.
    1632             :  *
    1633             :  * For functions contained in the core code, we use library name "postgres"
    1634             :  * and consult the InternalParallelWorkers array.  External functions are
    1635             :  * looked up, and loaded if necessary, using load_external_function().
    1636             :  *
    1637             :  * The point of this is to pass function names as strings across process
    1638             :  * boundaries.  We can't pass actual function addresses because of the
    1639             :  * possibility that the function has been loaded at a different address
    1640             :  * in a different process.  This is obviously a hazard for functions in
    1641             :  * loadable libraries, but it can happen even for functions in the core code
    1642             :  * on platforms using EXEC_BACKEND (e.g., Windows).
    1643             :  *
    1644             :  * At some point it might be worthwhile to get rid of InternalParallelWorkers[]
    1645             :  * in favor of applying load_external_function() for core functions too;
    1646             :  * but that raises portability issues that are not worth addressing now.
    1647             :  */
    1648             : static parallel_worker_main_type
    1649        2876 : LookupParallelWorkerFunction(const char *libraryname, const char *funcname)
    1650             : {
    1651             :     /*
    1652             :      * If the function is to be loaded from postgres itself, search the
    1653             :      * InternalParallelWorkers array.
    1654             :      */
    1655        2876 :     if (strcmp(libraryname, "postgres") == 0)
    1656             :     {
    1657             :         int         i;
    1658             : 
    1659        3246 :         for (i = 0; i < lengthof(InternalParallelWorkers); i++)
    1660             :         {
    1661        3246 :             if (strcmp(InternalParallelWorkers[i].fn_name, funcname) == 0)
    1662        2876 :                 return InternalParallelWorkers[i].fn_addr;
    1663             :         }
    1664             : 
    1665             :         /* We can only reach this by programming error. */
    1666           0 :         elog(ERROR, "internal function \"%s\" not found", funcname);
    1667             :     }
    1668             : 
    1669             :     /* Otherwise load from external library. */
    1670           0 :     return (parallel_worker_main_type)
    1671           0 :         load_external_function(libraryname, funcname, true, NULL);
    1672             : }

Generated by: LCOV version 1.16