LCOV - code coverage report
Current view: top level - src/test/modules/test_shm_mq - worker.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 86.0 % 43 37
Test Date: 2026-03-14 21:14:51 Functions: 100.0 % 3 3
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*--------------------------------------------------------------------------
       2              :  *
       3              :  * worker.c
       4              :  *      Code for sample worker making use of shared memory message queues.
       5              :  *      Our test worker simply reads messages from one message queue and
       6              :  *      writes them back out to another message queue.  In a real
       7              :  *      application, you'd presumably want the worker to do some more
       8              :  *      complex calculation rather than simply returning the input,
       9              :  *      but it should be possible to use much of the control logic just
      10              :  *      as presented here.
      11              :  *
      12              :  * Copyright (c) 2013-2026, PostgreSQL Global Development Group
      13              :  *
      14              :  * IDENTIFICATION
      15              :  *      src/test/modules/test_shm_mq/worker.c
      16              :  *
      17              :  * -------------------------------------------------------------------------
      18              :  */
      19              : 
      20              : #include "postgres.h"
      21              : 
      22              : #include "miscadmin.h"
      23              : #include "storage/ipc.h"
      24              : #include "storage/latch.h"
      25              : #include "storage/proc.h"
      26              : #include "storage/procarray.h"
      27              : #include "storage/shm_mq.h"
      28              : #include "storage/shm_toc.h"
      29              : #include "tcop/tcopprot.h"
      30              : 
      31              : #include "test_shm_mq.h"
      32              : 
      33              : static void attach_to_queues(dsm_segment *seg, shm_toc *toc,
      34              :                              int myworkernumber, shm_mq_handle **inqhp,
      35              :                              shm_mq_handle **outqhp);
      36              : static void copy_messages(shm_mq_handle *inqh, shm_mq_handle *outqh);
      37              : 
      38              : /*
      39              :  * Background worker entrypoint.
      40              :  *
      41              :  * This is intended to demonstrate how a background worker can be used to
      42              :  * facilitate a parallel computation.  Most of the logic here is fairly
      43              :  * boilerplate stuff, designed to attach to the shared memory segment,
      44              :  * notify the user backend that we're alive, and so on.  The
      45              :  * application-specific bits of logic that you'd replace for your own worker
      46              :  * are attach_to_queues() and copy_messages().
      47              :  */
      48              : void
      49            7 : test_shm_mq_main(Datum main_arg)
      50              : {
      51              :     dsm_segment *seg;
      52              :     shm_toc    *toc;
      53              :     shm_mq_handle *inqh;
      54              :     shm_mq_handle *outqh;
      55              :     volatile test_shm_mq_header *hdr;
      56              :     int         myworkernumber;
      57              :     PGPROC     *registrant;
      58              : 
      59              :     /* Unblock signals.  The standard signal handlers are OK for us. */
      60            7 :     BackgroundWorkerUnblockSignals();
      61              : 
      62              :     /*
      63              :      * Connect to the dynamic shared memory segment.
      64              :      *
      65              :      * The backend that registered this worker passed us the ID of a shared
      66              :      * memory segment to which we must attach for further instructions.  Once
      67              :      * we've mapped the segment in our address space, attach to the table of
      68              :      * contents so we can locate the various data structures we'll need to
      69              :      * find within the segment.
      70              :      *
      71              :      * Note: at this point, we have not created any ResourceOwner in this
      72              :      * process.  This will result in our DSM mapping surviving until process
      73              :      * exit, which is fine.  If there were a ResourceOwner, it would acquire
      74              :      * ownership of the mapping, but we have no need for that.
      75              :      */
      76            7 :     seg = dsm_attach(DatumGetUInt32(main_arg));
      77            7 :     if (seg == NULL)
      78            0 :         ereport(ERROR,
      79              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
      80              :                  errmsg("unable to map dynamic shared memory segment")));
      81            7 :     toc = shm_toc_attach(PG_TEST_SHM_MQ_MAGIC, dsm_segment_address(seg));
      82            7 :     if (toc == NULL)
      83            0 :         ereport(ERROR,
      84              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
      85              :                  errmsg("bad magic number in dynamic shared memory segment")));
      86              : 
      87              :     /*
      88              :      * Acquire a worker number.
      89              :      *
      90              :      * By convention, the process registering this background worker should
      91              :      * have stored the control structure at key 0.  We look up that key to
      92              :      * find it.  Our worker number gives our identity: there may be just one
      93              :      * worker involved in this parallel operation, or there may be many.
      94              :      */
      95            7 :     hdr = shm_toc_lookup(toc, 0, false);
      96            7 :     SpinLockAcquire(&hdr->mutex);
      97            7 :     myworkernumber = ++hdr->workers_attached;
      98            7 :     SpinLockRelease(&hdr->mutex);
      99            7 :     if (myworkernumber > hdr->workers_total)
     100            0 :         ereport(ERROR,
     101              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     102              :                  errmsg("too many message queue testing workers already")));
     103              : 
     104              :     /*
     105              :      * Attach to the appropriate message queues.
     106              :      */
     107            7 :     attach_to_queues(seg, toc, myworkernumber, &inqh, &outqh);
     108              : 
     109              :     /*
     110              :      * Indicate that we're fully initialized and ready to begin the main part
     111              :      * of the parallel operation.
     112              :      *
     113              :      * Once we signal that we're ready, the user backend is entitled to assume
     114              :      * that our on_dsm_detach callbacks will fire before we disconnect from
     115              :      * the shared memory segment and exit.  Generally, that means we must have
     116              :      * attached to all relevant dynamic shared memory data structures by now.
     117              :      */
     118            7 :     SpinLockAcquire(&hdr->mutex);
     119            7 :     ++hdr->workers_ready;
     120            7 :     SpinLockRelease(&hdr->mutex);
     121            7 :     registrant = BackendPidGetProc(MyBgworkerEntry->bgw_notify_pid);
     122            7 :     if (registrant == NULL)
     123              :     {
     124            0 :         elog(DEBUG1, "registrant backend has exited prematurely");
     125            0 :         proc_exit(1);
     126              :     }
     127            7 :     SetLatch(&registrant->procLatch);
     128              : 
     129              :     /* Do the work. */
     130            7 :     copy_messages(inqh, outqh);
     131              : 
     132              :     /*
     133              :      * We're done.  For cleanliness, explicitly detach from the shared memory
     134              :      * segment (that would happen anyway during process exit, though).
     135              :      */
     136            7 :     dsm_detach(seg);
     137            7 :     proc_exit(1);
     138              : }
     139              : 
     140              : /*
     141              :  * Attach to shared memory message queues.
     142              :  *
     143              :  * We use our worker number to determine to which queue we should attach.
     144              :  * The queues are registered at keys 1..<number-of-workers>.  The user backend
     145              :  * writes to queue #1 and reads from queue #<number-of-workers>; each worker
     146              :  * reads from the queue whose number is equal to its worker number and writes
     147              :  * to the next higher-numbered queue.
     148              :  */
     149              : static void
     150            7 : attach_to_queues(dsm_segment *seg, shm_toc *toc, int myworkernumber,
     151              :                  shm_mq_handle **inqhp, shm_mq_handle **outqhp)
     152              : {
     153              :     shm_mq     *inq;
     154              :     shm_mq     *outq;
     155              : 
     156            7 :     inq = shm_toc_lookup(toc, myworkernumber, false);
     157            7 :     shm_mq_set_receiver(inq, MyProc);
     158            7 :     *inqhp = shm_mq_attach(inq, seg, NULL);
     159            7 :     outq = shm_toc_lookup(toc, myworkernumber + 1, false);
     160            7 :     shm_mq_set_sender(outq, MyProc);
     161            7 :     *outqhp = shm_mq_attach(outq, seg, NULL);
     162            7 : }
     163              : 
     164              : /*
     165              :  * Loop, receiving and sending messages, until the connection is broken.
     166              :  *
     167              :  * This is the "real work" performed by this worker process.  Everything that
     168              :  * happens before this is initialization of one form or another, and everything
     169              :  * after this point is cleanup.
     170              :  */
     171              : static void
     172            7 : copy_messages(shm_mq_handle *inqh, shm_mq_handle *outqh)
     173              : {
     174              :     Size        len;
     175              :     void       *data;
     176              :     shm_mq_result res;
     177              : 
     178              :     for (;;)
     179              :     {
     180              :         /* Notice any interrupts that have occurred. */
     181        24608 :         CHECK_FOR_INTERRUPTS();
     182              : 
     183              :         /* Receive a message. */
     184        24608 :         res = shm_mq_receive(inqh, &len, &data, false);
     185        24608 :         if (res != SHM_MQ_SUCCESS)
     186            7 :             break;
     187              : 
     188              :         /* Send it back out. */
     189        24601 :         res = shm_mq_send(outqh, len, data, false, true);
     190        24601 :         if (res != SHM_MQ_SUCCESS)
     191            0 :             break;
     192              :     }
     193            7 : }
        

Generated by: LCOV version 2.0-1