Line data Source code
1 : /*--------------------------------------------------------------------------
2 : *
3 : * test.c
4 : * Test harness code for shared memory message queues.
5 : *
6 : * Copyright (c) 2013-2025, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/test/modules/test_shm_mq/test.c
10 : *
11 : * -------------------------------------------------------------------------
12 : */
13 :
14 : #include "postgres.h"
15 :
16 : #include "fmgr.h"
17 : #include "miscadmin.h"
18 : #include "pgstat.h"
19 : #include "varatt.h"
20 :
21 : #include "test_shm_mq.h"
22 :
23 16 : PG_MODULE_MAGIC;
24 :
25 4 : PG_FUNCTION_INFO_V1(test_shm_mq);
26 4 : PG_FUNCTION_INFO_V1(test_shm_mq_pipelined);
27 :
28 : static void verify_message(Size origlen, char *origdata, Size newlen,
29 : char *newdata);
30 :
31 : /* value cached, fetched from shared memory */
32 : static uint32 we_message_queue = 0;
33 :
34 : /*
35 : * Simple test of the shared memory message queue infrastructure.
36 : *
37 : * We set up a ring of message queues passing through 1 or more background
38 : * processes and eventually looping back to ourselves. We then send a message
39 : * through the ring a number of times indicated by the loop count. At the end,
40 : * we check whether the final message matches the one we started with.
41 : */
42 : Datum
43 8 : test_shm_mq(PG_FUNCTION_ARGS)
44 : {
45 8 : int64 queue_size = PG_GETARG_INT64(0);
46 8 : text *message = PG_GETARG_TEXT_PP(1);
47 8 : char *message_contents = VARDATA_ANY(message);
48 8 : int message_size = VARSIZE_ANY_EXHDR(message);
49 8 : int32 loop_count = PG_GETARG_INT32(2);
50 8 : int32 nworkers = PG_GETARG_INT32(3);
51 : dsm_segment *seg;
52 : shm_mq_handle *outqh;
53 : shm_mq_handle *inqh;
54 : shm_mq_result res;
55 : Size len;
56 : void *data;
57 :
58 : /* A negative loopcount is nonsensical. */
59 8 : if (loop_count < 0)
60 0 : ereport(ERROR,
61 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
62 : errmsg("repeat count size must be an integer value greater than or equal to zero")));
63 :
64 : /*
65 : * Since this test sends data using the blocking interfaces, it cannot
66 : * send data to itself. Therefore, a minimum of 1 worker is required. Of
67 : * course, a negative worker count is nonsensical.
68 : */
69 8 : if (nworkers <= 0)
70 0 : ereport(ERROR,
71 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
72 : errmsg("number of workers must be an integer value greater than zero")));
73 :
74 : /* Set up dynamic shared memory segment and background workers. */
75 8 : test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
76 :
77 : /* Send the initial message. */
78 8 : res = shm_mq_send(outqh, message_size, message_contents, false, true);
79 8 : if (res != SHM_MQ_SUCCESS)
80 0 : ereport(ERROR,
81 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
82 : errmsg("could not send message")));
83 :
84 : /*
85 : * Receive a message and send it back out again. Do this a number of
86 : * times equal to the loop count.
87 : */
88 : for (;;)
89 : {
90 : /* Receive a message. */
91 48002 : res = shm_mq_receive(inqh, &len, &data, false);
92 48002 : if (res != SHM_MQ_SUCCESS)
93 0 : ereport(ERROR,
94 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
95 : errmsg("could not receive message")));
96 :
97 : /* If this is supposed to be the last iteration, stop here. */
98 48002 : if (--loop_count <= 0)
99 8 : break;
100 :
101 : /* Send it back out. */
102 47994 : res = shm_mq_send(outqh, len, data, false, true);
103 47994 : if (res != SHM_MQ_SUCCESS)
104 0 : ereport(ERROR,
105 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
106 : errmsg("could not send message")));
107 : }
108 :
109 : /*
110 : * Finally, check that we got back the same message from the last
111 : * iteration that we originally sent.
112 : */
113 8 : verify_message(message_size, message_contents, len, data);
114 :
115 : /* Clean up. */
116 8 : dsm_detach(seg);
117 :
118 8 : PG_RETURN_VOID();
119 : }
120 :
121 : /*
122 : * Pipelined test of the shared memory message queue infrastructure.
123 : *
124 : * As in the basic test, we set up a ring of message queues passing through
125 : * 1 or more background processes and eventually looping back to ourselves.
126 : * Then, we send N copies of the user-specified message through the ring and
127 : * receive them all back. Since this might fill up all message queues in the
128 : * ring and then stall, we must be prepared to begin receiving the messages
129 : * back before we've finished sending them.
130 : */
131 : Datum
132 2 : test_shm_mq_pipelined(PG_FUNCTION_ARGS)
133 : {
134 2 : int64 queue_size = PG_GETARG_INT64(0);
135 2 : text *message = PG_GETARG_TEXT_PP(1);
136 2 : char *message_contents = VARDATA_ANY(message);
137 2 : int message_size = VARSIZE_ANY_EXHDR(message);
138 2 : int32 loop_count = PG_GETARG_INT32(2);
139 2 : int32 nworkers = PG_GETARG_INT32(3);
140 2 : bool verify = PG_GETARG_BOOL(4);
141 2 : int32 send_count = 0;
142 2 : int32 receive_count = 0;
143 : dsm_segment *seg;
144 : shm_mq_handle *outqh;
145 : shm_mq_handle *inqh;
146 : shm_mq_result res;
147 : Size len;
148 : void *data;
149 :
150 : /* A negative loopcount is nonsensical. */
151 2 : if (loop_count < 0)
152 0 : ereport(ERROR,
153 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
154 : errmsg("repeat count size must be an integer value greater than or equal to zero")));
155 :
156 : /*
157 : * Using the nonblocking interfaces, we can even send data to ourselves,
158 : * so the minimum number of workers for this test is zero.
159 : */
160 2 : if (nworkers < 0)
161 0 : ereport(ERROR,
162 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
163 : errmsg("number of workers must be an integer value greater than or equal to zero")));
164 :
165 : /* Set up dynamic shared memory segment and background workers. */
166 2 : test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
167 :
168 : /* Main loop. */
169 : for (;;)
170 10858 : {
171 10860 : bool wait = true;
172 :
173 : /*
174 : * If we haven't yet sent the message the requisite number of times,
175 : * try again to send it now. Note that when shm_mq_send() returns
176 : * SHM_MQ_WOULD_BLOCK, the next call to that function must pass the
177 : * same message size and contents; that's not an issue here because
178 : * we're sending the same message every time.
179 : */
180 10860 : if (send_count < loop_count)
181 : {
182 10814 : res = shm_mq_send(outqh, message_size, message_contents, true,
183 : true);
184 10814 : if (res == SHM_MQ_SUCCESS)
185 : {
186 400 : ++send_count;
187 400 : wait = false;
188 : }
189 10414 : else if (res == SHM_MQ_DETACHED)
190 0 : ereport(ERROR,
191 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
192 : errmsg("could not send message")));
193 : }
194 :
195 : /*
196 : * If we haven't yet received the message the requisite number of
197 : * times, try to receive it again now.
198 : */
199 10860 : if (receive_count < loop_count)
200 : {
201 10858 : res = shm_mq_receive(inqh, &len, &data, true);
202 10858 : if (res == SHM_MQ_SUCCESS)
203 : {
204 400 : ++receive_count;
205 : /* Verifying every time is slow, so it's optional. */
206 400 : if (verify)
207 400 : verify_message(message_size, message_contents, len, data);
208 400 : wait = false;
209 : }
210 10458 : else if (res == SHM_MQ_DETACHED)
211 0 : ereport(ERROR,
212 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
213 : errmsg("could not receive message")));
214 : }
215 : else
216 : {
217 : /*
218 : * Otherwise, we've received the message enough times. This
219 : * shouldn't happen unless we've also sent it enough times.
220 : */
221 2 : if (send_count != receive_count)
222 0 : ereport(ERROR,
223 : (errcode(ERRCODE_INTERNAL_ERROR),
224 : errmsg("message sent %d times, but received %d times",
225 : send_count, receive_count)));
226 2 : break;
227 : }
228 :
229 10858 : if (wait)
230 : {
231 : /* first time, allocate or get the custom wait event */
232 10200 : if (we_message_queue == 0)
233 2 : we_message_queue = WaitEventExtensionNew("TestShmMqMessageQueue");
234 :
235 : /*
236 : * If we made no progress, wait for one of the other processes to
237 : * which we are connected to set our latch, indicating that they
238 : * have read or written data and therefore there may now be work
239 : * for us to do.
240 : */
241 10200 : (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
242 : we_message_queue);
243 10200 : ResetLatch(MyLatch);
244 10200 : CHECK_FOR_INTERRUPTS();
245 : }
246 : }
247 :
248 : /* Clean up. */
249 2 : dsm_detach(seg);
250 :
251 2 : PG_RETURN_VOID();
252 : }
253 :
254 : /*
255 : * Verify that two messages are the same.
256 : */
257 : static void
258 408 : verify_message(Size origlen, char *origdata, Size newlen, char *newdata)
259 : {
260 : Size i;
261 :
262 408 : if (origlen != newlen)
263 0 : ereport(ERROR,
264 : (errmsg("message corrupted"),
265 : errdetail("The original message was %zu bytes but the final message is %zu bytes.",
266 : origlen, newlen)));
267 :
268 108002344 : for (i = 0; i < origlen; ++i)
269 108001936 : if (origdata[i] != newdata[i])
270 0 : ereport(ERROR,
271 : (errmsg("message corrupted"),
272 : errdetail("The new and original messages differ at byte %zu of %zu.", i, origlen)));
273 408 : }
|