LCOV - code coverage report
Current view: top level - src/test/modules/test_shm_mq - test.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 68 80 85.0 %
Date: 2025-01-18 04:15:08 Functions: 6 6 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*--------------------------------------------------------------------------
       2             :  *
       3             :  * test.c
       4             :  *      Test harness code for shared memory message queues.
       5             :  *
       6             :  * Copyright (c) 2013-2025, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *      src/test/modules/test_shm_mq/test.c
      10             :  *
      11             :  * -------------------------------------------------------------------------
      12             :  */
      13             : 
      14             : #include "postgres.h"
      15             : 
      16             : #include "fmgr.h"
      17             : #include "miscadmin.h"
      18             : #include "pgstat.h"
      19             : #include "varatt.h"
      20             : 
      21             : #include "test_shm_mq.h"
      22             : 
      23          16 : PG_MODULE_MAGIC;
      24             : 
      25           4 : PG_FUNCTION_INFO_V1(test_shm_mq);
      26           4 : PG_FUNCTION_INFO_V1(test_shm_mq_pipelined);
      27             : 
      28             : static void verify_message(Size origlen, char *origdata, Size newlen,
      29             :                            char *newdata);
      30             : 
      31             : /* value cached, fetched from shared memory */
      32             : static uint32 we_message_queue = 0;
      33             : 
      34             : /*
      35             :  * Simple test of the shared memory message queue infrastructure.
      36             :  *
      37             :  * We set up a ring of message queues passing through 1 or more background
      38             :  * processes and eventually looping back to ourselves.  We then send a message
      39             :  * through the ring a number of times indicated by the loop count.  At the end,
      40             :  * we check whether the final message matches the one we started with.
      41             :  */
      42             : Datum
      43           8 : test_shm_mq(PG_FUNCTION_ARGS)
      44             : {
      45           8 :     int64       queue_size = PG_GETARG_INT64(0);
      46           8 :     text       *message = PG_GETARG_TEXT_PP(1);
      47           8 :     char       *message_contents = VARDATA_ANY(message);
      48           8 :     int         message_size = VARSIZE_ANY_EXHDR(message);
      49           8 :     int32       loop_count = PG_GETARG_INT32(2);
      50           8 :     int32       nworkers = PG_GETARG_INT32(3);
      51             :     dsm_segment *seg;
      52             :     shm_mq_handle *outqh;
      53             :     shm_mq_handle *inqh;
      54             :     shm_mq_result res;
      55             :     Size        len;
      56             :     void       *data;
      57             : 
      58             :     /* A negative loopcount is nonsensical. */
      59           8 :     if (loop_count < 0)
      60           0 :         ereport(ERROR,
      61             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      62             :                  errmsg("repeat count size must be an integer value greater than or equal to zero")));
      63             : 
      64             :     /*
      65             :      * Since this test sends data using the blocking interfaces, it cannot
      66             :      * send data to itself.  Therefore, a minimum of 1 worker is required. Of
      67             :      * course, a negative worker count is nonsensical.
      68             :      */
      69           8 :     if (nworkers <= 0)
      70           0 :         ereport(ERROR,
      71             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      72             :                  errmsg("number of workers must be an integer value greater than zero")));
      73             : 
      74             :     /* Set up dynamic shared memory segment and background workers. */
      75           8 :     test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
      76             : 
      77             :     /* Send the initial message. */
      78           8 :     res = shm_mq_send(outqh, message_size, message_contents, false, true);
      79           8 :     if (res != SHM_MQ_SUCCESS)
      80           0 :         ereport(ERROR,
      81             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
      82             :                  errmsg("could not send message")));
      83             : 
      84             :     /*
      85             :      * Receive a message and send it back out again.  Do this a number of
      86             :      * times equal to the loop count.
      87             :      */
      88             :     for (;;)
      89             :     {
      90             :         /* Receive a message. */
      91       48002 :         res = shm_mq_receive(inqh, &len, &data, false);
      92       48002 :         if (res != SHM_MQ_SUCCESS)
      93           0 :             ereport(ERROR,
      94             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
      95             :                      errmsg("could not receive message")));
      96             : 
      97             :         /* If this is supposed to be the last iteration, stop here. */
      98       48002 :         if (--loop_count <= 0)
      99           8 :             break;
     100             : 
     101             :         /* Send it back out. */
     102       47994 :         res = shm_mq_send(outqh, len, data, false, true);
     103       47994 :         if (res != SHM_MQ_SUCCESS)
     104           0 :             ereport(ERROR,
     105             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     106             :                      errmsg("could not send message")));
     107             :     }
     108             : 
     109             :     /*
     110             :      * Finally, check that we got back the same message from the last
     111             :      * iteration that we originally sent.
     112             :      */
     113           8 :     verify_message(message_size, message_contents, len, data);
     114             : 
     115             :     /* Clean up. */
     116           8 :     dsm_detach(seg);
     117             : 
     118           8 :     PG_RETURN_VOID();
     119             : }
     120             : 
     121             : /*
     122             :  * Pipelined test of the shared memory message queue infrastructure.
     123             :  *
     124             :  * As in the basic test, we set up a ring of message queues passing through
     125             :  * 1 or more background processes and eventually looping back to ourselves.
     126             :  * Then, we send N copies of the user-specified message through the ring and
     127             :  * receive them all back.  Since this might fill up all message queues in the
     128             :  * ring and then stall, we must be prepared to begin receiving the messages
     129             :  * back before we've finished sending them.
     130             :  */
     131             : Datum
     132           2 : test_shm_mq_pipelined(PG_FUNCTION_ARGS)
     133             : {
     134           2 :     int64       queue_size = PG_GETARG_INT64(0);
     135           2 :     text       *message = PG_GETARG_TEXT_PP(1);
     136           2 :     char       *message_contents = VARDATA_ANY(message);
     137           2 :     int         message_size = VARSIZE_ANY_EXHDR(message);
     138           2 :     int32       loop_count = PG_GETARG_INT32(2);
     139           2 :     int32       nworkers = PG_GETARG_INT32(3);
     140           2 :     bool        verify = PG_GETARG_BOOL(4);
     141           2 :     int32       send_count = 0;
     142           2 :     int32       receive_count = 0;
     143             :     dsm_segment *seg;
     144             :     shm_mq_handle *outqh;
     145             :     shm_mq_handle *inqh;
     146             :     shm_mq_result res;
     147             :     Size        len;
     148             :     void       *data;
     149             : 
     150             :     /* A negative loopcount is nonsensical. */
     151           2 :     if (loop_count < 0)
     152           0 :         ereport(ERROR,
     153             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     154             :                  errmsg("repeat count size must be an integer value greater than or equal to zero")));
     155             : 
     156             :     /*
     157             :      * Using the nonblocking interfaces, we can even send data to ourselves,
     158             :      * so the minimum number of workers for this test is zero.
     159             :      */
     160           2 :     if (nworkers < 0)
     161           0 :         ereport(ERROR,
     162             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     163             :                  errmsg("number of workers must be an integer value greater than or equal to zero")));
     164             : 
     165             :     /* Set up dynamic shared memory segment and background workers. */
     166           2 :     test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
     167             : 
     168             :     /* Main loop. */
     169             :     for (;;)
     170       10858 :     {
     171       10860 :         bool        wait = true;
     172             : 
     173             :         /*
     174             :          * If we haven't yet sent the message the requisite number of times,
     175             :          * try again to send it now.  Note that when shm_mq_send() returns
     176             :          * SHM_MQ_WOULD_BLOCK, the next call to that function must pass the
     177             :          * same message size and contents; that's not an issue here because
     178             :          * we're sending the same message every time.
     179             :          */
     180       10860 :         if (send_count < loop_count)
     181             :         {
     182       10814 :             res = shm_mq_send(outqh, message_size, message_contents, true,
     183             :                               true);
     184       10814 :             if (res == SHM_MQ_SUCCESS)
     185             :             {
     186         400 :                 ++send_count;
     187         400 :                 wait = false;
     188             :             }
     189       10414 :             else if (res == SHM_MQ_DETACHED)
     190           0 :                 ereport(ERROR,
     191             :                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     192             :                          errmsg("could not send message")));
     193             :         }
     194             : 
     195             :         /*
     196             :          * If we haven't yet received the message the requisite number of
     197             :          * times, try to receive it again now.
     198             :          */
     199       10860 :         if (receive_count < loop_count)
     200             :         {
     201       10858 :             res = shm_mq_receive(inqh, &len, &data, true);
     202       10858 :             if (res == SHM_MQ_SUCCESS)
     203             :             {
     204         400 :                 ++receive_count;
     205             :                 /* Verifying every time is slow, so it's optional. */
     206         400 :                 if (verify)
     207         400 :                     verify_message(message_size, message_contents, len, data);
     208         400 :                 wait = false;
     209             :             }
     210       10458 :             else if (res == SHM_MQ_DETACHED)
     211           0 :                 ereport(ERROR,
     212             :                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     213             :                          errmsg("could not receive message")));
     214             :         }
     215             :         else
     216             :         {
     217             :             /*
     218             :              * Otherwise, we've received the message enough times.  This
     219             :              * shouldn't happen unless we've also sent it enough times.
     220             :              */
     221           2 :             if (send_count != receive_count)
     222           0 :                 ereport(ERROR,
     223             :                         (errcode(ERRCODE_INTERNAL_ERROR),
     224             :                          errmsg("message sent %d times, but received %d times",
     225             :                                 send_count, receive_count)));
     226           2 :             break;
     227             :         }
     228             : 
     229       10858 :         if (wait)
     230             :         {
     231             :             /* first time, allocate or get the custom wait event */
     232       10200 :             if (we_message_queue == 0)
     233           2 :                 we_message_queue = WaitEventExtensionNew("TestShmMqMessageQueue");
     234             : 
     235             :             /*
     236             :              * If we made no progress, wait for one of the other processes to
     237             :              * which we are connected to set our latch, indicating that they
     238             :              * have read or written data and therefore there may now be work
     239             :              * for us to do.
     240             :              */
     241       10200 :             (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
     242             :                              we_message_queue);
     243       10200 :             ResetLatch(MyLatch);
     244       10200 :             CHECK_FOR_INTERRUPTS();
     245             :         }
     246             :     }
     247             : 
     248             :     /* Clean up. */
     249           2 :     dsm_detach(seg);
     250             : 
     251           2 :     PG_RETURN_VOID();
     252             : }
     253             : 
     254             : /*
     255             :  * Verify that two messages are the same.
     256             :  */
     257             : static void
     258         408 : verify_message(Size origlen, char *origdata, Size newlen, char *newdata)
     259             : {
     260             :     Size        i;
     261             : 
     262         408 :     if (origlen != newlen)
     263           0 :         ereport(ERROR,
     264             :                 (errmsg("message corrupted"),
     265             :                  errdetail("The original message was %zu bytes but the final message is %zu bytes.",
     266             :                            origlen, newlen)));
     267             : 
     268   108002344 :     for (i = 0; i < origlen; ++i)
     269   108001936 :         if (origdata[i] != newdata[i])
     270           0 :             ereport(ERROR,
     271             :                     (errmsg("message corrupted"),
     272             :                      errdetail("The new and original messages differ at byte %zu of %zu.", i, origlen)));
     273         408 : }

Generated by: LCOV version 1.14