Line data Source code
1 : /*--------------------------------------------------------------------------
2 : *
3 : * test.c
4 : * Test harness code for shared memory message queues.
5 : *
6 : * Copyright (c) 2013-2023, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/test/modules/test_shm_mq/test.c
10 : *
11 : * -------------------------------------------------------------------------
12 : */
13 :
14 : #include "postgres.h"
15 :
16 : #include "fmgr.h"
17 : #include "miscadmin.h"
18 : #include "pgstat.h"
19 : #include "varatt.h"
20 :
21 : #include "test_shm_mq.h"
22 :
23 16 : PG_MODULE_MAGIC;
24 :
25 4 : PG_FUNCTION_INFO_V1(test_shm_mq);
26 4 : PG_FUNCTION_INFO_V1(test_shm_mq_pipelined);
27 :
28 : static void verify_message(Size origlen, char *origdata, Size newlen,
29 : char *newdata);
30 :
31 : /*
32 : * Simple test of the shared memory message queue infrastructure.
33 : *
34 : * We set up a ring of message queues passing through 1 or more background
35 : * processes and eventually looping back to ourselves. We then send a message
36 : * through the ring a number of times indicated by the loop count. At the end,
37 : * we check whether the final message matches the one we started with.
38 : */
39 : Datum
40 8 : test_shm_mq(PG_FUNCTION_ARGS)
41 : {
42 8 : int64 queue_size = PG_GETARG_INT64(0);
43 8 : text *message = PG_GETARG_TEXT_PP(1);
44 8 : char *message_contents = VARDATA_ANY(message);
45 8 : int message_size = VARSIZE_ANY_EXHDR(message);
46 8 : int32 loop_count = PG_GETARG_INT32(2);
47 8 : int32 nworkers = PG_GETARG_INT32(3);
48 : dsm_segment *seg;
49 : shm_mq_handle *outqh;
50 : shm_mq_handle *inqh;
51 : shm_mq_result res;
52 : Size len;
53 : void *data;
54 :
55 : /* A negative loopcount is nonsensical. */
56 8 : if (loop_count < 0)
57 0 : ereport(ERROR,
58 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
59 : errmsg("repeat count size must be an integer value greater than or equal to zero")));
60 :
61 : /*
62 : * Since this test sends data using the blocking interfaces, it cannot
63 : * send data to itself. Therefore, a minimum of 1 worker is required. Of
64 : * course, a negative worker count is nonsensical.
65 : */
66 8 : if (nworkers <= 0)
67 0 : ereport(ERROR,
68 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
69 : errmsg("number of workers must be an integer value greater than zero")));
70 :
71 : /* Set up dynamic shared memory segment and background workers. */
72 8 : test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
73 :
74 : /* Send the initial message. */
75 8 : res = shm_mq_send(outqh, message_size, message_contents, false, true);
76 8 : if (res != SHM_MQ_SUCCESS)
77 0 : ereport(ERROR,
78 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
79 : errmsg("could not send message")));
80 :
81 : /*
82 : * Receive a message and send it back out again. Do this a number of
83 : * times equal to the loop count.
84 : */
85 : for (;;)
86 : {
87 : /* Receive a message. */
88 48002 : res = shm_mq_receive(inqh, &len, &data, false);
89 48002 : if (res != SHM_MQ_SUCCESS)
90 0 : ereport(ERROR,
91 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
92 : errmsg("could not receive message")));
93 :
94 : /* If this is supposed to be the last iteration, stop here. */
95 48002 : if (--loop_count <= 0)
96 8 : break;
97 :
98 : /* Send it back out. */
99 47994 : res = shm_mq_send(outqh, len, data, false, true);
100 47994 : if (res != SHM_MQ_SUCCESS)
101 0 : ereport(ERROR,
102 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
103 : errmsg("could not send message")));
104 : }
105 :
106 : /*
107 : * Finally, check that we got back the same message from the last
108 : * iteration that we originally sent.
109 : */
110 8 : verify_message(message_size, message_contents, len, data);
111 :
112 : /* Clean up. */
113 8 : dsm_detach(seg);
114 :
115 8 : PG_RETURN_VOID();
116 : }
117 :
118 : /*
119 : * Pipelined test of the shared memory message queue infrastructure.
120 : *
121 : * As in the basic test, we set up a ring of message queues passing through
122 : * 1 or more background processes and eventually looping back to ourselves.
123 : * Then, we send N copies of the user-specified message through the ring and
124 : * receive them all back. Since this might fill up all message queues in the
125 : * ring and then stall, we must be prepared to begin receiving the messages
126 : * back before we've finished sending them.
127 : */
128 : Datum
129 2 : test_shm_mq_pipelined(PG_FUNCTION_ARGS)
130 : {
131 2 : int64 queue_size = PG_GETARG_INT64(0);
132 2 : text *message = PG_GETARG_TEXT_PP(1);
133 2 : char *message_contents = VARDATA_ANY(message);
134 2 : int message_size = VARSIZE_ANY_EXHDR(message);
135 2 : int32 loop_count = PG_GETARG_INT32(2);
136 2 : int32 nworkers = PG_GETARG_INT32(3);
137 2 : bool verify = PG_GETARG_BOOL(4);
138 2 : int32 send_count = 0;
139 2 : int32 receive_count = 0;
140 : dsm_segment *seg;
141 : shm_mq_handle *outqh;
142 : shm_mq_handle *inqh;
143 : shm_mq_result res;
144 : Size len;
145 : void *data;
146 :
147 : /* A negative loopcount is nonsensical. */
148 2 : if (loop_count < 0)
149 0 : ereport(ERROR,
150 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
151 : errmsg("repeat count size must be an integer value greater than or equal to zero")));
152 :
153 : /*
154 : * Using the nonblocking interfaces, we can even send data to ourselves,
155 : * so the minimum number of workers for this test is zero.
156 : */
157 2 : if (nworkers < 0)
158 0 : ereport(ERROR,
159 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
160 : errmsg("number of workers must be an integer value greater than or equal to zero")));
161 :
162 : /* Set up dynamic shared memory segment and background workers. */
163 2 : test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
164 :
165 : /* Main loop. */
166 : for (;;)
167 9186 : {
168 9188 : bool wait = true;
169 :
170 : /*
171 : * If we haven't yet sent the message the requisite number of times,
172 : * try again to send it now. Note that when shm_mq_send() returns
173 : * SHM_MQ_WOULD_BLOCK, the next call to that function must pass the
174 : * same message size and contents; that's not an issue here because
175 : * we're sending the same message every time.
176 : */
177 9188 : if (send_count < loop_count)
178 : {
179 9112 : res = shm_mq_send(outqh, message_size, message_contents, true,
180 : true);
181 9112 : if (res == SHM_MQ_SUCCESS)
182 : {
183 400 : ++send_count;
184 400 : wait = false;
185 : }
186 8712 : else if (res == SHM_MQ_DETACHED)
187 0 : ereport(ERROR,
188 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
189 : errmsg("could not send message")));
190 : }
191 :
192 : /*
193 : * If we haven't yet received the message the requisite number of
194 : * times, try to receive it again now.
195 : */
196 9188 : if (receive_count < loop_count)
197 : {
198 9186 : res = shm_mq_receive(inqh, &len, &data, true);
199 9186 : if (res == SHM_MQ_SUCCESS)
200 : {
201 400 : ++receive_count;
202 : /* Verifying every time is slow, so it's optional. */
203 400 : if (verify)
204 400 : verify_message(message_size, message_contents, len, data);
205 400 : wait = false;
206 : }
207 8786 : else if (res == SHM_MQ_DETACHED)
208 0 : ereport(ERROR,
209 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
210 : errmsg("could not receive message")));
211 : }
212 : else
213 : {
214 : /*
215 : * Otherwise, we've received the message enough times. This
216 : * shouldn't happen unless we've also sent it enough times.
217 : */
218 2 : if (send_count != receive_count)
219 0 : ereport(ERROR,
220 : (errcode(ERRCODE_INTERNAL_ERROR),
221 : errmsg("message sent %d times, but received %d times",
222 : send_count, receive_count)));
223 2 : break;
224 : }
225 :
226 9186 : if (wait)
227 : {
228 : /*
229 : * If we made no progress, wait for one of the other processes to
230 : * which we are connected to set our latch, indicating that they
231 : * have read or written data and therefore there may now be work
232 : * for us to do.
233 : */
234 8412 : (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
235 : PG_WAIT_EXTENSION);
236 8412 : ResetLatch(MyLatch);
237 8412 : CHECK_FOR_INTERRUPTS();
238 : }
239 : }
240 :
241 : /* Clean up. */
242 2 : dsm_detach(seg);
243 :
244 2 : PG_RETURN_VOID();
245 : }
246 :
247 : /*
248 : * Verify that two messages are the same.
249 : */
250 : static void
251 408 : verify_message(Size origlen, char *origdata, Size newlen, char *newdata)
252 : {
253 : Size i;
254 :
255 408 : if (origlen != newlen)
256 0 : ereport(ERROR,
257 : (errmsg("message corrupted"),
258 : errdetail("The original message was %zu bytes but the final message is %zu bytes.",
259 : origlen, newlen)));
260 :
261 108001032 : for (i = 0; i < origlen; ++i)
262 108000624 : if (origdata[i] != newdata[i])
263 0 : ereport(ERROR,
264 : (errmsg("message corrupted"),
265 : errdetail("The new and original messages differ at byte %zu of %zu.", i, origlen)));
266 408 : }
|