Line data Source code
1 : /*--------------------------------------------------------------------------
2 : *
3 : * test.c
4 : * Test harness code for shared memory message queues.
5 : *
6 : * Copyright (c) 2013-2026, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/test/modules/test_shm_mq/test.c
10 : *
11 : * -------------------------------------------------------------------------
12 : */
13 :
14 : #include "postgres.h"
15 :
16 : #include "fmgr.h"
17 : #include "miscadmin.h"
18 : #include "pgstat.h"
19 : #include "storage/proc.h"
20 : #include "utils/wait_event.h"
21 : #include "varatt.h"
22 :
23 : #include "test_shm_mq.h"
24 :
25 8 : PG_MODULE_MAGIC;
26 :
27 2 : PG_FUNCTION_INFO_V1(test_shm_mq);
28 2 : PG_FUNCTION_INFO_V1(test_shm_mq_pipelined);
29 :
30 : static void verify_message(Size origlen, char *origdata, Size newlen,
31 : char *newdata);
32 :
33 : /* value cached, fetched from shared memory */
34 : static uint32 we_message_queue = 0;
35 :
36 : /*
37 : * Simple test of the shared memory message queue infrastructure.
38 : *
39 : * We set up a ring of message queues passing through 1 or more background
40 : * processes and eventually looping back to ourselves. We then send a message
41 : * through the ring a number of times indicated by the loop count. At the end,
42 : * we check whether the final message matches the one we started with.
43 : */
44 : Datum
45 4 : test_shm_mq(PG_FUNCTION_ARGS)
46 : {
47 4 : int64 queue_size = PG_GETARG_INT64(0);
48 4 : text *message = PG_GETARG_TEXT_PP(1);
49 4 : char *message_contents = VARDATA_ANY(message);
50 4 : int message_size = VARSIZE_ANY_EXHDR(message);
51 4 : int32 loop_count = PG_GETARG_INT32(2);
52 4 : int32 nworkers = PG_GETARG_INT32(3);
53 : dsm_segment *seg;
54 : shm_mq_handle *outqh;
55 : shm_mq_handle *inqh;
56 : shm_mq_result res;
57 : Size len;
58 : void *data;
59 :
60 : /* A negative loopcount is nonsensical. */
61 4 : if (loop_count < 0)
62 0 : ereport(ERROR,
63 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
64 : errmsg("repeat count size must be an integer value greater than or equal to zero")));
65 :
66 : /*
67 : * Since this test sends data using the blocking interfaces, it cannot
68 : * send data to itself. Therefore, a minimum of 1 worker is required. Of
69 : * course, a negative worker count is nonsensical.
70 : */
71 4 : if (nworkers <= 0)
72 0 : ereport(ERROR,
73 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
74 : errmsg("number of workers must be an integer value greater than zero")));
75 :
76 : /* Set up dynamic shared memory segment and background workers. */
77 4 : test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
78 :
79 : /* Send the initial message. */
80 4 : res = shm_mq_send(outqh, message_size, message_contents, false, true);
81 4 : if (res != SHM_MQ_SUCCESS)
82 0 : ereport(ERROR,
83 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
84 : errmsg("could not send message")));
85 :
86 : /*
87 : * Receive a message and send it back out again. Do this a number of
88 : * times equal to the loop count.
89 : */
90 : for (;;)
91 : {
92 : /* Receive a message. */
93 24001 : res = shm_mq_receive(inqh, &len, &data, false);
94 24001 : if (res != SHM_MQ_SUCCESS)
95 0 : ereport(ERROR,
96 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
97 : errmsg("could not receive message")));
98 :
99 : /* If this is supposed to be the last iteration, stop here. */
100 24001 : if (--loop_count <= 0)
101 4 : break;
102 :
103 : /* Send it back out. */
104 23997 : res = shm_mq_send(outqh, len, data, false, true);
105 23997 : if (res != SHM_MQ_SUCCESS)
106 0 : ereport(ERROR,
107 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
108 : errmsg("could not send message")));
109 : }
110 :
111 : /*
112 : * Finally, check that we got back the same message from the last
113 : * iteration that we originally sent.
114 : */
115 4 : verify_message(message_size, message_contents, len, data);
116 :
117 : /* Clean up. */
118 4 : dsm_detach(seg);
119 :
120 4 : PG_RETURN_VOID();
121 : }
122 :
123 : /*
124 : * Pipelined test of the shared memory message queue infrastructure.
125 : *
126 : * As in the basic test, we set up a ring of message queues passing through
127 : * 1 or more background processes and eventually looping back to ourselves.
128 : * Then, we send N copies of the user-specified message through the ring and
129 : * receive them all back. Since this might fill up all message queues in the
130 : * ring and then stall, we must be prepared to begin receiving the messages
131 : * back before we've finished sending them.
132 : */
133 : Datum
134 1 : test_shm_mq_pipelined(PG_FUNCTION_ARGS)
135 : {
136 1 : int64 queue_size = PG_GETARG_INT64(0);
137 1 : text *message = PG_GETARG_TEXT_PP(1);
138 1 : char *message_contents = VARDATA_ANY(message);
139 1 : int message_size = VARSIZE_ANY_EXHDR(message);
140 1 : int32 loop_count = PG_GETARG_INT32(2);
141 1 : int32 nworkers = PG_GETARG_INT32(3);
142 1 : bool verify = PG_GETARG_BOOL(4);
143 1 : int32 send_count = 0;
144 1 : int32 receive_count = 0;
145 : dsm_segment *seg;
146 : shm_mq_handle *outqh;
147 : shm_mq_handle *inqh;
148 : shm_mq_result res;
149 : Size len;
150 : void *data;
151 :
152 : /* A negative loopcount is nonsensical. */
153 1 : if (loop_count < 0)
154 0 : ereport(ERROR,
155 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
156 : errmsg("repeat count size must be an integer value greater than or equal to zero")));
157 :
158 : /*
159 : * Using the nonblocking interfaces, we can even send data to ourselves,
160 : * so the minimum number of workers for this test is zero.
161 : */
162 1 : if (nworkers < 0)
163 0 : ereport(ERROR,
164 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
165 : errmsg("number of workers must be an integer value greater than or equal to zero")));
166 :
167 : /* Set up dynamic shared memory segment and background workers. */
168 1 : test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
169 :
170 : /* Main loop. */
171 : for (;;)
172 6560 : {
173 6561 : bool wait = true;
174 :
175 : /*
176 : * If we haven't yet sent the message the requisite number of times,
177 : * try again to send it now. Note that when shm_mq_send() returns
178 : * SHM_MQ_WOULD_BLOCK, the next call to that function must pass the
179 : * same message size and contents; that's not an issue here because
180 : * we're sending the same message every time.
181 : */
182 6561 : if (send_count < loop_count)
183 : {
184 6537 : res = shm_mq_send(outqh, message_size, message_contents, true,
185 : true);
186 6537 : if (res == SHM_MQ_SUCCESS)
187 : {
188 200 : ++send_count;
189 200 : wait = false;
190 : }
191 6337 : else if (res == SHM_MQ_DETACHED)
192 0 : ereport(ERROR,
193 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
194 : errmsg("could not send message")));
195 : }
196 :
197 : /*
198 : * If we haven't yet received the message the requisite number of
199 : * times, try to receive it again now.
200 : */
201 6561 : if (receive_count < loop_count)
202 : {
203 6560 : res = shm_mq_receive(inqh, &len, &data, true);
204 6560 : if (res == SHM_MQ_SUCCESS)
205 : {
206 200 : ++receive_count;
207 : /* Verifying every time is slow, so it's optional. */
208 200 : if (verify)
209 200 : verify_message(message_size, message_contents, len, data);
210 200 : wait = false;
211 : }
212 6360 : else if (res == SHM_MQ_DETACHED)
213 0 : ereport(ERROR,
214 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
215 : errmsg("could not receive message")));
216 : }
217 : else
218 : {
219 : /*
220 : * Otherwise, we've received the message enough times. This
221 : * shouldn't happen unless we've also sent it enough times.
222 : */
223 1 : if (send_count != receive_count)
224 0 : ereport(ERROR,
225 : (errcode(ERRCODE_INTERNAL_ERROR),
226 : errmsg("message sent %d times, but received %d times",
227 : send_count, receive_count)));
228 1 : break;
229 : }
230 :
231 6560 : if (wait)
232 : {
233 : /* first time, allocate or get the custom wait event */
234 6169 : if (we_message_queue == 0)
235 1 : we_message_queue = WaitEventExtensionNew("TestShmMqMessageQueue");
236 :
237 : /*
238 : * If we made no progress, wait for one of the other processes to
239 : * which we are connected to set our latch, indicating that they
240 : * have read or written data and therefore there may now be work
241 : * for us to do.
242 : */
243 6169 : (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
244 : we_message_queue);
245 6169 : ResetLatch(MyLatch);
246 6169 : CHECK_FOR_INTERRUPTS();
247 : }
248 : }
249 :
250 : /* Clean up. */
251 1 : dsm_detach(seg);
252 :
253 1 : PG_RETURN_VOID();
254 : }
255 :
256 : /*
257 : * Verify that two messages are the same.
258 : */
259 : static void
260 204 : verify_message(Size origlen, char *origdata, Size newlen, char *newdata)
261 : {
262 : Size i;
263 :
264 204 : if (origlen != newlen)
265 0 : ereport(ERROR,
266 : (errmsg("message corrupted"),
267 : errdetail("The original message was %zu bytes but the final message is %zu bytes.",
268 : origlen, newlen)));
269 :
270 54001081 : for (i = 0; i < origlen; ++i)
271 54000877 : if (origdata[i] != newdata[i])
272 0 : ereport(ERROR,
273 : (errmsg("message corrupted"),
274 : errdetail("The new and original messages differ at byte %zu of %zu.", i, origlen)));
275 204 : }
|