Line data Source code
1 : /* -------------------------------------------------------------------------
2 : *
3 : * worker_spi.c
4 : * Sample background worker code that demonstrates various coding
5 : * patterns: establishing a database connection; starting and committing
6 : * transactions; using GUC variables, and heeding SIGHUP to reread
7 : * the configuration file; reporting to pg_stat_activity; using the
8 : * process latch to sleep and exit in case of postmaster death.
9 : *
10 : * This code connects to a database, creates a schema and table, and summarizes
11 : * the numbers contained therein. To see it working, insert an initial value
12 : * with "total" type and some initial value; then insert some other rows with
13 : * "delta" type. Delta rows will be deleted by this worker and their values
14 : * aggregated into the total.
15 : *
16 : * Copyright (c) 2013-2025, PostgreSQL Global Development Group
17 : *
18 : * IDENTIFICATION
19 : * src/test/modules/worker_spi/worker_spi.c
20 : *
21 : * -------------------------------------------------------------------------
22 : */
23 : #include "postgres.h"
24 :
25 : /* These are always necessary for a bgworker */
26 : #include "miscadmin.h"
27 : #include "postmaster/bgworker.h"
28 : #include "postmaster/interrupt.h"
29 : #include "storage/latch.h"
30 :
31 : /* these headers are used by this particular worker's code */
32 : #include "access/xact.h"
33 : #include "commands/dbcommands.h"
34 : #include "executor/spi.h"
35 : #include "fmgr.h"
36 : #include "lib/stringinfo.h"
37 : #include "pgstat.h"
38 : #include "tcop/utility.h"
39 : #include "utils/acl.h"
40 : #include "utils/builtins.h"
41 : #include "utils/snapmgr.h"
42 :
43 10 : PG_MODULE_MAGIC;
44 :
45 18 : PG_FUNCTION_INFO_V1(worker_spi_launch);
46 :
47 : PGDLLEXPORT void worker_spi_main(Datum main_arg) pg_attribute_noreturn();
48 :
49 : /* GUC variables */
50 : static int worker_spi_naptime = 10;
51 : static int worker_spi_total_workers = 2;
52 : static char *worker_spi_database = NULL;
53 : static char *worker_spi_role = NULL;
54 :
55 : /* value cached, fetched from shared memory */
56 : static uint32 worker_spi_wait_event_main = 0;
57 :
/*
 * Names of the schema and of the table inside it that a given worker
 * operates on.
 */
struct worktable
{
	const char *schema;			/* schema name */
	const char *name;			/* table name */
};

typedef struct worktable worktable;
63 :
64 : /*
65 : * Initialize workspace for a worker process: create the schema if it doesn't
66 : * already exist.
67 : */
68 : static void
69 2 : initialize_worker_spi(worktable *table)
70 : {
71 : int ret;
72 : int ntup;
73 : bool isnull;
74 : StringInfoData buf;
75 :
76 2 : SetCurrentStatementStartTimestamp();
77 2 : StartTransactionCommand();
78 2 : SPI_connect();
79 2 : PushActiveSnapshot(GetTransactionSnapshot());
80 2 : pgstat_report_activity(STATE_RUNNING, "initializing worker_spi schema");
81 :
82 : /* XXX could we use CREATE SCHEMA IF NOT EXISTS? */
83 2 : initStringInfo(&buf);
84 2 : appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'",
85 : table->schema);
86 :
87 2 : debug_query_string = buf.data;
88 2 : ret = SPI_execute(buf.data, true, 0);
89 2 : if (ret != SPI_OK_SELECT)
90 0 : elog(FATAL, "SPI_execute failed: error code %d", ret);
91 :
92 2 : if (SPI_processed != 1)
93 0 : elog(FATAL, "not a singleton result");
94 :
95 2 : ntup = DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[0],
96 2 : SPI_tuptable->tupdesc,
97 : 1, &isnull));
98 2 : if (isnull)
99 0 : elog(FATAL, "null result");
100 :
101 2 : if (ntup == 0)
102 : {
103 2 : debug_query_string = NULL;
104 2 : resetStringInfo(&buf);
105 2 : appendStringInfo(&buf,
106 : "CREATE SCHEMA \"%s\" "
107 : "CREATE TABLE \"%s\" ("
108 : " type text CHECK (type IN ('total', 'delta')), "
109 : " value integer)"
110 : "CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
111 : "WHERE type = 'total'",
112 : table->schema, table->name, table->name, table->name);
113 :
114 : /* set statement start time */
115 2 : SetCurrentStatementStartTimestamp();
116 :
117 2 : debug_query_string = buf.data;
118 2 : ret = SPI_execute(buf.data, false, 0);
119 :
120 2 : if (ret != SPI_OK_UTILITY)
121 0 : elog(FATAL, "failed to create my schema");
122 :
123 2 : debug_query_string = NULL; /* rest is not statement-specific */
124 : }
125 :
126 2 : SPI_finish();
127 2 : PopActiveSnapshot();
128 2 : CommitTransactionCommand();
129 2 : debug_query_string = NULL;
130 2 : pgstat_report_activity(STATE_IDLE, NULL);
131 2 : }
132 :
133 : void
134 6 : worker_spi_main(Datum main_arg)
135 : {
136 6 : int index = DatumGetInt32(main_arg);
137 : worktable *table;
138 : StringInfoData buf;
139 : char name[20];
140 : Oid dboid;
141 : Oid roleoid;
142 : char *p;
143 6 : bits32 flags = 0;
144 :
145 6 : table = palloc(sizeof(worktable));
146 6 : sprintf(name, "schema%d", index);
147 6 : table->schema = pstrdup(name);
148 6 : table->name = pstrdup("counted");
149 :
150 : /* fetch database and role OIDs, these are set for a dynamic worker */
151 6 : p = MyBgworkerEntry->bgw_extra;
152 6 : memcpy(&dboid, p, sizeof(Oid));
153 6 : p += sizeof(Oid);
154 6 : memcpy(&roleoid, p, sizeof(Oid));
155 6 : p += sizeof(Oid);
156 6 : memcpy(&flags, p, sizeof(bits32));
157 :
158 : /* Establish signal handlers before unblocking signals. */
159 6 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
160 6 : pqsignal(SIGTERM, die);
161 :
162 : /* We're now ready to receive signals */
163 6 : BackgroundWorkerUnblockSignals();
164 :
165 : /* Connect to our database */
166 6 : if (OidIsValid(dboid))
167 6 : BackgroundWorkerInitializeConnectionByOid(dboid, roleoid, flags);
168 : else
169 0 : BackgroundWorkerInitializeConnection(worker_spi_database,
170 : worker_spi_role, flags);
171 :
172 2 : elog(LOG, "%s initialized with %s.%s",
173 : MyBgworkerEntry->bgw_name, table->schema, table->name);
174 2 : initialize_worker_spi(table);
175 :
176 : /*
177 : * Quote identifiers passed to us. Note that this must be done after
178 : * initialize_worker_spi, because that routine assumes the names are not
179 : * quoted.
180 : *
181 : * Note some memory might be leaked here.
182 : */
183 2 : table->schema = quote_identifier(table->schema);
184 2 : table->name = quote_identifier(table->name);
185 :
186 2 : initStringInfo(&buf);
187 2 : appendStringInfo(&buf,
188 : "WITH deleted AS (DELETE "
189 : "FROM %s.%s "
190 : "WHERE type = 'delta' RETURNING value), "
191 : "total AS (SELECT coalesce(sum(value), 0) as sum "
192 : "FROM deleted) "
193 : "UPDATE %s.%s "
194 : "SET value = %s.value + total.sum "
195 : "FROM total WHERE type = 'total' "
196 : "RETURNING %s.value",
197 : table->schema, table->name,
198 : table->schema, table->name,
199 : table->name,
200 : table->name);
201 :
202 : /*
203 : * Main loop: do this until SIGTERM is received and processed by
204 : * ProcessInterrupts.
205 : */
206 : for (;;)
207 4 : {
208 : int ret;
209 :
210 : /* First time, allocate or get the custom wait event */
211 6 : if (worker_spi_wait_event_main == 0)
212 2 : worker_spi_wait_event_main = WaitEventExtensionNew("WorkerSpiMain");
213 :
214 : /*
215 : * Background workers mustn't call usleep() or any direct equivalent:
216 : * instead, they may wait on their process latch, which sleeps as
217 : * necessary, but is awakened if postmaster dies. That way the
218 : * background process goes away immediately in an emergency.
219 : */
220 6 : (void) WaitLatch(MyLatch,
221 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
222 : worker_spi_naptime * 1000L,
223 : worker_spi_wait_event_main);
224 6 : ResetLatch(MyLatch);
225 :
226 6 : CHECK_FOR_INTERRUPTS();
227 :
228 : /*
229 : * In case of a SIGHUP, just reload the configuration.
230 : */
231 4 : if (ConfigReloadPending)
232 : {
233 2 : ConfigReloadPending = false;
234 2 : ProcessConfigFile(PGC_SIGHUP);
235 : }
236 :
237 : /*
238 : * Start a transaction on which we can run queries. Note that each
239 : * StartTransactionCommand() call should be preceded by a
240 : * SetCurrentStatementStartTimestamp() call, which sets both the time
241 : * for the statement we're about the run, and also the transaction
242 : * start time. Also, each other query sent to SPI should probably be
243 : * preceded by SetCurrentStatementStartTimestamp(), so that statement
244 : * start time is always up to date.
245 : *
246 : * The SPI_connect() call lets us run queries through the SPI manager,
247 : * and the PushActiveSnapshot() call creates an "active" snapshot
248 : * which is necessary for queries to have MVCC data to work on.
249 : *
250 : * The pgstat_report_activity() call makes our activity visible
251 : * through the pgstat views.
252 : */
253 4 : SetCurrentStatementStartTimestamp();
254 4 : StartTransactionCommand();
255 4 : SPI_connect();
256 4 : PushActiveSnapshot(GetTransactionSnapshot());
257 4 : debug_query_string = buf.data;
258 4 : pgstat_report_activity(STATE_RUNNING, buf.data);
259 :
260 : /* We can now execute queries via SPI */
261 4 : ret = SPI_execute(buf.data, false, 0);
262 :
263 4 : if (ret != SPI_OK_UPDATE_RETURNING)
264 0 : elog(FATAL, "cannot select from table %s.%s: error code %d",
265 : table->schema, table->name, ret);
266 :
267 4 : if (SPI_processed > 0)
268 : {
269 : bool isnull;
270 : int32 val;
271 :
272 2 : val = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
273 2 : SPI_tuptable->tupdesc,
274 : 1, &isnull));
275 2 : if (!isnull)
276 2 : elog(LOG, "%s: count in %s.%s is now %d",
277 : MyBgworkerEntry->bgw_name,
278 : table->schema, table->name, val);
279 : }
280 :
281 : /*
282 : * And finish our transaction.
283 : */
284 4 : SPI_finish();
285 4 : PopActiveSnapshot();
286 4 : CommitTransactionCommand();
287 4 : debug_query_string = NULL;
288 4 : pgstat_report_stat(true);
289 4 : pgstat_report_activity(STATE_IDLE, NULL);
290 : }
291 :
292 : /* Not reachable */
293 : }
294 :
295 : /*
296 : * Entrypoint of this module.
297 : *
298 : * We register more than one worker process here, to demonstrate how that can
299 : * be done.
300 : */
301 : void
302 10 : _PG_init(void)
303 : {
304 : BackgroundWorker worker;
305 :
306 : /* get the configuration */
307 :
308 : /*
309 : * These GUCs are defined even if this library is not loaded with
310 : * shared_preload_libraries, for worker_spi_launch().
311 : */
312 10 : DefineCustomIntVariable("worker_spi.naptime",
313 : "Duration between each check (in seconds).",
314 : NULL,
315 : &worker_spi_naptime,
316 : 10,
317 : 1,
318 : INT_MAX,
319 : PGC_SIGHUP,
320 : 0,
321 : NULL,
322 : NULL,
323 : NULL);
324 :
325 10 : DefineCustomStringVariable("worker_spi.database",
326 : "Database to connect to.",
327 : NULL,
328 : &worker_spi_database,
329 : "postgres",
330 : PGC_SIGHUP,
331 : 0,
332 : NULL, NULL, NULL);
333 :
334 10 : DefineCustomStringVariable("worker_spi.role",
335 : "Role to connect with.",
336 : NULL,
337 : &worker_spi_role,
338 : NULL,
339 : PGC_SIGHUP,
340 : 0,
341 : NULL, NULL, NULL);
342 :
343 10 : if (!process_shared_preload_libraries_in_progress)
344 8 : return;
345 :
346 2 : DefineCustomIntVariable("worker_spi.total_workers",
347 : "Number of workers.",
348 : NULL,
349 : &worker_spi_total_workers,
350 : 2,
351 : 1,
352 : 100,
353 : PGC_POSTMASTER,
354 : 0,
355 : NULL,
356 : NULL,
357 : NULL);
358 :
359 2 : MarkGUCPrefixReserved("worker_spi");
360 :
361 : /* set up common data for all our workers */
362 2 : memset(&worker, 0, sizeof(worker));
363 2 : worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
364 : BGWORKER_BACKEND_DATABASE_CONNECTION;
365 2 : worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
366 2 : worker.bgw_restart_time = BGW_NEVER_RESTART;
367 2 : sprintf(worker.bgw_library_name, "worker_spi");
368 2 : sprintf(worker.bgw_function_name, "worker_spi_main");
369 2 : worker.bgw_notify_pid = 0;
370 :
371 : /*
372 : * Now fill in worker-specific data, and do the actual registrations.
373 : *
374 : * bgw_extra can optionally include a database OID, a role OID and a set
375 : * of flags. This is left empty here to fallback to the related GUCs at
376 : * startup (0 for the bgworker flags).
377 : */
378 8 : for (int i = 1; i <= worker_spi_total_workers; i++)
379 : {
380 6 : snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi worker %d", i);
381 6 : snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi");
382 6 : worker.bgw_main_arg = Int32GetDatum(i);
383 :
384 6 : RegisterBackgroundWorker(&worker);
385 : }
386 : }
387 :
388 : /*
389 : * Dynamically launch an SPI worker.
390 : */
391 : Datum
392 14 : worker_spi_launch(PG_FUNCTION_ARGS)
393 : {
394 14 : int32 i = PG_GETARG_INT32(0);
395 14 : Oid dboid = PG_GETARG_OID(1);
396 14 : Oid roleoid = PG_GETARG_OID(2);
397 : BackgroundWorker worker;
398 : BackgroundWorkerHandle *handle;
399 : BgwHandleStatus status;
400 : pid_t pid;
401 : char *p;
402 14 : bits32 flags = 0;
403 14 : ArrayType *arr = PG_GETARG_ARRAYTYPE_P(3);
404 : Size ndim;
405 : int nelems;
406 : Datum *datum_flags;
407 :
408 14 : memset(&worker, 0, sizeof(worker));
409 14 : worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
410 : BGWORKER_BACKEND_DATABASE_CONNECTION;
411 14 : worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
412 14 : worker.bgw_restart_time = BGW_NEVER_RESTART;
413 14 : sprintf(worker.bgw_library_name, "worker_spi");
414 14 : sprintf(worker.bgw_function_name, "worker_spi_main");
415 14 : snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi dynamic worker %d", i);
416 14 : snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi dynamic");
417 14 : worker.bgw_main_arg = Int32GetDatum(i);
418 : /* set bgw_notify_pid so that we can use WaitForBackgroundWorkerStartup */
419 14 : worker.bgw_notify_pid = MyProcPid;
420 :
421 : /* extract flags, if any */
422 14 : ndim = ARR_NDIM(arr);
423 14 : if (ndim > 1)
424 0 : ereport(ERROR,
425 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
426 : errmsg("flags array must be one-dimensional")));
427 :
428 14 : if (array_contains_nulls(arr))
429 0 : ereport(ERROR,
430 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
431 : errmsg("flags array must not contain nulls")));
432 :
433 : Assert(ARR_ELEMTYPE(arr) == TEXTOID);
434 14 : deconstruct_array_builtin(arr, TEXTOID, &datum_flags, NULL, &nelems);
435 :
436 18 : for (i = 0; i < nelems; i++)
437 : {
438 4 : char *optname = TextDatumGetCString(datum_flags[i]);
439 :
440 4 : if (strcmp(optname, "ALLOWCONN") == 0)
441 2 : flags |= BGWORKER_BYPASS_ALLOWCONN;
442 2 : else if (strcmp(optname, "ROLELOGINCHECK") == 0)
443 2 : flags |= BGWORKER_BYPASS_ROLELOGINCHECK;
444 : else
445 0 : ereport(ERROR,
446 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
447 : errmsg("incorrect flag value found in array")));
448 : }
449 :
450 : /*
451 : * Register database and role to use for the worker started in bgw_extra.
452 : * If none have been provided, this will fall back to the GUCs at startup.
453 : */
454 14 : if (!OidIsValid(dboid))
455 2 : dboid = get_database_oid(worker_spi_database, false);
456 :
457 : /*
458 : * worker_spi_role is NULL by default, so this gives to worker_spi_main()
459 : * an invalid OID in this case.
460 : */
461 14 : if (!OidIsValid(roleoid) && worker_spi_role)
462 0 : roleoid = get_role_oid(worker_spi_role, false);
463 :
464 14 : p = worker.bgw_extra;
465 14 : memcpy(p, &dboid, sizeof(Oid));
466 14 : p += sizeof(Oid);
467 14 : memcpy(p, &roleoid, sizeof(Oid));
468 14 : p += sizeof(Oid);
469 14 : memcpy(p, &flags, sizeof(bits32));
470 :
471 14 : if (!RegisterDynamicBackgroundWorker(&worker, &handle))
472 0 : PG_RETURN_NULL();
473 :
474 14 : status = WaitForBackgroundWorkerStartup(handle, &pid);
475 :
476 14 : if (status == BGWH_STOPPED)
477 0 : ereport(ERROR,
478 : (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
479 : errmsg("could not start background process"),
480 : errhint("More details may be available in the server log.")));
481 14 : if (status == BGWH_POSTMASTER_DIED)
482 0 : ereport(ERROR,
483 : (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
484 : errmsg("cannot start background processes without postmaster"),
485 : errhint("Kill all remaining database processes and restart the database.")));
486 : Assert(status == BGWH_STARTED);
487 :
488 14 : PG_RETURN_INT32(pid);
489 : }
|