LCOV - code coverage report
Current view: top level - src/test/modules/test_shm_mq - setup.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 82 94 87.2 %
Date: 2024-12-12 16:15:21 Functions: 5 6 83.3 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*--------------------------------------------------------------------------
       2             :  *
       3             :  * setup.c
       4             :  *      Code to set up a dynamic shared memory segments and a specified
       5             :  *      number of background workers for shared memory message queue
       6             :  *      testing.
       7             :  *
       8             :  * Copyright (c) 2013-2024, PostgreSQL Global Development Group
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *      src/test/modules/test_shm_mq/setup.c
      12             :  *
      13             :  * -------------------------------------------------------------------------
      14             :  */
      15             : 
      16             : #include "postgres.h"
      17             : 
      18             : #include "miscadmin.h"
      19             : #include "pgstat.h"
      20             : #include "postmaster/bgworker.h"
      21             : #include "storage/shm_toc.h"
      22             : #include "test_shm_mq.h"
      23             : #include "utils/memutils.h"
      24             : 
      25             : typedef struct
      26             : {
      27             :     int         nworkers;
      28             :     BackgroundWorkerHandle *handle[FLEXIBLE_ARRAY_MEMBER];
      29             : } worker_state;
      30             : 
      31             : static void setup_dynamic_shared_memory(int64 queue_size, int nworkers,
      32             :                                         dsm_segment **segp,
      33             :                                         test_shm_mq_header **hdrp,
      34             :                                         shm_mq **outp, shm_mq **inp);
      35             : static worker_state *setup_background_workers(int nworkers,
      36             :                                               dsm_segment *seg);
      37             : static void cleanup_background_workers(dsm_segment *seg, Datum arg);
      38             : static void wait_for_workers_to_become_ready(worker_state *wstate,
      39             :                                              volatile test_shm_mq_header *hdr);
      40             : static bool check_worker_status(worker_state *wstate);
      41             : 
      42             : /* value cached, fetched from shared memory */
      43             : static uint32 we_bgworker_startup = 0;
      44             : 
      45             : /*
      46             :  * Set up a dynamic shared memory segment and zero or more background workers
      47             :  * for a test run.
      48             :  */
      49             : void
      50          10 : test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp,
      51             :                   shm_mq_handle **output, shm_mq_handle **input)
      52             : {
      53             :     dsm_segment *seg;
      54             :     test_shm_mq_header *hdr;
      55          10 :     shm_mq     *outq = NULL;    /* placate compiler */
      56          10 :     shm_mq     *inq = NULL;     /* placate compiler */
      57             :     worker_state *wstate;
      58             : 
      59             :     /* Set up a dynamic shared memory segment. */
      60          10 :     setup_dynamic_shared_memory(queue_size, nworkers, &seg, &hdr, &outq, &inq);
      61          10 :     *segp = seg;
      62             : 
      63             :     /* Register background workers. */
      64          10 :     wstate = setup_background_workers(nworkers, seg);
      65             : 
      66             :     /* Attach the queues. */
      67          10 :     *output = shm_mq_attach(outq, seg, wstate->handle[0]);
      68          10 :     *input = shm_mq_attach(inq, seg, wstate->handle[nworkers - 1]);
      69             : 
      70             :     /* Wait for workers to become ready. */
      71          10 :     wait_for_workers_to_become_ready(wstate, hdr);
      72             : 
      73             :     /*
      74             :      * Once we reach this point, all workers are ready.  We no longer need to
      75             :      * kill them if we die; they'll die on their own as the message queues
      76             :      * shut down.
      77             :      */
      78          10 :     cancel_on_dsm_detach(seg, cleanup_background_workers,
      79             :                          PointerGetDatum(wstate));
      80          10 :     pfree(wstate);
      81          10 : }
      82             : 
      83             : /*
      84             :  * Set up a dynamic shared memory segment.
      85             :  *
      86             :  * We set up a small control region that contains only a test_shm_mq_header,
      87             :  * plus one region per message queue.  There are as many message queues as
      88             :  * the number of workers, plus one.
      89             :  */
      90             : static void
      91          10 : setup_dynamic_shared_memory(int64 queue_size, int nworkers,
      92             :                             dsm_segment **segp, test_shm_mq_header **hdrp,
      93             :                             shm_mq **outp, shm_mq **inp)
      94             : {
      95             :     shm_toc_estimator e;
      96             :     int         i;
      97             :     Size        segsize;
      98             :     dsm_segment *seg;
      99             :     shm_toc    *toc;
     100             :     test_shm_mq_header *hdr;
     101             : 
     102             :     /* Ensure a valid queue size. */
     103          10 :     if (queue_size < 0 || ((uint64) queue_size) < shm_mq_minimum_size)
     104           0 :         ereport(ERROR,
     105             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     106             :                  errmsg("queue size must be at least %zu bytes",
     107             :                         shm_mq_minimum_size)));
     108             :     if (queue_size != ((Size) queue_size))
     109             :         ereport(ERROR,
     110             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     111             :                  errmsg("queue size overflows size_t")));
     112             : 
     113             :     /*
     114             :      * Estimate how much shared memory we need.
     115             :      *
     116             :      * Because the TOC machinery may choose to insert padding of oddly-sized
     117             :      * requests, we must estimate each chunk separately.
     118             :      *
     119             :      * We need one key to register the location of the header, and we need
     120             :      * nworkers + 1 keys to track the locations of the message queues.
     121             :      */
     122          10 :     shm_toc_initialize_estimator(&e);
     123          10 :     shm_toc_estimate_chunk(&e, sizeof(test_shm_mq_header));
     124          34 :     for (i = 0; i <= nworkers; ++i)
     125          24 :         shm_toc_estimate_chunk(&e, (Size) queue_size);
     126          10 :     shm_toc_estimate_keys(&e, 2 + nworkers);
     127          10 :     segsize = shm_toc_estimate(&e);
     128             : 
     129             :     /* Create the shared memory segment and establish a table of contents. */
     130          10 :     seg = dsm_create(shm_toc_estimate(&e), 0);
     131          10 :     toc = shm_toc_create(PG_TEST_SHM_MQ_MAGIC, dsm_segment_address(seg),
     132             :                          segsize);
     133             : 
     134             :     /* Set up the header region. */
     135          10 :     hdr = shm_toc_allocate(toc, sizeof(test_shm_mq_header));
     136          10 :     SpinLockInit(&hdr->mutex);
     137          10 :     hdr->workers_total = nworkers;
     138          10 :     hdr->workers_attached = 0;
     139          10 :     hdr->workers_ready = 0;
     140          10 :     shm_toc_insert(toc, 0, hdr);
     141             : 
     142             :     /* Set up one message queue per worker, plus one. */
     143          34 :     for (i = 0; i <= nworkers; ++i)
     144             :     {
     145             :         shm_mq     *mq;
     146             : 
     147          24 :         mq = shm_mq_create(shm_toc_allocate(toc, (Size) queue_size),
     148             :                            (Size) queue_size);
     149          24 :         shm_toc_insert(toc, i + 1, mq);
     150             : 
     151          24 :         if (i == 0)
     152             :         {
     153             :             /* We send messages to the first queue. */
     154          10 :             shm_mq_set_sender(mq, MyProc);
     155          10 :             *outp = mq;
     156             :         }
     157          24 :         if (i == nworkers)
     158             :         {
     159             :             /* We receive messages from the last queue. */
     160          10 :             shm_mq_set_receiver(mq, MyProc);
     161          10 :             *inp = mq;
     162             :         }
     163             :     }
     164             : 
     165             :     /* Return results to caller. */
     166          10 :     *segp = seg;
     167          10 :     *hdrp = hdr;
     168          10 : }
     169             : 
     170             : /*
     171             :  * Register background workers.
     172             :  */
     173             : static worker_state *
     174          10 : setup_background_workers(int nworkers, dsm_segment *seg)
     175             : {
     176             :     MemoryContext oldcontext;
     177             :     BackgroundWorker worker;
     178             :     worker_state *wstate;
     179             :     int         i;
     180             : 
     181             :     /*
     182             :      * We need the worker_state object and the background worker handles to
     183             :      * which it points to be allocated in CurTransactionContext rather than
     184             :      * ExprContext; otherwise, they'll be destroyed before the on_dsm_detach
     185             :      * hooks run.
     186             :      */
     187          10 :     oldcontext = MemoryContextSwitchTo(CurTransactionContext);
     188             : 
     189             :     /* Create worker state object. */
     190          10 :     wstate = MemoryContextAlloc(TopTransactionContext,
     191          10 :                                 offsetof(worker_state, handle) +
     192             :                                 sizeof(BackgroundWorkerHandle *) * nworkers);
     193          10 :     wstate->nworkers = 0;
     194             : 
     195             :     /*
     196             :      * Arrange to kill all the workers if we abort before all workers are
     197             :      * finished hooking themselves up to the dynamic shared memory segment.
     198             :      *
     199             :      * If we die after all the workers have finished hooking themselves up to
     200             :      * the dynamic shared memory segment, we'll mark the two queues to which
     201             :      * we're directly connected as detached, and the worker(s) connected to
     202             :      * those queues will exit, marking any other queues to which they are
     203             :      * connected as detached.  This will cause any as-yet-unaware workers
     204             :      * connected to those queues to exit in their turn, and so on, until
     205             :      * everybody exits.
     206             :      *
     207             :      * But suppose the workers which are supposed to connect to the queues to
     208             :      * which we're directly attached exit due to some error before they
     209             :      * actually attach the queues.  The remaining workers will have no way of
     210             :      * knowing this.  From their perspective, they're still waiting for those
     211             :      * workers to start, when in fact they've already died.
     212             :      */
     213          10 :     on_dsm_detach(seg, cleanup_background_workers,
     214             :                   PointerGetDatum(wstate));
     215             : 
     216             :     /* Configure a worker. */
     217          10 :     memset(&worker, 0, sizeof(worker));
     218          10 :     worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
     219          10 :     worker.bgw_start_time = BgWorkerStart_ConsistentState;
     220          10 :     worker.bgw_restart_time = BGW_NEVER_RESTART;
     221          10 :     sprintf(worker.bgw_library_name, "test_shm_mq");
     222          10 :     sprintf(worker.bgw_function_name, "test_shm_mq_main");
     223          10 :     snprintf(worker.bgw_type, BGW_MAXLEN, "test_shm_mq");
     224          10 :     worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
     225             :     /* set bgw_notify_pid, so we can detect if the worker stops */
     226          10 :     worker.bgw_notify_pid = MyProcPid;
     227             : 
     228             :     /* Register the workers. */
     229          24 :     for (i = 0; i < nworkers; ++i)
     230             :     {
     231          14 :         if (!RegisterDynamicBackgroundWorker(&worker, &wstate->handle[i]))
     232           0 :             ereport(ERROR,
     233             :                     (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
     234             :                      errmsg("could not register background process"),
     235             :                      errhint("You may need to increase \"max_worker_processes\".")));
     236          14 :         ++wstate->nworkers;
     237             :     }
     238             : 
     239             :     /* All done. */
     240          10 :     MemoryContextSwitchTo(oldcontext);
     241          10 :     return wstate;
     242             : }
     243             : 
     244             : static void
     245           0 : cleanup_background_workers(dsm_segment *seg, Datum arg)
     246             : {
     247           0 :     worker_state *wstate = (worker_state *) DatumGetPointer(arg);
     248             : 
     249           0 :     while (wstate->nworkers > 0)
     250             :     {
     251           0 :         --wstate->nworkers;
     252           0 :         TerminateBackgroundWorker(wstate->handle[wstate->nworkers]);
     253             :     }
     254           0 : }
     255             : 
     256             : static void
     257          10 : wait_for_workers_to_become_ready(worker_state *wstate,
     258             :                                  volatile test_shm_mq_header *hdr)
     259             : {
     260          10 :     bool        result = false;
     261             : 
     262             :     for (;;)
     263          50 :     {
     264             :         int         workers_ready;
     265             : 
     266             :         /* If all the workers are ready, we have succeeded. */
     267          60 :         SpinLockAcquire(&hdr->mutex);
     268          60 :         workers_ready = hdr->workers_ready;
     269          60 :         SpinLockRelease(&hdr->mutex);
     270          60 :         if (workers_ready >= wstate->nworkers)
     271             :         {
     272          10 :             result = true;
     273          10 :             break;
     274             :         }
     275             : 
     276             :         /* If any workers (or the postmaster) have died, we have failed. */
     277          50 :         if (!check_worker_status(wstate))
     278             :         {
     279           0 :             result = false;
     280           0 :             break;
     281             :         }
     282             : 
     283             :         /* first time, allocate or get the custom wait event */
     284          50 :         if (we_bgworker_startup == 0)
     285           2 :             we_bgworker_startup = WaitEventExtensionNew("TestShmMqBgWorkerStartup");
     286             : 
     287             :         /* Wait to be signaled. */
     288          50 :         (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
     289             :                          we_bgworker_startup);
     290             : 
     291             :         /* Reset the latch so we don't spin. */
     292          50 :         ResetLatch(MyLatch);
     293             : 
     294             :         /* An interrupt may have occurred while we were waiting. */
     295          50 :         CHECK_FOR_INTERRUPTS();
     296             :     }
     297             : 
     298          10 :     if (!result)
     299           0 :         ereport(ERROR,
     300             :                 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
     301             :                  errmsg("one or more background workers failed to start")));
     302          10 : }
     303             : 
     304             : static bool
     305          50 : check_worker_status(worker_state *wstate)
     306             : {
     307             :     int         n;
     308             : 
     309             :     /* If any workers (or the postmaster) have died, we have failed. */
     310         132 :     for (n = 0; n < wstate->nworkers; ++n)
     311             :     {
     312             :         BgwHandleStatus status;
     313             :         pid_t       pid;
     314             : 
     315          82 :         status = GetBackgroundWorkerPid(wstate->handle[n], &pid);
     316          82 :         if (status == BGWH_STOPPED || status == BGWH_POSTMASTER_DIED)
     317           0 :             return false;
     318             :     }
     319             : 
     320             :     /* Otherwise, things still look OK. */
     321          50 :     return true;
     322             : }

Generated by: LCOV version 1.14