LCOV - code coverage report
Current view: top level - src/test/modules/test_shm_mq - setup.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 87.4 % 95 83
Test Date: 2026-04-03 01:15:53 Functions: 83.3 % 6 5
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*--------------------------------------------------------------------------
       2              :  *
       3              :  * setup.c
       4              :  *      Code to set up a dynamic shared memory segment and a specified
       5              :  *      number of background workers for shared memory message queue
       6              :  *      testing.
       7              :  *
       8              :  * Copyright (c) 2013-2026, PostgreSQL Global Development Group
       9              :  *
      10              :  * IDENTIFICATION
      11              :  *      src/test/modules/test_shm_mq/setup.c
      12              :  *
      13              :  * -------------------------------------------------------------------------
      14              :  */
      15              : 
      16              : #include "postgres.h"
      17              : 
      18              : #include "miscadmin.h"
      19              : #include "pgstat.h"
      20              : #include "postmaster/bgworker.h"
      21              : #include "storage/proc.h"
      22              : #include "storage/shm_toc.h"
      23              : #include "test_shm_mq.h"
      24              : #include "utils/memutils.h"
      25              : #include "utils/wait_event.h"
      26              : 
      27              : typedef struct
      28              : {
      29              :     int         nworkers;
      30              :     BackgroundWorkerHandle *handle[FLEXIBLE_ARRAY_MEMBER];
      31              : } worker_state;
      32              : 
      33              : static void setup_dynamic_shared_memory(int64 queue_size, int nworkers,
      34              :                                         dsm_segment **segp,
      35              :                                         test_shm_mq_header **hdrp,
      36              :                                         shm_mq **outp, shm_mq **inp);
      37              : static worker_state *setup_background_workers(int nworkers,
      38              :                                               dsm_segment *seg);
      39              : static void cleanup_background_workers(dsm_segment *seg, Datum arg);
      40              : static void wait_for_workers_to_become_ready(worker_state *wstate,
      41              :                                              volatile test_shm_mq_header *hdr);
      42              : static bool check_worker_status(worker_state *wstate);
      43              : 
      44              : /* value cached, fetched from shared memory */
      45              : static uint32 we_bgworker_startup = 0;
      46              : 
      47              : /*
      48              :  * Set up a dynamic shared memory segment and zero or more background workers
      49              :  * for a test run.
      50              :  */
      51              : void
      52            5 : test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp,
      53              :                   shm_mq_handle **output, shm_mq_handle **input)
      54              : {
      55              :     dsm_segment *seg;
      56              :     test_shm_mq_header *hdr;
      57            5 :     shm_mq     *outq = NULL;    /* placate compiler */
      58            5 :     shm_mq     *inq = NULL;     /* placate compiler */
      59              :     worker_state *wstate;
      60              : 
      61              :     /* Set up a dynamic shared memory segment. */
      62            5 :     setup_dynamic_shared_memory(queue_size, nworkers, &seg, &hdr, &outq, &inq);
      63            5 :     *segp = seg;
      64              : 
      65              :     /* Register background workers. */
      66            5 :     wstate = setup_background_workers(nworkers, seg);
      67              : 
      68              :     /* Attach the queues. */
      69            5 :     *output = shm_mq_attach(outq, seg, wstate->handle[0]);
      70            5 :     *input = shm_mq_attach(inq, seg, wstate->handle[nworkers - 1]);
      71              : 
      72              :     /* Wait for workers to become ready. */
      73            5 :     wait_for_workers_to_become_ready(wstate, hdr);
      74              : 
      75              :     /*
      76              :      * Once we reach this point, all workers are ready.  We no longer need to
      77              :      * kill them if we die; they'll die on their own as the message queues
      78              :      * shut down.
      79              :      */
      80            5 :     cancel_on_dsm_detach(seg, cleanup_background_workers,
      81              :                          PointerGetDatum(wstate));
      82            5 :     pfree(wstate);
      83            5 : }
      84              : 
      85              : /*
      86              :  * Set up a dynamic shared memory segment.
      87              :  *
      88              :  * We set up a small control region that contains only a test_shm_mq_header,
      89              :  * plus one region per message queue.  There are as many message queues as
      90              :  * the number of workers, plus one.
      91              :  */
      92              : static void
      93            5 : setup_dynamic_shared_memory(int64 queue_size, int nworkers,
      94              :                             dsm_segment **segp, test_shm_mq_header **hdrp,
      95              :                             shm_mq **outp, shm_mq **inp)
      96              : {
      97              :     shm_toc_estimator e;
      98              :     int         i;
      99              :     Size        segsize;
     100              :     dsm_segment *seg;
     101              :     shm_toc    *toc;
     102              :     test_shm_mq_header *hdr;
     103              : 
     104              :     /* Ensure a valid queue size. */
     105            5 :     if (queue_size < 0 || ((uint64) queue_size) < shm_mq_minimum_size)
     106            0 :         ereport(ERROR,
     107              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     108              :                  errmsg("queue size must be at least %zu bytes",
     109              :                         shm_mq_minimum_size)));
     110              :     if (queue_size != ((Size) queue_size))
     111              :         ereport(ERROR,
     112              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     113              :                  errmsg("queue size overflows size_t")));
     114              : 
     115              :     /*
     116              :      * Estimate how much shared memory we need.
     117              :      *
     118              :      * Because the TOC machinery may choose to insert padding of oddly-sized
     119              :      * requests, we must estimate each chunk separately.
     120              :      *
     121              :      * We need one key to register the location of the header, and we need
     122              :      * nworkers + 1 keys to track the locations of the message queues.
     123              :      */
     124            5 :     shm_toc_initialize_estimator(&e);
     125            5 :     shm_toc_estimate_chunk(&e, sizeof(test_shm_mq_header));
     126           17 :     for (i = 0; i <= nworkers; ++i)
     127           12 :         shm_toc_estimate_chunk(&e, (Size) queue_size);
     128            5 :     shm_toc_estimate_keys(&e, 2 + nworkers);
     129            5 :     segsize = shm_toc_estimate(&e);
     130              : 
     131              :     /* Create the shared memory segment and establish a table of contents. */
     132            5 :     seg = dsm_create(shm_toc_estimate(&e), 0);
     133            5 :     toc = shm_toc_create(PG_TEST_SHM_MQ_MAGIC, dsm_segment_address(seg),
     134              :                          segsize);
     135              : 
     136              :     /* Set up the header region. */
     137            5 :     hdr = shm_toc_allocate(toc, sizeof(test_shm_mq_header));
     138            5 :     SpinLockInit(&hdr->mutex);
     139            5 :     hdr->workers_total = nworkers;
     140            5 :     hdr->workers_attached = 0;
     141            5 :     hdr->workers_ready = 0;
     142            5 :     shm_toc_insert(toc, 0, hdr);
     143              : 
     144              :     /* Set up one message queue per worker, plus one. */
     145           17 :     for (i = 0; i <= nworkers; ++i)
     146              :     {
     147              :         shm_mq     *mq;
     148              : 
     149           12 :         mq = shm_mq_create(shm_toc_allocate(toc, (Size) queue_size),
     150              :                            (Size) queue_size);
     151           12 :         shm_toc_insert(toc, i + 1, mq);
     152              : 
     153           12 :         if (i == 0)
     154              :         {
     155              :             /* We send messages to the first queue. */
     156            5 :             shm_mq_set_sender(mq, MyProc);
     157            5 :             *outp = mq;
     158              :         }
     159           12 :         if (i == nworkers)
     160              :         {
     161              :             /* We receive messages from the last queue. */
     162            5 :             shm_mq_set_receiver(mq, MyProc);
     163            5 :             *inp = mq;
     164              :         }
     165              :     }
     166              : 
     167              :     /* Return results to caller. */
     168            5 :     *segp = seg;
     169            5 :     *hdrp = hdr;
     170            5 : }
     171              : 
     172              : /*
     173              :  * Register background workers.
     174              :  */
     175              : static worker_state *
     176            5 : setup_background_workers(int nworkers, dsm_segment *seg)
     177              : {
     178              :     MemoryContext oldcontext;
     179              :     BackgroundWorker worker;
     180              :     worker_state *wstate;
     181              :     int         i;
     182              : 
     183              :     /*
     184              :      * We need the worker_state object and the background worker handles to
     185              :      * which it points to be allocated in CurTransactionContext rather than
     186              :      * ExprContext; otherwise, they'll be destroyed before the on_dsm_detach
     187              :      * hooks run.
     188              :      */
     189            5 :     oldcontext = MemoryContextSwitchTo(CurTransactionContext);
     190              : 
     191              :     /* Create worker state object. */
     192            5 :     wstate = MemoryContextAlloc(TopTransactionContext,
     193            5 :                                 offsetof(worker_state, handle) +
     194              :                                 sizeof(BackgroundWorkerHandle *) * nworkers);
     195            5 :     wstate->nworkers = 0;
     196              : 
     197              :     /*
     198              :      * Arrange to kill all the workers if we abort before all workers are
     199              :      * finished hooking themselves up to the dynamic shared memory segment.
     200              :      *
     201              :      * If we die after all the workers have finished hooking themselves up to
     202              :      * the dynamic shared memory segment, we'll mark the two queues to which
     203              :      * we're directly connected as detached, and the worker(s) connected to
     204              :      * those queues will exit, marking any other queues to which they are
     205              :      * connected as detached.  This will cause any as-yet-unaware workers
     206              :      * connected to those queues to exit in their turn, and so on, until
     207              :      * everybody exits.
     208              :      *
     209              :      * But suppose the workers which are supposed to connect to the queues to
     210              :      * which we're directly attached exit due to some error before they
     211              :      * actually attach the queues.  The remaining workers will have no way of
     212              :      * knowing this.  From their perspective, they're still waiting for those
     213              :      * workers to start, when in fact they've already died.
     214              :      */
     215            5 :     on_dsm_detach(seg, cleanup_background_workers,
     216              :                   PointerGetDatum(wstate));
     217              : 
     218              :     /* Configure a worker. */
     219            5 :     memset(&worker, 0, sizeof(worker));
     220            5 :     worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
     221            5 :     worker.bgw_start_time = BgWorkerStart_ConsistentState;
     222            5 :     worker.bgw_restart_time = BGW_NEVER_RESTART;
     223            5 :     sprintf(worker.bgw_library_name, "test_shm_mq");
     224            5 :     sprintf(worker.bgw_function_name, "test_shm_mq_main");
     225            5 :     snprintf(worker.bgw_type, BGW_MAXLEN, "test_shm_mq");
     226            5 :     worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
     227              :     /* set bgw_notify_pid, so we can detect if the worker stops */
     228            5 :     worker.bgw_notify_pid = MyProcPid;
     229              : 
     230              :     /* Register the workers. */
     231           12 :     for (i = 0; i < nworkers; ++i)
     232              :     {
     233            7 :         snprintf(worker.bgw_name, BGW_MAXLEN, "test_shm_mq worker %d", i + 1);
     234            7 :         if (!RegisterDynamicBackgroundWorker(&worker, &wstate->handle[i]))
     235            0 :             ereport(ERROR,
     236              :                     (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
     237              :                      errmsg("could not register background process"),
     238              :                      errhint("You may need to increase \"max_worker_processes\".")));
     239            7 :         ++wstate->nworkers;
     240              :     }
     241              : 
     242              :     /* All done. */
     243            5 :     MemoryContextSwitchTo(oldcontext);
     244            5 :     return wstate;
     245              : }
     246              : 
     247              : static void
     248            0 : cleanup_background_workers(dsm_segment *seg, Datum arg)
     249              : {
     250            0 :     worker_state *wstate = (worker_state *) DatumGetPointer(arg);
     251              : 
     252            0 :     while (wstate->nworkers > 0)
     253              :     {
     254            0 :         --wstate->nworkers;
     255            0 :         TerminateBackgroundWorker(wstate->handle[wstate->nworkers]);
     256              :     }
     257            0 : }
     258              : 
     259              : static void
     260            5 : wait_for_workers_to_become_ready(worker_state *wstate,
     261              :                                  volatile test_shm_mq_header *hdr)
     262              : {
     263            5 :     bool        result = false;
     264              : 
     265              :     for (;;)
     266           17 :     {
     267              :         int         workers_ready;
     268              : 
     269              :         /* If all the workers are ready, we have succeeded. */
     270           22 :         SpinLockAcquire(&hdr->mutex);
     271           22 :         workers_ready = hdr->workers_ready;
     272           22 :         SpinLockRelease(&hdr->mutex);
     273           22 :         if (workers_ready >= wstate->nworkers)
     274              :         {
     275            5 :             result = true;
     276            5 :             break;
     277              :         }
     278              : 
     279              :         /* If any workers (or the postmaster) have died, we have failed. */
     280           17 :         if (!check_worker_status(wstate))
     281              :         {
     282            0 :             result = false;
     283            0 :             break;
     284              :         }
     285              : 
     286              :         /* first time, allocate or get the custom wait event */
     287           17 :         if (we_bgworker_startup == 0)
     288            1 :             we_bgworker_startup = WaitEventExtensionNew("TestShmMqBgWorkerStartup");
     289              : 
     290              :         /* Wait to be signaled. */
     291           17 :         (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
     292              :                          we_bgworker_startup);
     293              : 
     294              :         /* Reset the latch so we don't spin. */
     295           17 :         ResetLatch(MyLatch);
     296              : 
     297              :         /* An interrupt may have occurred while we were waiting. */
     298           17 :         CHECK_FOR_INTERRUPTS();
     299              :     }
     300              : 
     301            5 :     if (!result)
     302            0 :         ereport(ERROR,
     303              :                 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
     304              :                  errmsg("one or more background workers failed to start")));
     305            5 : }
     306              : 
     307              : static bool
     308           17 : check_worker_status(worker_state *wstate)
     309              : {
     310              :     int         n;
     311              : 
     312              :     /* If any workers (or the postmaster) have died, we have failed. */
     313           48 :     for (n = 0; n < wstate->nworkers; ++n)
     314              :     {
     315              :         BgwHandleStatus status;
     316              :         pid_t       pid;
     317              : 
     318           31 :         status = GetBackgroundWorkerPid(wstate->handle[n], &pid);
     319           31 :         if (status == BGWH_STOPPED || status == BGWH_POSTMASTER_DIED)
     320            0 :             return false;
     321              :     }
     322              : 
     323              :     /* Otherwise, things still look OK. */
     324           17 :     return true;
     325              : }
        

Generated by: LCOV version 2.0-1