LCOV - code coverage report
Current view: top level - src/test/modules/test_shm_mq - test.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 85.0 % 80 68
Test Date: 2026-03-14 21:14:51 Functions: 100.0 % 6 6
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*--------------------------------------------------------------------------
       2              :  *
       3              :  * test.c
       4              :  *      Test harness code for shared memory message queues.
       5              :  *
       6              :  * Copyright (c) 2013-2026, PostgreSQL Global Development Group
       7              :  *
       8              :  * IDENTIFICATION
       9              :  *      src/test/modules/test_shm_mq/test.c
      10              :  *
      11              :  * -------------------------------------------------------------------------
      12              :  */
      13              : 
      14              : #include "postgres.h"
      15              : 
      16              : #include "fmgr.h"
      17              : #include "miscadmin.h"
      18              : #include "pgstat.h"
      19              : #include "storage/proc.h"
      20              : #include "utils/wait_event.h"
      21              : #include "varatt.h"
      22              : 
      23              : #include "test_shm_mq.h"
      24              : 
      25            8 : PG_MODULE_MAGIC;
      26              : 
      27            2 : PG_FUNCTION_INFO_V1(test_shm_mq);
      28            2 : PG_FUNCTION_INFO_V1(test_shm_mq_pipelined);
      29              : 
      30              : static void verify_message(Size origlen, char *origdata, Size newlen,
      31              :                            char *newdata);
      32              : 
      33              : /* value cached, fetched from shared memory */
      34              : static uint32 we_message_queue = 0;
      35              : 
      36              : /*
      37              :  * Simple test of the shared memory message queue infrastructure.
      38              :  *
      39              :  * We set up a ring of message queues passing through 1 or more background
      40              :  * processes and eventually looping back to ourselves.  We then send a message
      41              :  * through the ring a number of times indicated by the loop count.  At the end,
      42              :  * we check whether the final message matches the one we started with.
      43              :  */
      44              : Datum
      45            4 : test_shm_mq(PG_FUNCTION_ARGS)
      46              : {
      47            4 :     int64       queue_size = PG_GETARG_INT64(0);
      48            4 :     text       *message = PG_GETARG_TEXT_PP(1);
      49            4 :     char       *message_contents = VARDATA_ANY(message);
      50            4 :     int         message_size = VARSIZE_ANY_EXHDR(message);
      51            4 :     int32       loop_count = PG_GETARG_INT32(2);
      52            4 :     int32       nworkers = PG_GETARG_INT32(3);
      53              :     dsm_segment *seg;
      54              :     shm_mq_handle *outqh;
      55              :     shm_mq_handle *inqh;
      56              :     shm_mq_result res;
      57              :     Size        len;
      58              :     void       *data;
      59              : 
      60              :     /* A negative loopcount is nonsensical. */
      61            4 :     if (loop_count < 0)
      62            0 :         ereport(ERROR,
      63              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      64              :                  errmsg("repeat count size must be an integer value greater than or equal to zero")));
      65              : 
      66              :     /*
      67              :      * Since this test sends data using the blocking interfaces, it cannot
      68              :      * send data to itself.  Therefore, a minimum of 1 worker is required. Of
      69              :      * course, a negative worker count is nonsensical.
      70              :      */
      71            4 :     if (nworkers <= 0)
      72            0 :         ereport(ERROR,
      73              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      74              :                  errmsg("number of workers must be an integer value greater than zero")));
      75              : 
      76              :     /* Set up dynamic shared memory segment and background workers. */
      77            4 :     test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
      78              : 
      79              :     /* Send the initial message. */
      80            4 :     res = shm_mq_send(outqh, message_size, message_contents, false, true);
      81            4 :     if (res != SHM_MQ_SUCCESS)
      82            0 :         ereport(ERROR,
      83              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
      84              :                  errmsg("could not send message")));
      85              : 
      86              :     /*
      87              :      * Receive a message and send it back out again.  Do this a number of
      88              :      * times equal to the loop count.
      89              :      */
      90              :     for (;;)
      91              :     {
      92              :         /* Receive a message. */
      93        24001 :         res = shm_mq_receive(inqh, &len, &data, false);
      94        24001 :         if (res != SHM_MQ_SUCCESS)
      95            0 :             ereport(ERROR,
      96              :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
      97              :                      errmsg("could not receive message")));
      98              : 
      99              :         /* If this is supposed to be the last iteration, stop here. */
     100        24001 :         if (--loop_count <= 0)
     101            4 :             break;
     102              : 
     103              :         /* Send it back out. */
     104        23997 :         res = shm_mq_send(outqh, len, data, false, true);
     105        23997 :         if (res != SHM_MQ_SUCCESS)
     106            0 :             ereport(ERROR,
     107              :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     108              :                      errmsg("could not send message")));
     109              :     }
     110              : 
     111              :     /*
     112              :      * Finally, check that we got back the same message from the last
     113              :      * iteration that we originally sent.
     114              :      */
     115            4 :     verify_message(message_size, message_contents, len, data);
     116              : 
     117              :     /* Clean up. */
     118            4 :     dsm_detach(seg);
     119              : 
     120            4 :     PG_RETURN_VOID();
     121              : }
     122              : 
     123              : /*
     124              :  * Pipelined test of the shared memory message queue infrastructure.
     125              :  *
     126              :  * As in the basic test, we set up a ring of message queues passing through
     127              :  * 1 or more background processes and eventually looping back to ourselves.
     128              :  * Then, we send N copies of the user-specified message through the ring and
     129              :  * receive them all back.  Since this might fill up all message queues in the
     130              :  * ring and then stall, we must be prepared to begin receiving the messages
     131              :  * back before we've finished sending them.
     132              :  */
     133              : Datum
     134            1 : test_shm_mq_pipelined(PG_FUNCTION_ARGS)
     135              : {
     136            1 :     int64       queue_size = PG_GETARG_INT64(0);
     137            1 :     text       *message = PG_GETARG_TEXT_PP(1);
     138            1 :     char       *message_contents = VARDATA_ANY(message);
     139            1 :     int         message_size = VARSIZE_ANY_EXHDR(message);
     140            1 :     int32       loop_count = PG_GETARG_INT32(2);
     141            1 :     int32       nworkers = PG_GETARG_INT32(3);
     142            1 :     bool        verify = PG_GETARG_BOOL(4);
     143            1 :     int32       send_count = 0;
     144            1 :     int32       receive_count = 0;
     145              :     dsm_segment *seg;
     146              :     shm_mq_handle *outqh;
     147              :     shm_mq_handle *inqh;
     148              :     shm_mq_result res;
     149              :     Size        len;
     150              :     void       *data;
     151              : 
     152              :     /* A negative loopcount is nonsensical. */
     153            1 :     if (loop_count < 0)
     154            0 :         ereport(ERROR,
     155              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     156              :                  errmsg("repeat count size must be an integer value greater than or equal to zero")));
     157              : 
     158              :     /*
     159              :      * Using the nonblocking interfaces, we can even send data to ourselves,
     160              :      * so the minimum number of workers for this test is zero.
     161              :      */
     162            1 :     if (nworkers < 0)
     163            0 :         ereport(ERROR,
     164              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     165              :                  errmsg("number of workers must be an integer value greater than or equal to zero")));
     166              : 
     167              :     /* Set up dynamic shared memory segment and background workers. */
     168            1 :     test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
     169              : 
     170              :     /* Main loop. */
     171              :     for (;;)
     172         6560 :     {
     173         6561 :         bool        wait = true;
     174              : 
     175              :         /*
     176              :          * If we haven't yet sent the message the requisite number of times,
     177              :          * try again to send it now.  Note that when shm_mq_send() returns
     178              :          * SHM_MQ_WOULD_BLOCK, the next call to that function must pass the
     179              :          * same message size and contents; that's not an issue here because
     180              :          * we're sending the same message every time.
     181              :          */
     182         6561 :         if (send_count < loop_count)
     183              :         {
     184         6537 :             res = shm_mq_send(outqh, message_size, message_contents, true,
     185              :                               true);
     186         6537 :             if (res == SHM_MQ_SUCCESS)
     187              :             {
     188          200 :                 ++send_count;
     189          200 :                 wait = false;
     190              :             }
     191         6337 :             else if (res == SHM_MQ_DETACHED)
     192            0 :                 ereport(ERROR,
     193              :                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     194              :                          errmsg("could not send message")));
     195              :         }
     196              : 
     197              :         /*
     198              :          * If we haven't yet received the message the requisite number of
     199              :          * times, try to receive it again now.
     200              :          */
     201         6561 :         if (receive_count < loop_count)
     202              :         {
     203         6560 :             res = shm_mq_receive(inqh, &len, &data, true);
     204         6560 :             if (res == SHM_MQ_SUCCESS)
     205              :             {
     206          200 :                 ++receive_count;
     207              :                 /* Verifying every time is slow, so it's optional. */
     208          200 :                 if (verify)
     209          200 :                     verify_message(message_size, message_contents, len, data);
     210          200 :                 wait = false;
     211              :             }
     212         6360 :             else if (res == SHM_MQ_DETACHED)
     213            0 :                 ereport(ERROR,
     214              :                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     215              :                          errmsg("could not receive message")));
     216              :         }
     217              :         else
     218              :         {
     219              :             /*
     220              :              * Otherwise, we've received the message enough times.  This
     221              :              * shouldn't happen unless we've also sent it enough times.
     222              :              */
     223            1 :             if (send_count != receive_count)
     224            0 :                 ereport(ERROR,
     225              :                         (errcode(ERRCODE_INTERNAL_ERROR),
     226              :                          errmsg("message sent %d times, but received %d times",
     227              :                                 send_count, receive_count)));
     228            1 :             break;
     229              :         }
     230              : 
     231         6560 :         if (wait)
     232              :         {
     233              :             /* first time, allocate or get the custom wait event */
     234         6169 :             if (we_message_queue == 0)
     235            1 :                 we_message_queue = WaitEventExtensionNew("TestShmMqMessageQueue");
     236              : 
     237              :             /*
     238              :              * If we made no progress, wait for one of the other processes to
     239              :              * which we are connected to set our latch, indicating that they
     240              :              * have read or written data and therefore there may now be work
     241              :              * for us to do.
     242              :              */
     243         6169 :             (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
     244              :                              we_message_queue);
     245         6169 :             ResetLatch(MyLatch);
     246         6169 :             CHECK_FOR_INTERRUPTS();
     247              :         }
     248              :     }
     249              : 
     250              :     /* Clean up. */
     251            1 :     dsm_detach(seg);
     252              : 
     253            1 :     PG_RETURN_VOID();
     254              : }
     255              : 
     256              : /*
     257              :  * Verify that two messages are the same.
     258              :  */
     259              : static void
     260          204 : verify_message(Size origlen, char *origdata, Size newlen, char *newdata)
     261              : {
     262              :     Size        i;
     263              : 
     264          204 :     if (origlen != newlen)
     265            0 :         ereport(ERROR,
     266              :                 (errmsg("message corrupted"),
     267              :                  errdetail("The original message was %zu bytes but the final message is %zu bytes.",
     268              :                            origlen, newlen)));
     269              : 
     270     54001081 :     for (i = 0; i < origlen; ++i)
     271     54000877 :         if (origdata[i] != newdata[i])
     272            0 :             ereport(ERROR,
     273              :                     (errmsg("message corrupted"),
     274              :                      errdetail("The new and original messages differ at byte %zu of %zu.", i, origlen)));
     275          204 : }
        

Generated by: LCOV version 2.0-1