LCOV - code coverage report

Current view: top level - src/backend/access/transam - parallel.c (source / functions)
Test:         PostgreSQL 13devel
Date:         2019-11-21 12:06:29

              Hit    Total    Coverage
Lines:        398    449      88.6 %
Functions:    17     18       94.4 %

Legend: Lines: hit | not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * parallel.c
       4             :  *    Infrastructure for launching parallel workers
       5             :  *
       6             :  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *    src/backend/access/transam/parallel.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/nbtree.h"
      18             : #include "access/parallel.h"
      19             : #include "access/session.h"
      20             : #include "access/xact.h"
      21             : #include "access/xlog.h"
      22             : #include "catalog/index.h"
      23             : #include "catalog/namespace.h"
      24             : #include "catalog/pg_enum.h"
      25             : #include "commands/async.h"
      26             : #include "executor/execParallel.h"
      27             : #include "libpq/libpq.h"
      28             : #include "libpq/pqformat.h"
      29             : #include "libpq/pqmq.h"
      30             : #include "miscadmin.h"
      31             : #include "optimizer/optimizer.h"
      32             : #include "pgstat.h"
      33             : #include "storage/ipc.h"
      34             : #include "storage/predicate.h"
      35             : #include "storage/sinval.h"
      36             : #include "storage/spin.h"
      37             : #include "tcop/tcopprot.h"
      38             : #include "utils/combocid.h"
      39             : #include "utils/guc.h"
      40             : #include "utils/inval.h"
      41             : #include "utils/memutils.h"
      42             : #include "utils/relmapper.h"
      43             : #include "utils/snapmgr.h"
      44             : #include "utils/typcache.h"
      45             : 
      46             : /*
      47             :  * We don't want to waste a lot of memory on an error queue which, most of
      48             :  * the time, will process only a handful of small messages.  However, it is
      49             :  * desirable to make it large enough that a typical ErrorResponse can be sent
      50             :  * without blocking.  That way, a worker that errors out can write the whole
      51             :  * message into the queue and terminate without waiting for the user backend.
      52             :  */
      53             : #define PARALLEL_ERROR_QUEUE_SIZE           16384
      54             : 
      55             : /* Magic number for parallel context TOC. */
      56             : #define PARALLEL_MAGIC                      0x50477c7c
      57             : 
      58             : /*
      59             :  * Magic numbers for per-context parallel state sharing.  Higher-level code
      60             :  * should use smaller values, leaving these very large ones for use by this
      61             :  * module.
      62             :  */
      63             : #define PARALLEL_KEY_FIXED                  UINT64CONST(0xFFFFFFFFFFFF0001)
      64             : #define PARALLEL_KEY_ERROR_QUEUE            UINT64CONST(0xFFFFFFFFFFFF0002)
      65             : #define PARALLEL_KEY_LIBRARY                UINT64CONST(0xFFFFFFFFFFFF0003)
      66             : #define PARALLEL_KEY_GUC                    UINT64CONST(0xFFFFFFFFFFFF0004)
      67             : #define PARALLEL_KEY_COMBO_CID              UINT64CONST(0xFFFFFFFFFFFF0005)
      68             : #define PARALLEL_KEY_TRANSACTION_SNAPSHOT   UINT64CONST(0xFFFFFFFFFFFF0006)
      69             : #define PARALLEL_KEY_ACTIVE_SNAPSHOT        UINT64CONST(0xFFFFFFFFFFFF0007)
      70             : #define PARALLEL_KEY_TRANSACTION_STATE      UINT64CONST(0xFFFFFFFFFFFF0008)
      71             : #define PARALLEL_KEY_ENTRYPOINT             UINT64CONST(0xFFFFFFFFFFFF0009)
      72             : #define PARALLEL_KEY_SESSION_DSM            UINT64CONST(0xFFFFFFFFFFFF000A)
      73             : #define PARALLEL_KEY_REINDEX_STATE          UINT64CONST(0xFFFFFFFFFFFF000B)
      74             : #define PARALLEL_KEY_RELMAPPER_STATE        UINT64CONST(0xFFFFFFFFFFFF000C)
      75             : #define PARALLEL_KEY_ENUMBLACKLIST          UINT64CONST(0xFFFFFFFFFFFF000D)
      76             : 
      77             : /* Fixed-size parallel state. */
      78             : typedef struct FixedParallelState
      79             : {
      80             :     /* Fixed-size state that workers must restore. */
      81             :     Oid         database_id;
      82             :     Oid         authenticated_user_id;
      83             :     Oid         current_user_id;
      84             :     Oid         outer_user_id;
      85             :     Oid         temp_namespace_id;
      86             :     Oid         temp_toast_namespace_id;
      87             :     int         sec_context;
      88             :     bool        is_superuser;
      89             :     PGPROC     *parallel_master_pgproc;
      90             :     pid_t       parallel_master_pid;
      91             :     BackendId   parallel_master_backend_id;
      92             :     TimestampTz xact_ts;
      93             :     TimestampTz stmt_ts;
      94             :     SerializableXactHandle serializable_xact_handle;
      95             : 
      96             :     /* Mutex protects remaining fields. */
      97             :     slock_t     mutex;
      98             : 
      99             :     /* Maximum XactLastRecEnd of any worker. */
     100             :     XLogRecPtr  last_xlog_end;
     101             : } FixedParallelState;
     102             : 
     103             : /*
     104             :  * Our parallel worker number.  We initialize this to -1, meaning that we are
     105             :  * not a parallel worker.  In parallel workers, it will be set to a value >= 0
     106             :  * and < the number of workers before any user code is invoked; each parallel
     107             :  * worker will get a different parallel worker number.
     108             :  */
     109             : int         ParallelWorkerNumber = -1;
     110             : 
     111             : /* Is there a parallel message pending which we need to receive? */
     112             : volatile bool ParallelMessagePending = false;
     113             : 
     114             : /* Are we initializing a parallel worker? */
     115             : bool        InitializingParallelWorker = false;
     116             : 
     117             : /* Pointer to our fixed parallel state. */
     118             : static FixedParallelState *MyFixedParallelState;
     119             : 
     120             : /* List of active parallel contexts. */
     121             : static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);
     122             : 
     123             : /* Backend-local copy of data from FixedParallelState. */
     124             : static pid_t ParallelMasterPid;
     125             : 
     126             : /*
     127             :  * List of internal parallel worker entry points.  We need this for
     128             :  * reasons explained in LookupParallelWorkerFunction(), below.
     129             :  */
     130             : static const struct
     131             : {
     132             :     const char *fn_name;
     133             :     parallel_worker_main_type fn_addr;
     134             : }           InternalParallelWorkers[] =
     135             : 
     136             : {
     137             :     {
     138             :         "ParallelQueryMain", ParallelQueryMain
     139             :     },
     140             :     {
     141             :         "_bt_parallel_build_main", _bt_parallel_build_main
     142             :     }
     143             : };
     144             : 
     145             : /* Private functions. */
     146             : static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg);
     147             : static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
     148             : static parallel_worker_main_type LookupParallelWorkerFunction(const char *libraryname, const char *funcname);
     149             : static void ParallelWorkerShutdown(int code, Datum arg);
     150             : 
     151             : 
     152             : /*
     153             :  * Establish a new parallel context.  This should be done after entering
     154             :  * parallel mode, and (unless there is an error) the context should be
     155             :  * destroyed before exiting the current subtransaction.
     156             :  */
     157             : ParallelContext *
     158         446 : CreateParallelContext(const char *library_name, const char *function_name,
     159             :                       int nworkers)
     160             : {
     161             :     MemoryContext oldcontext;
     162             :     ParallelContext *pcxt;
     163             : 
     164             :     /* It is unsafe to create a parallel context if not in parallel mode. */
     165             :     Assert(IsInParallelMode());
     166             : 
     167             :     /* Number of workers should be non-negative. */
     168             :     Assert(nworkers >= 0);
     169             : 
     170             :     /* We might be running in a short-lived memory context. */
     171         446 :     oldcontext = MemoryContextSwitchTo(TopTransactionContext);
     172             : 
     173             :     /* Initialize a new ParallelContext. */
     174         446 :     pcxt = palloc0(sizeof(ParallelContext));
     175         446 :     pcxt->subid = GetCurrentSubTransactionId();
     176         446 :     pcxt->nworkers = nworkers;
     177         446 :     pcxt->library_name = pstrdup(library_name);
     178         446 :     pcxt->function_name = pstrdup(function_name);
     179         446 :     pcxt->error_context_stack = error_context_stack;
     180         446 :     shm_toc_initialize_estimator(&pcxt->estimator);
     181         446 :     dlist_push_head(&pcxt_list, &pcxt->node);
     182             : 
     183             :     /* Restore previous memory context. */
     184         446 :     MemoryContextSwitchTo(oldcontext);
     185             : 
     186         446 :     return pcxt;
     187             : }
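
[Editor's note: a minimal usage sketch of the lifecycle the comment above
describes, from entering parallel mode through destroying the context.  This
is an illustration, not part of parallel.c; the "my_worker_main" entry point
and the worker count are hypothetical.]

    EnterParallelMode();                /* xact.c: must precede creation */
    pcxt = CreateParallelContext("postgres", "my_worker_main", 2);
    /* ... estimate and insert caller-specific shared state here ... */
    InitializeParallelDSM(pcxt);
    LaunchParallelWorkers(pcxt);        /* may launch fewer than requested */
    WaitForParallelWorkersToFinish(pcxt);
    DestroyParallelContext(pcxt);       /* before exiting the subxact */
    ExitParallelMode();
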
     188             : 
     189             : /*
     190             :  * Establish the dynamic shared memory segment for a parallel context and
     191             :  * copy state and other bookkeeping information that will be needed by
     192             :  * parallel workers into it.
     193             :  */
     194             : void
     195         446 : InitializeParallelDSM(ParallelContext *pcxt)
     196             : {
     197             :     MemoryContext oldcontext;
     198         446 :     Size        library_len = 0;
     199         446 :     Size        guc_len = 0;
     200         446 :     Size        combocidlen = 0;
     201         446 :     Size        tsnaplen = 0;
     202         446 :     Size        asnaplen = 0;
     203         446 :     Size        tstatelen = 0;
     204         446 :     Size        reindexlen = 0;
     205         446 :     Size        relmapperlen = 0;
     206         446 :     Size        enumblacklistlen = 0;
     207         446 :     Size        segsize = 0;
     208             :     int         i;
     209             :     FixedParallelState *fps;
     210         446 :     dsm_handle  session_dsm_handle = DSM_HANDLE_INVALID;
     211         446 :     Snapshot    transaction_snapshot = GetTransactionSnapshot();
     212         446 :     Snapshot    active_snapshot = GetActiveSnapshot();
     213             : 
     214             :     /* We might be running in a very short-lived memory context. */
     215         446 :     oldcontext = MemoryContextSwitchTo(TopTransactionContext);
     216             : 
     217             :     /* Allow space to store the fixed-size parallel state. */
     218         446 :     shm_toc_estimate_chunk(&pcxt->estimator, sizeof(FixedParallelState));
     219         446 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
     220             : 
     221             :     /*
     222             :      * Normally, the user will have requested at least one worker process, but
     223             :      * if by chance they have not, we can skip a bunch of things here.
     224             :      */
     225         446 :     if (pcxt->nworkers > 0)
     226             :     {
     227             :         /* Get (or create) the per-session DSM segment's handle. */
     228         446 :         session_dsm_handle = GetSessionDsmHandle();
     229             : 
     230             :         /*
     231             :          * If we weren't able to create a per-session DSM segment, then we can
     232             :          * continue but we can't safely launch any workers because their
     233             :          * record typmods would be incompatible so they couldn't exchange
     234             :          * tuples.
     235             :          */
     236         446 :         if (session_dsm_handle == DSM_HANDLE_INVALID)
     237           0 :             pcxt->nworkers = 0;
     238             :     }
     239             : 
     240         446 :     if (pcxt->nworkers > 0)
     241             :     {
     242             :         /* Estimate space for various kinds of state sharing. */
     243         446 :         library_len = EstimateLibraryStateSpace();
     244         446 :         shm_toc_estimate_chunk(&pcxt->estimator, library_len);
     245         446 :         guc_len = EstimateGUCStateSpace();
     246         446 :         shm_toc_estimate_chunk(&pcxt->estimator, guc_len);
     247         446 :         combocidlen = EstimateComboCIDStateSpace();
     248         446 :         shm_toc_estimate_chunk(&pcxt->estimator, combocidlen);
     249         446 :         tsnaplen = EstimateSnapshotSpace(transaction_snapshot);
     250         446 :         shm_toc_estimate_chunk(&pcxt->estimator, tsnaplen);
     251         446 :         asnaplen = EstimateSnapshotSpace(active_snapshot);
     252         446 :         shm_toc_estimate_chunk(&pcxt->estimator, asnaplen);
     253         446 :         tstatelen = EstimateTransactionStateSpace();
     254         446 :         shm_toc_estimate_chunk(&pcxt->estimator, tstatelen);
     255         446 :         shm_toc_estimate_chunk(&pcxt->estimator, sizeof(dsm_handle));
     256         446 :         reindexlen = EstimateReindexStateSpace();
     257         446 :         shm_toc_estimate_chunk(&pcxt->estimator, reindexlen);
     258         446 :         relmapperlen = EstimateRelationMapSpace();
     259         446 :         shm_toc_estimate_chunk(&pcxt->estimator, relmapperlen);
     260         446 :         enumblacklistlen = EstimateEnumBlacklistSpace();
     261         446 :         shm_toc_estimate_chunk(&pcxt->estimator, enumblacklistlen);
     262             :         /* If you add more chunks here, you probably need to add keys. */
     263         446 :         shm_toc_estimate_keys(&pcxt->estimator, 10);
     264             : 
     265             :         /* Estimate space need for error queues. */
     266             :         StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
     267             :                          PARALLEL_ERROR_QUEUE_SIZE,
     268             :                          "parallel error queue size not buffer-aligned");
     269         446 :         shm_toc_estimate_chunk(&pcxt->estimator,
     270             :                                mul_size(PARALLEL_ERROR_QUEUE_SIZE,
     271             :                                         pcxt->nworkers));
     272         446 :         shm_toc_estimate_keys(&pcxt->estimator, 1);
     273             : 
     274             :         /* Estimate how much we'll need for the entrypoint info. */
     275         446 :         shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name) +
     276             :                                strlen(pcxt->function_name) + 2);
     277         446 :         shm_toc_estimate_keys(&pcxt->estimator, 1);
     278             :     }
     279             : 
     280             :     /*
     281             :      * Create DSM and initialize with new table of contents.  But if the user
     282             :      * didn't request any workers, then don't bother creating a dynamic shared
     283             :      * memory segment; instead, just use backend-private memory.
     284             :      *
     285             :      * Also, if we can't create a dynamic shared memory segment because the
      286             :      * maximum number of segments has already been created, then fall back to
     287             :      * backend-private memory, and plan not to use any workers.  We hope this
     288             :      * won't happen very often, but it's better to abandon the use of
     289             :      * parallelism than to fail outright.
     290             :      */
     291         446 :     segsize = shm_toc_estimate(&pcxt->estimator);
     292         446 :     if (pcxt->nworkers > 0)
     293         446 :         pcxt->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
     294         446 :     if (pcxt->seg != NULL)
     295         446 :         pcxt->toc = shm_toc_create(PARALLEL_MAGIC,
     296             :                                    dsm_segment_address(pcxt->seg),
     297             :                                    segsize);
     298             :     else
     299             :     {
     300           0 :         pcxt->nworkers = 0;
     301           0 :         pcxt->private_memory = MemoryContextAlloc(TopMemoryContext, segsize);
     302           0 :         pcxt->toc = shm_toc_create(PARALLEL_MAGIC, pcxt->private_memory,
     303             :                                    segsize);
     304             :     }
     305             : 
     306             :     /* Initialize fixed-size state in shared memory. */
     307         446 :     fps = (FixedParallelState *)
     308         446 :         shm_toc_allocate(pcxt->toc, sizeof(FixedParallelState));
     309         446 :     fps->database_id = MyDatabaseId;
     310         446 :     fps->authenticated_user_id = GetAuthenticatedUserId();
     311         446 :     fps->outer_user_id = GetCurrentRoleId();
     312         446 :     fps->is_superuser = session_auth_is_superuser;
     313         446 :     GetUserIdAndSecContext(&fps->current_user_id, &fps->sec_context);
     314         446 :     GetTempNamespaceState(&fps->temp_namespace_id,
     315             :                           &fps->temp_toast_namespace_id);
     316         446 :     fps->parallel_master_pgproc = MyProc;
     317         446 :     fps->parallel_master_pid = MyProcPid;
     318         446 :     fps->parallel_master_backend_id = MyBackendId;
     319         446 :     fps->xact_ts = GetCurrentTransactionStartTimestamp();
     320         446 :     fps->stmt_ts = GetCurrentStatementStartTimestamp();
     321         446 :     fps->serializable_xact_handle = ShareSerializableXact();
     322         446 :     SpinLockInit(&fps->mutex);
     323         446 :     fps->last_xlog_end = 0;
     324         446 :     shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);
     325             : 
     326             :     /* We can skip the rest of this if we're not budgeting for any workers. */
     327         446 :     if (pcxt->nworkers > 0)
     328             :     {
     329             :         char       *libraryspace;
     330             :         char       *gucspace;
     331             :         char       *combocidspace;
     332             :         char       *tsnapspace;
     333             :         char       *asnapspace;
     334             :         char       *tstatespace;
     335             :         char       *reindexspace;
     336             :         char       *relmapperspace;
     337             :         char       *error_queue_space;
     338             :         char       *session_dsm_handle_space;
     339             :         char       *entrypointstate;
     340             :         char       *enumblacklistspace;
     341             :         Size        lnamelen;
     342             : 
     343             :         /* Serialize shared libraries we have loaded. */
     344         446 :         libraryspace = shm_toc_allocate(pcxt->toc, library_len);
     345         446 :         SerializeLibraryState(library_len, libraryspace);
     346         446 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_LIBRARY, libraryspace);
     347             : 
     348             :         /* Serialize GUC settings. */
     349         446 :         gucspace = shm_toc_allocate(pcxt->toc, guc_len);
     350         446 :         SerializeGUCState(guc_len, gucspace);
     351         446 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_GUC, gucspace);
     352             : 
     353             :         /* Serialize combo CID state. */
     354         446 :         combocidspace = shm_toc_allocate(pcxt->toc, combocidlen);
     355         446 :         SerializeComboCIDState(combocidlen, combocidspace);
     356         446 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_COMBO_CID, combocidspace);
     357             : 
     358             :         /* Serialize transaction snapshot and active snapshot. */
     359         446 :         tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen);
     360         446 :         SerializeSnapshot(transaction_snapshot, tsnapspace);
     361         446 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT,
     362             :                        tsnapspace);
     363         446 :         asnapspace = shm_toc_allocate(pcxt->toc, asnaplen);
     364         446 :         SerializeSnapshot(active_snapshot, asnapspace);
     365         446 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, asnapspace);
     366             : 
     367             :         /* Provide the handle for per-session segment. */
     368         446 :         session_dsm_handle_space = shm_toc_allocate(pcxt->toc,
     369             :                                                     sizeof(dsm_handle));
     370         446 :         *(dsm_handle *) session_dsm_handle_space = session_dsm_handle;
     371         446 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_SESSION_DSM,
     372             :                        session_dsm_handle_space);
     373             : 
     374             :         /* Serialize transaction state. */
     375         446 :         tstatespace = shm_toc_allocate(pcxt->toc, tstatelen);
     376         446 :         SerializeTransactionState(tstatelen, tstatespace);
     377         446 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_STATE, tstatespace);
     378             : 
     379             :         /* Serialize reindex state. */
     380         446 :         reindexspace = shm_toc_allocate(pcxt->toc, reindexlen);
     381         446 :         SerializeReindexState(reindexlen, reindexspace);
     382         446 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_REINDEX_STATE, reindexspace);
     383             : 
     384             :         /* Serialize relmapper state. */
     385         446 :         relmapperspace = shm_toc_allocate(pcxt->toc, relmapperlen);
     386         446 :         SerializeRelationMap(relmapperlen, relmapperspace);
     387         446 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_RELMAPPER_STATE,
     388             :                        relmapperspace);
     389             : 
     390             :         /* Serialize enum blacklist state. */
     391         446 :         enumblacklistspace = shm_toc_allocate(pcxt->toc, enumblacklistlen);
     392         446 :         SerializeEnumBlacklist(enumblacklistspace, enumblacklistlen);
     393         446 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_ENUMBLACKLIST,
     394             :                        enumblacklistspace);
     395             : 
     396             :         /* Allocate space for worker information. */
     397         446 :         pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers);
     398             : 
     399             :         /*
     400             :          * Establish error queues in dynamic shared memory.
     401             :          *
     402             :          * These queues should be used only for transmitting ErrorResponse,
     403             :          * NoticeResponse, and NotifyResponse protocol messages.  Tuple data
     404             :          * should be transmitted via separate (possibly larger?) queues.
     405             :          */
     406         446 :         error_queue_space =
     407         446 :             shm_toc_allocate(pcxt->toc,
     408             :                              mul_size(PARALLEL_ERROR_QUEUE_SIZE,
     409         446 :                                       pcxt->nworkers));
     410        1472 :         for (i = 0; i < pcxt->nworkers; ++i)
     411             :         {
     412             :             char       *start;
     413             :             shm_mq     *mq;
     414             : 
     415        1026 :             start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
     416        1026 :             mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
     417        1026 :             shm_mq_set_receiver(mq, MyProc);
     418        1026 :             pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
     419             :         }
     420         446 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, error_queue_space);
     421             : 
     422             :         /*
     423             :          * Serialize entrypoint information.  It's unsafe to pass function
     424             :          * pointers across processes, as the function pointer may be different
     425             :          * in each process in EXEC_BACKEND builds, so we always pass library
     426             :          * and function name.  (We use library name "postgres" for functions
     427             :          * in the core backend.)
     428             :          */
     429         446 :         lnamelen = strlen(pcxt->library_name);
     430         446 :         entrypointstate = shm_toc_allocate(pcxt->toc, lnamelen +
     431         446 :                                            strlen(pcxt->function_name) + 2);
     432         446 :         strcpy(entrypointstate, pcxt->library_name);
     433         446 :         strcpy(entrypointstate + lnamelen + 1, pcxt->function_name);
     434         446 :         shm_toc_insert(pcxt->toc, PARALLEL_KEY_ENTRYPOINT, entrypointstate);
     435             :     }
     436             : 
     437             :     /* Restore previous memory context. */
     438         446 :     MemoryContextSwitchTo(oldcontext);
     439         446 : }
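
[Editor's note: the function above follows shm_toc's two-phase protocol --
every chunk and key is estimated before the segment exists, then allocated
and inserted once it does.  A hedged sketch of how higher-level code shares
its own state, using a small caller-chosen key as the comment above the
PARALLEL_KEY_* definitions prescribes; the key value and the MySharedState
struct are hypothetical.]

    #define MY_KEY_STATE        1       /* hypothetical; must stay well below
                                         * the huge PARALLEL_KEY_* values */

    /* Before InitializeParallelDSM(): grow the estimator. */
    shm_toc_estimate_chunk(&pcxt->estimator, sizeof(MySharedState));
    shm_toc_estimate_keys(&pcxt->estimator, 1);

    /* After InitializeParallelDSM(): carve out and publish the chunk. */
    MySharedState *state = shm_toc_allocate(pcxt->toc, sizeof(MySharedState));
    state->param = 42;                  /* illustrative payload */
    shm_toc_insert(pcxt->toc, MY_KEY_STATE, state);

A worker would later retrieve the chunk with
shm_toc_lookup(toc, MY_KEY_STATE, false).
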
     440             : 
     441             : /*
      442             :  * Reinitialize the dynamic shared memory segment for a parallel context so
      443             :  * that we can launch workers for it again.
     444             :  */
     445             : void
     446         168 : ReinitializeParallelDSM(ParallelContext *pcxt)
     447             : {
     448             :     FixedParallelState *fps;
     449             : 
     450             :     /* Wait for any old workers to exit. */
     451         168 :     if (pcxt->nworkers_launched > 0)
     452             :     {
     453         168 :         WaitForParallelWorkersToFinish(pcxt);
     454         168 :         WaitForParallelWorkersToExit(pcxt);
     455         168 :         pcxt->nworkers_launched = 0;
     456         168 :         if (pcxt->known_attached_workers)
     457             :         {
     458         168 :             pfree(pcxt->known_attached_workers);
     459         168 :             pcxt->known_attached_workers = NULL;
     460         168 :             pcxt->nknown_attached_workers = 0;
     461             :         }
     462             :     }
     463             : 
     464             :     /* Reset a few bits of fixed parallel state to a clean state. */
     465         168 :     fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
     466         168 :     fps->last_xlog_end = 0;
     467             : 
     468             :     /* Recreate error queues (if they exist). */
     469         168 :     if (pcxt->nworkers > 0)
     470             :     {
     471             :         char       *error_queue_space;
     472             :         int         i;
     473             : 
     474         168 :         error_queue_space =
     475         168 :             shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, false);
     476         712 :         for (i = 0; i < pcxt->nworkers; ++i)
     477             :         {
     478             :             char       *start;
     479             :             shm_mq     *mq;
     480             : 
     481         544 :             start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
     482         544 :             mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
     483         544 :             shm_mq_set_receiver(mq, MyProc);
     484         544 :             pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
     485             :         }
     486             :     }
     487         168 : }
     488             : 
     489             : /*
     490             :  * Launch parallel workers.
     491             :  */
     492             : void
     493         614 : LaunchParallelWorkers(ParallelContext *pcxt)
     494             : {
     495             :     MemoryContext oldcontext;
     496             :     BackgroundWorker worker;
     497             :     int         i;
     498         614 :     bool        any_registrations_failed = false;
     499             : 
     500             :     /* Skip this if we have no workers. */
     501         614 :     if (pcxt->nworkers == 0)
     502           0 :         return;
     503             : 
     504             :     /* We need to be a lock group leader. */
     505         614 :     BecomeLockGroupLeader();
     506             : 
     507             :     /* If we do have workers, we'd better have a DSM segment. */
     508             :     Assert(pcxt->seg != NULL);
     509             : 
     510             :     /* We might be running in a short-lived memory context. */
     511         614 :     oldcontext = MemoryContextSwitchTo(TopTransactionContext);
     512             : 
     513             :     /* Configure a worker. */
     514         614 :     memset(&worker, 0, sizeof(worker));
     515         614 :     snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
     516             :              MyProcPid);
     517         614 :     snprintf(worker.bgw_type, BGW_MAXLEN, "parallel worker");
     518         614 :     worker.bgw_flags =
     519             :         BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION
     520             :         | BGWORKER_CLASS_PARALLEL;
     521         614 :     worker.bgw_start_time = BgWorkerStart_ConsistentState;
     522         614 :     worker.bgw_restart_time = BGW_NEVER_RESTART;
     523         614 :     sprintf(worker.bgw_library_name, "postgres");
     524         614 :     sprintf(worker.bgw_function_name, "ParallelWorkerMain");
     525         614 :     worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(pcxt->seg));
     526         614 :     worker.bgw_notify_pid = MyProcPid;
     527             : 
     528             :     /*
     529             :      * Start workers.
     530             :      *
     531             :      * The caller must be able to tolerate ending up with fewer workers than
     532             :      * expected, so there is no need to throw an error here if registration
     533             :      * fails.  It wouldn't help much anyway, because registering the worker in
     534             :      * no way guarantees that it will start up and initialize successfully.
     535             :      */
     536        2184 :     for (i = 0; i < pcxt->nworkers; ++i)
     537             :     {
     538        1570 :         memcpy(worker.bgw_extra, &i, sizeof(int));
     539        3104 :         if (!any_registrations_failed &&
     540        1534 :             RegisterDynamicBackgroundWorker(&worker,
     541        1534 :                                             &pcxt->worker[i].bgwhandle))
     542             :         {
     543        1518 :             shm_mq_set_handle(pcxt->worker[i].error_mqh,
     544        1518 :                               pcxt->worker[i].bgwhandle);
     545        1518 :             pcxt->nworkers_launched++;
     546             :         }
     547             :         else
     548             :         {
     549             :             /*
     550             :              * If we weren't able to register the worker, then we've bumped up
     551             :              * against the max_worker_processes limit, and future
     552             :              * registrations will probably fail too, so arrange to skip them.
     553             :              * But we still have to execute this code for the remaining slots
     554             :              * to make sure that we forget about the error queues we budgeted
     555             :              * for those workers.  Otherwise, we'll wait for them to start,
     556             :              * but they never will.
     557             :              */
     558          52 :             any_registrations_failed = true;
     559          52 :             pcxt->worker[i].bgwhandle = NULL;
     560          52 :             shm_mq_detach(pcxt->worker[i].error_mqh);
     561          52 :             pcxt->worker[i].error_mqh = NULL;
     562             :         }
     563             :     }
     564             : 
     565             :     /*
     566             :      * Now that nworkers_launched has taken its final value, we can initialize
     567             :      * known_attached_workers.
     568             :      */
     569         614 :     if (pcxt->nworkers_launched > 0)
     570             :     {
     571         602 :         pcxt->known_attached_workers =
     572         602 :             palloc0(sizeof(bool) * pcxt->nworkers_launched);
     573         602 :         pcxt->nknown_attached_workers = 0;
     574             :     }
     575             : 
     576             :     /* Restore previous memory context. */
     577         614 :     MemoryContextSwitchTo(oldcontext);
     578             : }
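
[Editor's note: because registration can fail, callers must size their plans
around pcxt->nworkers_launched rather than the requested pcxt->nworkers.  A
hedged sketch of the usual fallback; the serial path is hypothetical.]

    LaunchParallelWorkers(pcxt);
    if (pcxt->nworkers_launched == 0)
        my_do_work_serially();          /* hypothetical: no workers, so the
                                         * leader does all the work itself */
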
     579             : 
     580             : /*
     581             :  * Wait for all workers to attach to their error queues, and throw an error if
     582             :  * any worker fails to do this.
     583             :  *
     584             :  * Callers can assume that if this function returns successfully, then the
     585             :  * number of workers given by pcxt->nworkers_launched have initialized and
     586             :  * attached to their error queues.  Whether or not these workers are guaranteed
     587             :  * to still be running depends on what code the caller asked them to run;
     588             :  * this function does not guarantee that they have not exited.  However, it
     589             :  * does guarantee that any workers which exited must have done so cleanly and
     590             :  * after successfully performing the work with which they were tasked.
     591             :  *
     592             :  * If this function is not called, then some of the workers that were launched
     593             :  * may not have been started due to a fork() failure, or may have exited during
     594             :  * early startup prior to attaching to the error queue, so nworkers_launched
     595             :  * cannot be viewed as completely reliable.  It will never be less than the
     596             :  * number of workers which actually started, but it might be more.  Any workers
     597             :  * that failed to start will still be discovered by
     598             :  * WaitForParallelWorkersToFinish and an error will be thrown at that time,
     599             :  * provided that function is eventually reached.
     600             :  *
     601             :  * In general, the leader process should do as much work as possible before
     602             :  * calling this function.  fork() failures and other early-startup failures
     603             :  * are very uncommon, and having the leader sit idle when it could be doing
     604             :  * useful work is undesirable.  However, if the leader needs to wait for
     605             :  * all of its workers or for a specific worker, it may want to call this
     606             :  * function before doing so.  If not, it must make some other provision for
     607             :  * the failure-to-start case, lest it wait forever.  On the other hand, a
     608             :  * leader which never waits for a worker that might not be started yet, or
     609             :  * at least never does so prior to WaitForParallelWorkersToFinish(), need not
     610             :  * call this function at all.
     611             :  */
     612             : void
     613          84 : WaitForParallelWorkersToAttach(ParallelContext *pcxt)
     614             : {
     615             :     int         i;
     616             : 
     617             :     /* Skip this if we have no launched workers. */
     618          84 :     if (pcxt->nworkers_launched == 0)
     619           0 :         return;
     620             : 
     621             :     for (;;)
     622             :     {
     623             :         /*
     624             :          * This will process any parallel messages that are pending and it may
     625             :          * also throw an error propagated from a worker.
     626             :          */
     627     8948304 :         CHECK_FOR_INTERRUPTS();
     628             : 
     629     8948388 :         for (i = 0; i < pcxt->nworkers_launched; ++i)
     630             :         {
     631             :             BgwHandleStatus status;
     632             :             shm_mq     *mq;
     633             :             int         rc;
     634             :             pid_t       pid;
     635             : 
     636     4474194 :             if (pcxt->known_attached_workers[i])
     637          32 :                 continue;
     638             : 
     639             :             /*
     640             :              * If error_mqh is NULL, then the worker has already exited
     641             :              * cleanly.
     642             :              */
     643     4474178 :             if (pcxt->worker[i].error_mqh == NULL)
     644             :             {
     645           0 :                 pcxt->known_attached_workers[i] = true;
     646           0 :                 ++pcxt->nknown_attached_workers;
     647           0 :                 continue;
     648             :             }
     649             : 
     650     4474178 :             status = GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle, &pid);
     651     4474178 :             if (status == BGWH_STARTED)
     652             :             {
     653             :                 /* Has the worker attached to the error queue? */
     654     4474104 :                 mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
     655     4474104 :                 if (shm_mq_get_sender(mq) != NULL)
     656             :                 {
     657             :                     /* Yes, so it is known to be attached. */
     658          68 :                     pcxt->known_attached_workers[i] = true;
     659          68 :                     ++pcxt->nknown_attached_workers;
     660             :                 }
     661             :             }
     662          74 :             else if (status == BGWH_STOPPED)
     663             :             {
     664             :                 /*
     665             :                  * If the worker stopped without attaching to the error queue,
     666             :                  * throw an error.
     667             :                  */
     668           0 :                 mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
     669           0 :                 if (shm_mq_get_sender(mq) == NULL)
     670           0 :                     ereport(ERROR,
     671             :                             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     672             :                              errmsg("parallel worker failed to initialize"),
     673             :                              errhint("More details may be available in the server log.")));
     674             : 
     675           0 :                 pcxt->known_attached_workers[i] = true;
     676           0 :                 ++pcxt->nknown_attached_workers;
     677             :             }
     678             :             else
     679             :             {
     680             :                 /*
     681             :                  * Worker not yet started, so we must wait.  The postmaster
     682             :                  * will notify us if the worker's state changes.  Our latch
     683             :                  * might also get set for some other reason, but if so we'll
     684             :                  * just end up waiting for the same worker again.
     685             :                  */
     686          74 :                 rc = WaitLatch(MyLatch,
     687             :                                WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
     688             :                                -1, WAIT_EVENT_BGWORKER_STARTUP);
     689             : 
     690          74 :                 if (rc & WL_LATCH_SET)
     691          74 :                     ResetLatch(MyLatch);
     692             :             }
     693             :         }
     694             : 
     695             :         /* If all workers are known to have started, we're done. */
     696     4474194 :         if (pcxt->nknown_attached_workers >= pcxt->nworkers_launched)
     697             :         {
     698             :             Assert(pcxt->nknown_attached_workers == pcxt->nworkers_launched);
     699          84 :             break;
     700             :         }
     701             :     }
     702             : }
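
[Editor's note: to make the rule in the header comment concrete, here is a
hedged sketch of a leader that intends to block waiting on one particular
worker and therefore attaches first, so that a fork() failure cannot leave
it waiting forever.  The per-worker queue handle is hypothetical.]

    LaunchParallelWorkers(pcxt);
    if (pcxt->nworkers_launched > 0)
    {
        WaitForParallelWorkersToAttach(pcxt);
        /* Safe now: block on a specific worker's (hypothetical) queue. */
        res = shm_mq_receive(worker_mqh, &nbytes, &data, false);
    }
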
     703             : 
     704             : /*
     705             :  * Wait for all workers to finish computing.
     706             :  *
     707             :  * Even if the parallel operation seems to have completed successfully, it's
     708             :  * important to call this function afterwards.  We must not miss any errors
     709             :  * the workers may have thrown during the parallel operation, or any that they
     710             :  * may yet throw while shutting down.
     711             :  *
     712             :  * Also, we want to update our notion of XactLastRecEnd based on worker
     713             :  * feedback.
     714             :  */
     715             : void
     716        1430 : WaitForParallelWorkersToFinish(ParallelContext *pcxt)
     717             : {
     718             :     for (;;)
     719         652 :     {
     720        1430 :         bool        anyone_alive = false;
     721        1430 :         int         nfinished = 0;
     722             :         int         i;
     723             : 
     724             :         /*
     725             :          * This will process any parallel messages that are pending, which may
     726             :          * change the outcome of the loop that follows.  It may also throw an
     727             :          * error propagated from a worker.
     728             :          */
     729        1430 :         CHECK_FOR_INTERRUPTS();
     730             : 
     731        3644 :         for (i = 0; i < pcxt->nworkers_launched; ++i)
     732             :         {
     733             :             /*
     734             :              * If error_mqh is NULL, then the worker has already exited
     735             :              * cleanly.  If we have received a message through error_mqh from
     736             :              * the worker, we know it started up cleanly, and therefore we're
     737             :              * certain to be notified when it exits.
     738             :              */
     739        2866 :             if (pcxt->worker[i].error_mqh == NULL)
     740        2214 :                 ++nfinished;
     741         652 :             else if (pcxt->known_attached_workers[i])
     742             :             {
     743         652 :                 anyone_alive = true;
     744         652 :                 break;
     745             :             }
     746             :         }
     747             : 
     748        1430 :         if (!anyone_alive)
     749             :         {
     750             :             /* If all workers are known to have finished, we're done. */
     751         778 :             if (nfinished >= pcxt->nworkers_launched)
     752             :             {
     753             :                 Assert(nfinished == pcxt->nworkers_launched);
     754         778 :                 break;
     755             :             }
     756             : 
     757             :             /*
     758             :              * We didn't detect any living workers, but not all workers are
     759             :              * known to have exited cleanly.  Either not all workers have
     760             :              * launched yet, or maybe some of them failed to start or
     761             :              * terminated abnormally.
     762             :              */
     763           0 :             for (i = 0; i < pcxt->nworkers_launched; ++i)
     764             :             {
     765             :                 pid_t       pid;
     766             :                 shm_mq     *mq;
     767             : 
     768             :                 /*
     769             :                  * If the worker is BGWH_NOT_YET_STARTED or BGWH_STARTED, we
     770             :                  * should just keep waiting.  If it is BGWH_STOPPED, then
     771             :                  * further investigation is needed.
     772             :                  */
     773           0 :                 if (pcxt->worker[i].error_mqh == NULL ||
     774           0 :                     pcxt->worker[i].bgwhandle == NULL ||
     775           0 :                     GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle,
     776             :                                            &pid) != BGWH_STOPPED)
     777           0 :                     continue;
     778             : 
     779             :                 /*
     780             :                  * Check whether the worker ended up stopped without ever
     781             :                  * attaching to the error queue.  If so, the postmaster was
     782             :                  * unable to fork the worker or it exited without initializing
     783             :                  * properly.  We must throw an error, since the caller may
     784             :                  * have been expecting the worker to do some work before
     785             :                  * exiting.
     786             :                  */
     787           0 :                 mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
     788           0 :                 if (shm_mq_get_sender(mq) == NULL)
     789           0 :                     ereport(ERROR,
     790             :                             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     791             :                              errmsg("parallel worker failed to initialize"),
     792             :                              errhint("More details may be available in the server log.")));
     793             : 
     794             :                 /*
     795             :                  * The worker is stopped, but is attached to the error queue.
     796             :                  * Unless there's a bug somewhere, this will only happen when
     797             :                  * the worker writes messages and terminates after the
     798             :                  * CHECK_FOR_INTERRUPTS() near the top of this function and
     799             :                  * before the call to GetBackgroundWorkerPid().  In that case,
      800             :                  * our latch should have been set as well and the right things
     801             :                  * will happen on the next pass through the loop.
     802             :                  */
     803             :             }
     804             :         }
     805             : 
     806         652 :         (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
     807             :                          WAIT_EVENT_PARALLEL_FINISH);
     808         652 :         ResetLatch(MyLatch);
     809             :     }
     810             : 
     811         778 :     if (pcxt->toc != NULL)
     812             :     {
     813             :         FixedParallelState *fps;
     814             : 
     815         778 :         fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
     816         778 :         if (fps->last_xlog_end > XactLastRecEnd)
     817          12 :             XactLastRecEnd = fps->last_xlog_end;
     818             :     }
     819         778 : }
     820             : 
     821             : /*
     822             :  * Wait for all workers to exit.
     823             :  *
      824             :  * This function ensures that workers have been completely shut down.  The
      825             :  * difference between WaitForParallelWorkersToFinish and this function is
      826             :  * that the former just ensures that the worker's last message has been
      827             :  * received by the master backend, whereas this ensures complete shutdown.
     828             :  */
     829             : static void
     830         614 : WaitForParallelWorkersToExit(ParallelContext *pcxt)
     831             : {
     832             :     int         i;
     833             : 
     834             :     /* Wait until the workers actually die. */
     835        2132 :     for (i = 0; i < pcxt->nworkers_launched; ++i)
     836             :     {
     837             :         BgwHandleStatus status;
     838             : 
     839        1518 :         if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
     840           0 :             continue;
     841             : 
     842        1518 :         status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);
     843             : 
     844             :         /*
     845             :          * If the postmaster kicked the bucket, we have no chance of cleaning
     846             :          * up safely -- we won't be able to tell when our workers are actually
     847             :          * dead.  This doesn't necessitate a PANIC since they will all abort
     848             :          * eventually, but we can't safely continue this session.
     849             :          */
     850        1518 :         if (status == BGWH_POSTMASTER_DIED)
     851           0 :             ereport(FATAL,
     852             :                     (errcode(ERRCODE_ADMIN_SHUTDOWN),
     853             :                      errmsg("postmaster exited during a parallel transaction")));
     854             : 
     855             :         /* Release memory. */
     856        1518 :         pfree(pcxt->worker[i].bgwhandle);
     857        1518 :         pcxt->worker[i].bgwhandle = NULL;
     858             :     }
     859         614 : }
     860             : 
     861             : /*
     862             :  * Destroy a parallel context.
     863             :  *
     864             :  * If expecting a clean exit, you should use WaitForParallelWorkersToFinish()
     865             :  * first, before calling this function.  When this function is invoked, any
     866             :  * remaining workers are forcibly killed; the dynamic shared memory segment
     867             :  * is unmapped; and we then wait (uninterruptibly) for the workers to exit.
     868             :  */
     869             : void
     870         446 : DestroyParallelContext(ParallelContext *pcxt)
     871             : {
     872             :     int         i;
     873             : 
     874             :     /*
     875             :      * Be careful about order of operations here!  We remove the parallel
     876             :      * context from the list before we do anything else; otherwise, if an
     877             :      * error occurs during a subsequent step, we might try to nuke it again
     878             :      * from AtEOXact_Parallel or AtEOSubXact_Parallel.
     879             :      */
     880         446 :     dlist_delete(&pcxt->node);
     881             : 
     882             :     /* Kill each worker in turn, and forget their error queues. */
     883         446 :     if (pcxt->worker != NULL)
     884             :     {
     885        1420 :         for (i = 0; i < pcxt->nworkers_launched; ++i)
     886             :         {
     887         974 :             if (pcxt->worker[i].error_mqh != NULL)
     888             :             {
     889           4 :                 TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
     890             : 
     891           4 :                 shm_mq_detach(pcxt->worker[i].error_mqh);
     892           4 :                 pcxt->worker[i].error_mqh = NULL;
     893             :             }
     894             :         }
     895             :     }
     896             : 
     897             :     /*
     898             :      * If we have allocated a shared memory segment, detach it.  This will
     899             :      * implicitly detach the error queues, and any other shared memory queues,
     900             :      * stored there.
     901             :      */
     902         446 :     if (pcxt->seg != NULL)
     903             :     {
     904         446 :         dsm_detach(pcxt->seg);
     905         446 :         pcxt->seg = NULL;
     906             :     }
     907             : 
     908             :     /*
     909             :      * If this parallel context is actually in backend-private memory rather
     910             :      * than shared memory, free that memory instead.
     911             :      */
     912         446 :     if (pcxt->private_memory != NULL)
     913             :     {
     914           0 :         pfree(pcxt->private_memory);
     915           0 :         pcxt->private_memory = NULL;
     916             :     }
     917             : 
     918             :     /*
     919             :      * We can't finish transaction commit or abort until all of the workers
     920             :      * have exited.  This means, in particular, that we can't respond to
     921             :      * interrupts at this stage.
     922             :      */
     923         446 :     HOLD_INTERRUPTS();
     924         446 :     WaitForParallelWorkersToExit(pcxt);
     925         446 :     RESUME_INTERRUPTS();
     926             : 
     927             :     /* Free the worker array itself. */
     928         446 :     if (pcxt->worker != NULL)
     929             :     {
     930         446 :         pfree(pcxt->worker);
     931         446 :         pcxt->worker = NULL;
     932             :     }
     933             : 
     934             :     /* Free memory. */
     935         446 :     pfree(pcxt->library_name);
     936         446 :     pfree(pcxt->function_name);
     937         446 :     pfree(pcxt);
     938         446 : }
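
The ordering rule called out at the top of this function, unlink the context from pcxt_list before any teardown step, is a general idiom for cleanup code that can itself fail and re-enter cleanup. A minimal standalone sketch of the idiom; Resource, resource_list, and destroy_resource are hypothetical names, not PostgreSQL API:

    #include <stdlib.h>

    /* Hypothetical singly linked resource list, for illustration only. */
    typedef struct Resource
    {
        struct Resource *next;
        void            *payload;
    } Resource;

    static Resource *resource_list;

    static void
    destroy_resource(Resource *res)
    {
        Resource  **link;

        /*
         * Unlink first.  If a later teardown step fails and the error path
         * walks resource_list to clean up, it must not find this entry and
         * try to destroy it a second time.
         */
        for (link = &resource_list; *link != NULL; link = &(*link)->next)
        {
            if (*link == res)
            {
                *link = res->next;
                break;
            }
        }

        /* Steps that can fail, block, or recurse come only after the unlink. */
        free(res->payload);
        free(res);
    }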
     939             : 
     940             : /*
     941             :  * Are there any parallel contexts currently active?
     942             :  */
     943             : bool
     944           0 : ParallelContextActive(void)
     945             : {
     946           0 :     return !dlist_is_empty(&pcxt_list);
     947             : }
     948             : 
     949             : /*
     950             :  * Handle receipt of an interrupt indicating a parallel worker message.
     951             :  *
     952             :  * Note: this is called within a signal handler!  All we can do is set
     953             :  * a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke
     954             :  * HandleParallelMessages().
     955             :  */
     956             : void
     957        4366 : HandleParallelMessageInterrupt(void)
     958             : {
     959        4366 :     InterruptPending = true;
     960        4366 :     ParallelMessagePending = true;
     961        4366 :     SetLatch(MyLatch);
     962        4366 : }
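
The comment above states the whole contract for the handler: record that work is pending and wake the latch, nothing else. A self-contained sketch of the same flag-and-poll pattern in plain POSIX terms; SIGUSR1 and all identifiers here are stand-ins, with the latch reduced to the flags themselves:

    #include <signal.h>
    #include <string.h>

    /* Flags of type sig_atomic_t are safe to write from a signal handler. */
    static volatile sig_atomic_t interrupt_pending = 0;
    static volatile sig_atomic_t message_pending = 0;

    static void
    handle_message_signal(int signo)
    {
        (void) signo;
        /* No locks, no allocation, no I/O: only set flags and return. */
        interrupt_pending = 1;
        message_pending = 1;
    }

    static void
    install_handler(void)
    {
        struct sigaction sa;

        memset(&sa, 0, sizeof(sa));
        sa.sa_handler = handle_message_signal;
        sigemptyset(&sa.sa_mask);
        sigaction(SIGUSR1, &sa, NULL);
    }

    /* Polled at safe points, in the spirit of CHECK_FOR_INTERRUPTS(). */
    static void
    check_for_interrupts(void)
    {
        if (interrupt_pending)
        {
            interrupt_pending = 0;
            if (message_pending)
            {
                message_pending = 0;
                /* drain worker queues here, as HandleParallelMessages() does */
            }
        }
    }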
     963             : 
     964             : /*
     965             :  * Handle any queued protocol messages received from parallel workers.
     966             :  */
     967             : void
     968        4354 : HandleParallelMessages(void)
     969             : {
     970             :     dlist_iter  iter;
     971             :     MemoryContext oldcontext;
     972             : 
     973             :     static MemoryContext hpm_context = NULL;
     974             : 
     975             :     /*
     976             :      * This is invoked from ProcessInterrupts(), and since some of the
     977             :      * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
     978             :      * for recursive calls if more signals are received while this runs.  It's
     979             :      * unclear that recursive entry would be safe, and it doesn't seem useful
     980             :      * even if it is safe, so let's block interrupts until done.
     981             :      */
     982        4354 :     HOLD_INTERRUPTS();
     983             : 
     984             :     /*
     985             :      * Moreover, CurrentMemoryContext might be pointing almost anywhere.  We
     986             :      * don't want to risk leaking data into long-lived contexts, so let's do
     987             :      * our work here in a private context that we can reset on each use.
     988             :      */
     989        4354 :     if (hpm_context == NULL)    /* first time through? */
     990          44 :         hpm_context = AllocSetContextCreate(TopMemoryContext,
     991             :                                             "HandleParallelMessages",
     992             :                                             ALLOCSET_DEFAULT_SIZES);
     993             :     else
     994        4310 :         MemoryContextReset(hpm_context);
     995             : 
     996        4354 :     oldcontext = MemoryContextSwitchTo(hpm_context);
     997             : 
     998             :     /* OK to process messages.  Reset the flag saying there are more to do. */
     999        4354 :     ParallelMessagePending = false;
    1000             : 
    1001        8490 :     dlist_foreach(iter, &pcxt_list)
    1002             :     {
    1003             :         ParallelContext *pcxt;
    1004             :         int         i;
    1005             : 
    1006        4140 :         pcxt = dlist_container(ParallelContext, node, iter.cur);
    1007        4140 :         if (pcxt->worker == NULL)
    1008           0 :             continue;
    1009             : 
    1010       17204 :         for (i = 0; i < pcxt->nworkers_launched; ++i)
    1011             :         {
    1012             :             /*
    1013             :              * Read as many messages as we can from each worker, but stop when
    1014             :              * either (1) the worker's error queue goes away, which can happen
    1015             :              * if we receive a Terminate message from the worker; or (2) no
    1016             :              * more messages can be read from the worker without blocking.
    1017             :              */
    1018       29168 :             while (pcxt->worker[i].error_mqh != NULL)
    1019             :             {
    1020             :                 shm_mq_result res;
    1021             :                 Size        nbytes;
    1022             :                 void       *data;
    1023             : 
    1024        9824 :                 res = shm_mq_receive(pcxt->worker[i].error_mqh, &nbytes,
    1025             :                                      &data, true);
    1026        9824 :                 if (res == SHM_MQ_WOULD_BLOCK)
    1027        6788 :                     break;
    1028        3036 :                 else if (res == SHM_MQ_SUCCESS)
    1029             :                 {
    1030             :                     StringInfoData msg;
    1031             : 
    1032        3036 :                     initStringInfo(&msg);
    1033        3036 :                     appendBinaryStringInfo(&msg, data, nbytes);
    1034        3036 :                     HandleParallelMessage(pcxt, i, &msg);
    1035        3032 :                     pfree(msg.data);
    1036             :                 }
    1037             :                 else
    1038           0 :                     ereport(ERROR,
    1039             :                             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1040             :                              errmsg("lost connection to parallel worker")));
    1041             :             }
    1042             :         }
    1043             :     }
    1044             : 
    1045        4350 :     MemoryContextSwitchTo(oldcontext);
    1046             : 
    1047             :     /* Might as well clear the context on our way out */
    1048        4350 :     MemoryContextReset(hpm_context);
    1049             : 
    1050        4350 :     RESUME_INTERRUPTS();
    1051        4350 : }
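
The HOLD_INTERRUPTS()/RESUME_INTERRUPTS() pair above is what makes this function safe to run from ProcessInterrupts(): while the holdoff count is nonzero, a nested CHECK_FOR_INTERRUPTS() does not re-enter interrupt processing. A minimal sketch of that counter discipline, assuming a single-threaded process where only signal handlers interleave; the identifiers are illustrative, not PostgreSQL's:

    #include <signal.h>

    static volatile sig_atomic_t pending = 0;
    static volatile unsigned int hold_count = 0;

    #define HOLD_INTERRUPTS()   (hold_count++)
    #define RESUME_INTERRUPTS() (hold_count--)

    static void
    process_interrupts(void)
    {
        /* Re-entry guard: do nothing while a handler section is running. */
        if (hold_count > 0)
            return;

        while (pending)
        {
            pending = 0;

            HOLD_INTERRUPTS();
            /* ... handle queued work; nested checks are no-ops now ... */
            RESUME_INTERRUPTS();
        }
    }

    static void
    check_for_interrupts(void)
    {
        if (pending)
            process_interrupts();
    }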
    1052             : 
    1053             : /*
    1054             :  * Handle a single protocol message received from a single parallel worker.
    1055             :  */
    1056             : static void
    1057        3036 : HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
    1058             : {
    1059             :     char        msgtype;
    1060             : 
    1061        6072 :     if (pcxt->known_attached_workers != NULL &&
    1062        3036 :         !pcxt->known_attached_workers[i])
    1063             :     {
    1064        1450 :         pcxt->known_attached_workers[i] = true;
    1065        1450 :         pcxt->nknown_attached_workers++;
    1066             :     }
    1067             : 
    1068        3036 :     msgtype = pq_getmsgbyte(msg);
    1069             : 
    1070        3036 :     switch (msgtype)
    1071             :     {
    1072             :         case 'K':               /* BackendKeyData */
    1073             :             {
    1074        1518 :                 int32       pid = pq_getmsgint(msg, 4);
    1075             : 
    1076        1518 :                 (void) pq_getmsgint(msg, 4);    /* discard cancel key */
    1077        1518 :                 (void) pq_getmsgend(msg);
    1078        1518 :                 pcxt->worker[i].pid = pid;
    1079        1518 :                 break;
    1080             :             }
    1081             : 
    1082             :         case 'E':               /* ErrorResponse */
    1083             :         case 'N':               /* NoticeResponse */
    1084             :             {
    1085             :                 ErrorData   edata;
    1086             :                 ErrorContextCallback *save_error_context_stack;
    1087             : 
    1088             :                 /* Parse ErrorResponse or NoticeResponse. */
    1089           4 :                 pq_parse_errornotice(msg, &edata);
    1090             : 
    1091             :                 /* Death of a worker isn't enough justification for suicide. */
    1092           4 :                 edata.elevel = Min(edata.elevel, ERROR);
    1093             : 
    1094             :                 /*
    1095             :                  * If desired, add a context line to show that this is a
    1096             :                  * message propagated from a parallel worker.  Otherwise, it
    1097             :                  * can sometimes be confusing to understand what actually
    1098             :                  * happened.  (We don't do this in FORCE_PARALLEL_REGRESS mode
    1099             :                  * because it causes test-result instability depending on
    1100             :                  * whether a parallel worker is actually used or not.)
    1101             :                  */
    1102           4 :                 if (force_parallel_mode != FORCE_PARALLEL_REGRESS)
    1103             :                 {
    1104           4 :                     if (edata.context)
    1105           0 :                         edata.context = psprintf("%s\n%s", edata.context,
    1106             :                                                  _("parallel worker"));
    1107             :                     else
    1108           4 :                         edata.context = pstrdup(_("parallel worker"));
    1109             :                 }
    1110             : 
    1111             :                 /*
    1112             :                  * Context beyond that should use the error context callbacks
    1113             :                  * that were in effect when the ParallelContext was created,
    1114             :                  * not the current ones.
    1115             :                  */
    1116           4 :                 save_error_context_stack = error_context_stack;
    1117           4 :                 error_context_stack = pcxt->error_context_stack;
    1118             : 
    1119             :                 /* Rethrow error or print notice. */
    1120           4 :                 ThrowErrorData(&edata);
    1121             : 
    1122             :                 /* Not an error, so restore previous context stack. */
    1123           0 :                 error_context_stack = save_error_context_stack;
    1124             : 
    1125           0 :                 break;
    1126             :             }
    1127             : 
    1128             :         case 'A':               /* NotifyResponse */
    1129             :             {
    1130             :                 /* Propagate NotifyResponse. */
    1131             :                 int32       pid;
    1132             :                 const char *channel;
    1133             :                 const char *payload;
    1134             : 
    1135           0 :                 pid = pq_getmsgint(msg, 4);
    1136           0 :                 channel = pq_getmsgrawstring(msg);
    1137           0 :                 payload = pq_getmsgrawstring(msg);
    1138           0 :                 pq_endmessage(msg);
    1139             : 
    1140           0 :                 NotifyMyFrontEnd(channel, payload, pid);
    1141             : 
    1142           0 :                 break;
    1143             :             }
    1144             : 
    1145             :         case 'X':               /* Terminate, indicating clean exit */
    1146             :             {
    1147        1514 :                 shm_mq_detach(pcxt->worker[i].error_mqh);
    1148        1514 :                 pcxt->worker[i].error_mqh = NULL;
    1149        1514 :                 break;
    1150             :             }
    1151             : 
    1152             :         default:
    1153             :             {
    1154           0 :                 elog(ERROR, "unrecognized message type received from parallel worker: %c (message length %d bytes)",
    1155             :                      msgtype, msg->len);
    1156             :             }
    1157             :     }
    1158        3032 : }
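
The 'K' case above leans on pq_getmsgint() for bounds checking and byte-order conversion. For readers unfamiliar with the FE/BE conventions it follows, here is a self-contained sketch that parses the same payload, two big-endian int32s (PID, then cancel key); MsgBuf and both functions are invented for the example:

    #include <arpa/inet.h>
    #include <stdint.h>
    #include <string.h>

    /* MsgBuf is invented for this sketch; pq_getmsgint() plays this role above. */
    typedef struct
    {
        const unsigned char *data;
        size_t      len;
        size_t      cursor;
    } MsgBuf;

    static int
    msg_get_int32(MsgBuf *m, int32_t *out)
    {
        uint32_t    raw;

        if (m->len - m->cursor < 4)
            return -1;              /* truncated message */
        memcpy(&raw, m->data + m->cursor, 4);
        m->cursor += 4;
        *out = (int32_t) ntohl(raw);    /* FE/BE integers are big-endian */
        return 0;
    }

    /* BackendKeyData payload: int32 PID, then int32 cancel key. */
    static int
    parse_backend_key_data(MsgBuf *m, int32_t *pid)
    {
        int32_t     cancel_key;

        if (msg_get_int32(m, pid) != 0 ||
            msg_get_int32(m, &cancel_key) != 0)
            return -1;
        (void) cancel_key;          /* defined by the protocol, unused here */
        return (m->cursor == m->len) ? 0 : -1;  /* expect exact length */
    }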
    1159             : 
    1160             : /*
    1161             :  * End-of-subtransaction cleanup for parallel contexts.
    1162             :  *
    1163             :  * Currently, it's forbidden to enter or leave a subtransaction while
    1164             :  * parallel mode is in effect, so we could just blow away everything.  But
    1165             :  * we may want to relax that restriction in the future, so this code
    1166             :  * contemplates that there may be multiple subtransaction IDs in pcxt_list.
    1167             :  */
    1168             : void
    1169           4 : AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId)
    1170             : {
    1171          12 :     while (!dlist_is_empty(&pcxt_list))
    1172             :     {
    1173             :         ParallelContext *pcxt;
    1174             : 
    1175           4 :         pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
    1176           4 :         if (pcxt->subid != mySubId)
    1177           0 :             break;
    1178           4 :         if (isCommit)
    1179           0 :             elog(WARNING, "leaked parallel context");
    1180           4 :         DestroyParallelContext(pcxt);
    1181             :     }
    1182           4 : }
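
Because a newly created context is pushed onto the head of pcxt_list, every context belonging to the innermost subtransaction sits at the front, which is why the loop can stop at the first non-matching subid. A toy sketch of that drain-the-matching-prefix shape, with hypothetical types:

    #include <stdlib.h>

    /* Toy stand-ins: a stack-like list drained while the head matches an id. */
    typedef struct Ctx
    {
        struct Ctx *next;
        int         subid;
    } Ctx;

    static Ctx *ctx_list;

    static void
    destroy_ctx(Ctx *c)
    {
        ctx_list = c->next;         /* c is always the head when called here */
        free(c);
    }

    static void
    at_eosubxact(int is_commit, int my_subid)
    {
        /* Contexts for the innermost subtransaction form a prefix of the list. */
        while (ctx_list != NULL && ctx_list->subid == my_subid)
        {
            if (is_commit)
            {
                /* reaching here on commit means someone leaked a context */
            }
            destroy_ctx(ctx_list);  /* unlinks the head, so the loop advances */
        }
    }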
    1183             : 
    1184             : /*
    1185             :  * End-of-transaction cleanup for parallel contexts.
    1186             :  */
    1187             : void
    1188        1518 : AtEOXact_Parallel(bool isCommit)
    1189             : {
    1190        3036 :     while (!dlist_is_empty(&pcxt_list))
    1191             :     {
    1192             :         ParallelContext *pcxt;
    1193             : 
    1194           0 :         pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
    1195           0 :         if (isCommit)
    1196           0 :             elog(WARNING, "leaked parallel context");
    1197           0 :         DestroyParallelContext(pcxt);
    1198             :     }
    1199        1518 : }
    1200             : 
    1201             : /*
    1202             :  * Main entrypoint for parallel workers.
    1203             :  */
    1204             : void
    1205        1518 : ParallelWorkerMain(Datum main_arg)
    1206             : {
    1207             :     dsm_segment *seg;
    1208             :     shm_toc    *toc;
    1209             :     FixedParallelState *fps;
    1210             :     char       *error_queue_space;
    1211             :     shm_mq     *mq;
    1212             :     shm_mq_handle *mqh;
    1213             :     char       *libraryspace;
    1214             :     char       *entrypointstate;
    1215             :     char       *library_name;
    1216             :     char       *function_name;
    1217             :     parallel_worker_main_type entrypt;
    1218             :     char       *gucspace;
    1219             :     char       *combocidspace;
    1220             :     char       *tsnapspace;
    1221             :     char       *asnapspace;
    1222             :     char       *tstatespace;
    1223             :     char       *reindexspace;
    1224             :     char       *relmapperspace;
    1225             :     char       *enumblacklistspace;
    1226             :     StringInfoData msgbuf;
    1227             :     char       *session_dsm_handle_space;
    1228             : 
    1229             :     /* Set flag to indicate that we're initializing a parallel worker. */
    1230        1518 :     InitializingParallelWorker = true;
    1231             : 
    1232             :     /* Establish signal handlers. */
    1233        1518 :     pqsignal(SIGTERM, die);
    1234        1518 :     BackgroundWorkerUnblockSignals();
    1235             : 
    1236             :     /* Determine and set our parallel worker number. */
    1237             :     Assert(ParallelWorkerNumber == -1);
    1238        1518 :     memcpy(&ParallelWorkerNumber, MyBgworkerEntry->bgw_extra, sizeof(int));
    1239             : 
    1240             :     /* Set up a memory context to work in, just for cleanliness. */
    1241        1518 :     CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
    1242             :                                                  "Parallel worker",
    1243             :                                                  ALLOCSET_DEFAULT_SIZES);
    1244             : 
    1245             :     /*
    1246             :      * Attach to the dynamic shared memory segment for the parallel query, and
    1247             :      * find its table of contents.
    1248             :      *
    1249             :      * Note: at this point, we have not created any ResourceOwner in this
    1250             :      * process.  This will result in our DSM mapping surviving until process
    1251             :      * exit, which is fine.  If there were a ResourceOwner, it would acquire
    1252             :      * ownership of the mapping, but we have no need for that.
    1253             :      */
    1254        1518 :     seg = dsm_attach(DatumGetUInt32(main_arg));
    1255        1518 :     if (seg == NULL)
    1256           0 :         ereport(ERROR,
    1257             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1258             :                  errmsg("could not map dynamic shared memory segment")));
    1259        1518 :     toc = shm_toc_attach(PARALLEL_MAGIC, dsm_segment_address(seg));
    1260        1518 :     if (toc == NULL)
    1261           0 :         ereport(ERROR,
    1262             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1263             :                  errmsg("invalid magic number in dynamic shared memory segment")));
    1264             : 
    1265             :     /* Look up fixed parallel state. */
    1266        1518 :     fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED, false);
    1267        1518 :     MyFixedParallelState = fps;
    1268             : 
    1269             :     /* Arrange to signal the leader if we exit. */
    1270        1518 :     ParallelMasterPid = fps->parallel_master_pid;
    1271        1518 :     ParallelMasterBackendId = fps->parallel_master_backend_id;
    1272        1518 :     on_shmem_exit(ParallelWorkerShutdown, (Datum) 0);
    1273             : 
    1274             :     /*
    1275             :      * Now we can find and attach to the error queue provided for us.  That's
    1276             :      * good, because until we do that, any errors that happen here will not be
    1277             :      * reported back to the process that requested that this worker be
    1278             :      * launched.
    1279             :      */
    1280        1518 :     error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE, false);
    1281        1518 :     mq = (shm_mq *) (error_queue_space +
    1282        1518 :                      ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE);
    1283        1518 :     shm_mq_set_sender(mq, MyProc);
    1284        1518 :     mqh = shm_mq_attach(mq, seg, NULL);
    1285        1518 :     pq_redirect_to_shm_mq(seg, mqh);
    1286        1518 :     pq_set_parallel_master(fps->parallel_master_pid,
    1287             :                            fps->parallel_master_backend_id);
    1288             : 
    1289             :     /*
    1290             :      * Send a BackendKeyData message to the process that initiated parallelism
    1291             :      * so that it has access to our PID before it receives any other messages
    1292             :      * from us.  Our cancel key is sent, too, since that's the way the
    1293             :      * protocol message is defined, but it won't actually be used for anything
    1294             :      * in this case.
    1295             :      */
    1296        1518 :     pq_beginmessage(&msgbuf, 'K');
    1297        1518 :     pq_sendint32(&msgbuf, (int32) MyProcPid);
    1298        1518 :     pq_sendint32(&msgbuf, (int32) MyCancelKey);
    1299        1518 :     pq_endmessage(&msgbuf);
    1300             : 
    1301             :     /*
    1302             :      * Hooray! Primary initialization is complete.  Now, we need to set up our
    1303             :      * backend-local state to match the original backend.
    1304             :      */
    1305             : 
    1306             :     /*
    1307             :      * Join locking group.  We must do this before anything that could try to
    1308             :      * acquire a heavyweight lock, because any heavyweight locks acquired to
    1309             :      * this point could block either directly against the parallel group
    1310             :      * leader or against some process which in turn waits for a lock that
    1311             :      * conflicts with the parallel group leader, causing an undetected
    1312             :      * deadlock.  (If we can't join the lock group, the leader has gone away,
    1313             :      * so just exit quietly.)
    1314             :      */
    1315        1518 :     if (!BecomeLockGroupMember(fps->parallel_master_pgproc,
    1316             :                                fps->parallel_master_pid))
    1317           0 :         return;
    1318             : 
    1319             :     /*
    1320             :      * Restore transaction and statement start-time timestamps.  This must
    1321             :      * happen before anything that would start a transaction, else asserts in
    1322             :      * xact.c will fire.
    1323             :      */
    1324        1518 :     SetParallelStartTimestamps(fps->xact_ts, fps->stmt_ts);
    1325             : 
    1326             :     /*
    1327             :      * Identify the entry point to be called.  In theory this could result in
    1328             :      * loading an additional library, though most likely the entry point is in
    1329             :      * the core backend or in a library we just loaded.
    1330             :      */
    1331        1518 :     entrypointstate = shm_toc_lookup(toc, PARALLEL_KEY_ENTRYPOINT, false);
    1332        1518 :     library_name = entrypointstate;
    1333        1518 :     function_name = entrypointstate + strlen(library_name) + 1;
    1334             : 
    1335        1518 :     entrypt = LookupParallelWorkerFunction(library_name, function_name);
    1336             : 
    1337             :     /* Restore database connection. */
    1338        1518 :     BackgroundWorkerInitializeConnectionByOid(fps->database_id,
    1339             :                                               fps->authenticated_user_id,
    1340             :                                               0);
    1341             : 
    1342             :     /*
    1343             :      * Set the client encoding to the database encoding, since that is what
    1344             :      * the leader will expect.
    1345             :      */
    1346        1518 :     SetClientEncoding(GetDatabaseEncoding());
    1347             : 
    1348             :     /*
    1349             :      * Load libraries that were loaded by original backend.  We want to do
    1350             :      * this before restoring GUCs, because the libraries might define custom
    1351             :      * variables.
    1352             :      */
    1353        1518 :     libraryspace = shm_toc_lookup(toc, PARALLEL_KEY_LIBRARY, false);
    1354        1518 :     StartTransactionCommand();
    1355        1518 :     RestoreLibraryState(libraryspace);
    1356             : 
    1357             :     /* Restore GUC values from launching backend. */
    1358        1518 :     gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC, false);
    1359        1518 :     RestoreGUCState(gucspace);
    1360        1518 :     CommitTransactionCommand();
    1361             : 
    1362             :     /* Crank up a transaction state appropriate to a parallel worker. */
    1363        1518 :     tstatespace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_STATE, false);
    1364        1518 :     StartParallelWorkerTransaction(tstatespace);
    1365             : 
    1366             :     /* Restore combo CID state. */
    1367        1518 :     combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID, false);
    1368        1518 :     RestoreComboCIDState(combocidspace);
    1369             : 
    1370             :     /* Attach to the per-session DSM segment and contained objects. */
    1371        1518 :     session_dsm_handle_space =
    1372             :         shm_toc_lookup(toc, PARALLEL_KEY_SESSION_DSM, false);
    1373        1518 :     AttachSession(*(dsm_handle *) session_dsm_handle_space);
    1374             : 
    1375             :     /* Restore transaction snapshot. */
    1376        1518 :     tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT, false);
    1377        1518 :     RestoreTransactionSnapshot(RestoreSnapshot(tsnapspace),
    1378        1518 :                                fps->parallel_master_pgproc);
    1379             : 
    1380             :     /* Restore active snapshot. */
    1381        1518 :     asnapspace = shm_toc_lookup(toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, false);
    1382        1518 :     PushActiveSnapshot(RestoreSnapshot(asnapspace));
    1383             : 
    1384             :     /*
    1385             :      * We've changed which tuples we can see, and must therefore invalidate
    1386             :      * system caches.
    1387             :      */
    1388        1518 :     InvalidateSystemCaches();
    1389             : 
    1390             :     /*
    1391             :      * Restore current role id.  Skip verifying whether session user is
    1392             :      * allowed to become this role and blindly restore the leader's state for
    1393             :      * current role.
    1394             :      */
    1395        1518 :     SetCurrentRoleId(fps->outer_user_id, fps->is_superuser);
    1396             : 
    1397             :     /* Restore user ID and security context. */
    1398        1518 :     SetUserIdAndSecContext(fps->current_user_id, fps->sec_context);
    1399             : 
    1400             :     /* Restore temp-namespace state to ensure search path matches leader's. */
    1401        1518 :     SetTempNamespaceState(fps->temp_namespace_id,
    1402             :                           fps->temp_toast_namespace_id);
    1403             : 
    1404             :     /* Restore reindex state. */
    1405        1518 :     reindexspace = shm_toc_lookup(toc, PARALLEL_KEY_REINDEX_STATE, false);
    1406        1518 :     RestoreReindexState(reindexspace);
    1407             : 
    1408             :     /* Restore relmapper state. */
    1409        1518 :     relmapperspace = shm_toc_lookup(toc, PARALLEL_KEY_RELMAPPER_STATE, false);
    1410        1518 :     RestoreRelationMap(relmapperspace);
    1411             : 
    1412             :     /* Restore enum blacklist. */
    1413        1518 :     enumblacklistspace = shm_toc_lookup(toc, PARALLEL_KEY_ENUMBLACKLIST,
    1414             :                                         false);
    1415        1518 :     RestoreEnumBlacklist(enumblacklistspace);
    1416             : 
    1417             :     /* Attach to the leader's serializable transaction, if SERIALIZABLE. */
    1418        1518 :     AttachSerializableXact(fps->serializable_xact_handle);
    1419             : 
    1420             :     /*
    1421             :      * We've initialized all of our state now; nothing should change
    1422             :      * hereafter.
    1423             :      */
    1424        1518 :     InitializingParallelWorker = false;
    1425        1518 :     EnterParallelMode();
    1426             : 
    1427             :     /*
    1428             :      * Time to do the real work: invoke the caller-supplied code.
    1429             :      */
    1430        1518 :     entrypt(seg, toc);
    1431             : 
    1432             :     /* Must exit parallel mode to pop active snapshot. */
    1433        1514 :     ExitParallelMode();
    1434             : 
    1435             :     /* Must pop active snapshot so snapmgr.c doesn't complain. */
    1436        1514 :     PopActiveSnapshot();
    1437             : 
    1438             :     /* Shut down the parallel-worker transaction. */
    1439        1514 :     EndParallelWorkerTransaction();
    1440             : 
    1441             :     /* Detach from the per-session DSM segment. */
    1442        1514 :     DetachSession();
    1443             : 
    1444             :     /* Report success. */
    1445        1514 :     pq_putmessage('X', NULL, 0);
    1446             : }
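
The call entrypt(seg, toc) near the end of this function is the extension hook: any function with the parallel_worker_main_type signature can be named, by library and function name, when the leader creates the ParallelContext. A hedged sketch of such an entry point; my_parallel_worker_main and MY_KEY_STATE are hypothetical, and a real key must not collide with the PARALLEL_KEY_* values this module reserves:

    #include "postgres.h"

    #include "access/parallel.h"
    #include "storage/dsm.h"
    #include "storage/shm_toc.h"

    /* Hypothetical toc key private to this (imaginary) extension. */
    #define MY_KEY_STATE    1

    void
    my_parallel_worker_main(dsm_segment *seg, shm_toc *toc)
    {
        char       *state;

        /* Recover whatever the leader serialized under our key. */
        state = shm_toc_lookup(toc, MY_KEY_STATE, false);

        /*
         * By the time control arrives here, ParallelWorkerMain() has already
         * restored GUCs, snapshots, and the transaction state, so ordinary
         * backend code can run.
         */
        (void) state;
    }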
    1447             : 
    1448             : /*
    1449             :  * Update shared memory with the ending location of the last WAL record we
    1450             :  * wrote, if it's greater than the value already stored there.
    1451             :  */
    1452             : void
    1453        1514 : ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
    1454             : {
    1455        1514 :     FixedParallelState *fps = MyFixedParallelState;
    1456             : 
    1457             :     Assert(fps != NULL);
    1458        1514 :     SpinLockAcquire(&fps->mutex);
    1459        1514 :     if (fps->last_xlog_end < last_xlog_end)
    1460          94 :         fps->last_xlog_end = last_xlog_end;
    1461        1514 :     SpinLockRelease(&fps->mutex);
    1462        1514 : }
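
This is a monotonic-maximum update done under a spinlock, the right tool here because the value lives in shared memory and must work across processes on every platform PostgreSQL supports. For contrast, a single-process C11 sketch of the same advance-only-if-greater update done lock-free with a CAS loop (not how parallel.c does it):

    #include <stdatomic.h>
    #include <stdint.h>

    /* Single-process sketch; the code above needs cross-process semantics. */
    static _Atomic uint64_t shared_max;

    static void
    report_max(uint64_t candidate)
    {
        uint64_t    cur = atomic_load(&shared_max);

        /* Advance only if strictly greater; a failed CAS reloads cur. */
        while (cur < candidate &&
               !atomic_compare_exchange_weak(&shared_max, &cur, candidate))
        {
            /* another writer advanced shared_max; re-test cur < candidate */
        }
    }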
    1463             : 
    1464             : /*
    1465             :  * Make sure the leader tries to read from our error queue one more time.
    1466             :  * This guards against the case where we exit uncleanly without sending an
    1467             :  * ErrorResponse to the leader, for example because some code calls proc_exit
    1468             :  * directly.
    1469             :  */
    1470             : static void
    1471        1518 : ParallelWorkerShutdown(int code, Datum arg)
    1472             : {
    1473        1518 :     SendProcSignal(ParallelMasterPid,
    1474             :                    PROCSIG_PARALLEL_MESSAGE,
    1475             :                    ParallelMasterBackendId);
    1476        1518 : }
    1477             : 
    1478             : /*
    1479             :  * Look up (and possibly load) a parallel worker entry point function.
    1480             :  *
    1481             :  * For functions contained in the core code, we use library name "postgres"
    1482             :  * and consult the InternalParallelWorkers array.  External functions are
    1483             :  * looked up, and loaded if necessary, using load_external_function().
    1484             :  *
    1485             :  * The point of this is to pass function names as strings across process
    1486             :  * boundaries.  We can't pass actual function addresses because of the
    1487             :  * possibility that the function has been loaded at a different address
    1488             :  * in a different process.  This is obviously a hazard for functions in
    1489             :  * loadable libraries, but it can happen even for functions in the core code
    1490             :  * on platforms using EXEC_BACKEND (e.g., Windows).
    1491             :  *
    1492             :  * At some point it might be worthwhile to get rid of InternalParallelWorkers[]
    1493             :  * in favor of applying load_external_function() for core functions too;
    1494             :  * but that raises portability issues that are not worth addressing now.
    1495             :  */
    1496             : static parallel_worker_main_type
    1497        1518 : LookupParallelWorkerFunction(const char *libraryname, const char *funcname)
    1498             : {
    1499             :     /*
    1500             :      * If the function is to be loaded from postgres itself, search the
    1501             :      * InternalParallelWorkers array.
    1502             :      */
    1503        1518 :     if (strcmp(libraryname, "postgres") == 0)
    1504             :     {
    1505             :         int         i;
    1506             : 
    1507        1602 :         for (i = 0; i < lengthof(InternalParallelWorkers); i++)
    1508             :         {
    1509        1602 :             if (strcmp(InternalParallelWorkers[i].fn_name, funcname) == 0)
    1510        1518 :                 return InternalParallelWorkers[i].fn_addr;
    1511             :         }
    1512             : 
    1513             :         /* We can only reach this by programming error. */
    1514           0 :         elog(ERROR, "internal function \"%s\" not found", funcname);
    1515             :     }
    1516             : 
    1517             :     /* Otherwise load from external library. */
    1518           0 :     return (parallel_worker_main_type)
    1519             :         load_external_function(libraryname, funcname, true, NULL);
    1520             : }
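
The InternalParallelWorkers lookup the comment describes is a string-keyed dispatch table: names, unlike addresses, survive the trip across process boundaries. A self-contained sketch of the idiom; the worker functions and table here are hypothetical:

    #include <stddef.h>
    #include <string.h>

    typedef void (*worker_fn) (void *arg);

    /* Hypothetical built-in workers, standing in for InternalParallelWorkers[]. */
    static void fn_sort(void *arg) { (void) arg; }
    static void fn_scan(void *arg) { (void) arg; }

    static const struct
    {
        const char *fn_name;
        worker_fn   fn_addr;
    }           builtin_workers[] =
    {
        {"fn_sort", fn_sort},
        {"fn_scan", fn_scan},
    };

    static worker_fn
    lookup_worker(const char *name)
    {
        size_t      i;

        /* Names, unlike addresses, are stable across process boundaries. */
        for (i = 0; i < sizeof(builtin_workers) / sizeof(builtin_workers[0]); i++)
            if (strcmp(builtin_workers[i].fn_name, name) == 0)
                return builtin_workers[i].fn_addr;

        return NULL;            /* caller reports the lookup failure */
    }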

Generated by: LCOV version 1.13