Line data Source code
1 : /* -------------------------------------------------------------------------
2 : *
3 : * worker_spi.c
4 : * Sample background worker code that demonstrates various coding
5 : * patterns: establishing a database connection; starting and committing
6 : * transactions; using GUC variables, and heeding SIGHUP to reread
7 : * the configuration file; reporting to pg_stat_activity; using the
8 : * process latch to sleep and exit in case of postmaster death.
9 : *
10 : * This code connects to a database, creates a schema and table, and summarizes
11 : * the numbers contained therein. To see it working, insert an initial value
12 : * with "total" type and some initial value; then insert some other rows with
13 : * "delta" type. Delta rows will be deleted by this worker and their values
14 : * aggregated into the total.
15 : *
16 : * Copyright (c) 2013-2024, PostgreSQL Global Development Group
17 : *
18 : * IDENTIFICATION
19 : * src/test/modules/worker_spi/worker_spi.c
20 : *
21 : * -------------------------------------------------------------------------
22 : */
23 : #include "postgres.h"
24 :
25 : /* These are always necessary for a bgworker */
26 : #include "miscadmin.h"
27 : #include "postmaster/bgworker.h"
28 : #include "postmaster/interrupt.h"
29 : #include "storage/ipc.h"
30 : #include "storage/latch.h"
31 : #include "storage/lwlock.h"
32 : #include "storage/proc.h"
33 : #include "storage/shmem.h"
34 :
35 : /* these headers are used by this particular worker's code */
36 : #include "access/xact.h"
37 : #include "commands/dbcommands.h"
38 : #include "executor/spi.h"
39 : #include "fmgr.h"
40 : #include "lib/stringinfo.h"
41 : #include "pgstat.h"
42 : #include "tcop/utility.h"
43 : #include "utils/acl.h"
44 : #include "utils/builtins.h"
45 : #include "utils/snapmgr.h"
46 :
47 10 : PG_MODULE_MAGIC;
48 :
49 18 : PG_FUNCTION_INFO_V1(worker_spi_launch);
50 :
51 : PGDLLEXPORT void worker_spi_main(Datum main_arg) pg_attribute_noreturn();
52 :
53 : /* GUC variables */
54 : static int worker_spi_naptime = 10;
55 : static int worker_spi_total_workers = 2;
56 : static char *worker_spi_database = NULL;
57 : static char *worker_spi_role = NULL;
58 :
59 : /* value cached, fetched from shared memory */
60 : static uint32 worker_spi_wait_event_main = 0;
61 :
62 : typedef struct worktable
63 : {
64 : const char *schema;
65 : const char *name;
66 : } worktable;
67 :
68 : /*
69 : * Initialize workspace for a worker process: create the schema if it doesn't
70 : * already exist.
71 : */
72 : static void
73 2 : initialize_worker_spi(worktable *table)
74 : {
75 : int ret;
76 : int ntup;
77 : bool isnull;
78 : StringInfoData buf;
79 :
80 2 : SetCurrentStatementStartTimestamp();
81 2 : StartTransactionCommand();
82 2 : SPI_connect();
83 2 : PushActiveSnapshot(GetTransactionSnapshot());
84 2 : pgstat_report_activity(STATE_RUNNING, "initializing worker_spi schema");
85 :
86 : /* XXX could we use CREATE SCHEMA IF NOT EXISTS? */
87 2 : initStringInfo(&buf);
88 2 : appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'",
89 : table->schema);
90 :
91 2 : debug_query_string = buf.data;
92 2 : ret = SPI_execute(buf.data, true, 0);
93 2 : if (ret != SPI_OK_SELECT)
94 0 : elog(FATAL, "SPI_execute failed: error code %d", ret);
95 :
96 2 : if (SPI_processed != 1)
97 0 : elog(FATAL, "not a singleton result");
98 :
99 2 : ntup = DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[0],
100 2 : SPI_tuptable->tupdesc,
101 : 1, &isnull));
102 2 : if (isnull)
103 0 : elog(FATAL, "null result");
104 :
105 2 : if (ntup == 0)
106 : {
107 2 : debug_query_string = NULL;
108 2 : resetStringInfo(&buf);
109 2 : appendStringInfo(&buf,
110 : "CREATE SCHEMA \"%s\" "
111 : "CREATE TABLE \"%s\" ("
112 : " type text CHECK (type IN ('total', 'delta')), "
113 : " value integer)"
114 : "CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
115 : "WHERE type = 'total'",
116 : table->schema, table->name, table->name, table->name);
117 :
118 : /* set statement start time */
119 2 : SetCurrentStatementStartTimestamp();
120 :
121 2 : debug_query_string = buf.data;
122 2 : ret = SPI_execute(buf.data, false, 0);
123 :
124 2 : if (ret != SPI_OK_UTILITY)
125 0 : elog(FATAL, "failed to create my schema");
126 :
127 2 : debug_query_string = NULL; /* rest is not statement-specific */
128 : }
129 :
130 2 : SPI_finish();
131 2 : PopActiveSnapshot();
132 2 : CommitTransactionCommand();
133 2 : debug_query_string = NULL;
134 2 : pgstat_report_activity(STATE_IDLE, NULL);
135 2 : }
136 :
137 : void
138 6 : worker_spi_main(Datum main_arg)
139 : {
140 6 : int index = DatumGetInt32(main_arg);
141 : worktable *table;
142 : StringInfoData buf;
143 : char name[20];
144 : Oid dboid;
145 : Oid roleoid;
146 : char *p;
147 6 : bits32 flags = 0;
148 :
149 6 : table = palloc(sizeof(worktable));
150 6 : sprintf(name, "schema%d", index);
151 6 : table->schema = pstrdup(name);
152 6 : table->name = pstrdup("counted");
153 :
154 : /* fetch database and role OIDs, these are set for a dynamic worker */
155 6 : p = MyBgworkerEntry->bgw_extra;
156 6 : memcpy(&dboid, p, sizeof(Oid));
157 6 : p += sizeof(Oid);
158 6 : memcpy(&roleoid, p, sizeof(Oid));
159 6 : p += sizeof(Oid);
160 6 : memcpy(&flags, p, sizeof(bits32));
161 :
162 : /* Establish signal handlers before unblocking signals. */
163 6 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
164 6 : pqsignal(SIGTERM, die);
165 :
166 : /* We're now ready to receive signals */
167 6 : BackgroundWorkerUnblockSignals();
168 :
169 : /* Connect to our database */
170 6 : if (OidIsValid(dboid))
171 6 : BackgroundWorkerInitializeConnectionByOid(dboid, roleoid, flags);
172 : else
173 0 : BackgroundWorkerInitializeConnection(worker_spi_database,
174 : worker_spi_role, flags);
175 :
176 : /*
177 : * Disable parallel query for workers started with
178 : * BGWORKER_BYPASS_ALLOWCONN or BGWORKER_BYPASS_ROLELOGINCHECK so as these
179 : * don't attempt connections using a database or a role that may not allow
180 : * that.
181 : */
182 2 : if ((flags & (BGWORKER_BYPASS_ALLOWCONN | BGWORKER_BYPASS_ROLELOGINCHECK)))
183 0 : SetConfigOption("max_parallel_workers_per_gather", "0",
184 : PGC_USERSET, PGC_S_OVERRIDE);
185 :
186 2 : elog(LOG, "%s initialized with %s.%s",
187 : MyBgworkerEntry->bgw_name, table->schema, table->name);
188 2 : initialize_worker_spi(table);
189 :
190 : /*
191 : * Quote identifiers passed to us. Note that this must be done after
192 : * initialize_worker_spi, because that routine assumes the names are not
193 : * quoted.
194 : *
195 : * Note some memory might be leaked here.
196 : */
197 2 : table->schema = quote_identifier(table->schema);
198 2 : table->name = quote_identifier(table->name);
199 :
200 2 : initStringInfo(&buf);
201 2 : appendStringInfo(&buf,
202 : "WITH deleted AS (DELETE "
203 : "FROM %s.%s "
204 : "WHERE type = 'delta' RETURNING value), "
205 : "total AS (SELECT coalesce(sum(value), 0) as sum "
206 : "FROM deleted) "
207 : "UPDATE %s.%s "
208 : "SET value = %s.value + total.sum "
209 : "FROM total WHERE type = 'total' "
210 : "RETURNING %s.value",
211 : table->schema, table->name,
212 : table->schema, table->name,
213 : table->name,
214 : table->name);
215 :
216 : /*
217 : * Main loop: do this until SIGTERM is received and processed by
218 : * ProcessInterrupts.
219 : */
220 : for (;;)
221 4 : {
222 : int ret;
223 :
224 : /* First time, allocate or get the custom wait event */
225 6 : if (worker_spi_wait_event_main == 0)
226 2 : worker_spi_wait_event_main = WaitEventExtensionNew("WorkerSpiMain");
227 :
228 : /*
229 : * Background workers mustn't call usleep() or any direct equivalent:
230 : * instead, they may wait on their process latch, which sleeps as
231 : * necessary, but is awakened if postmaster dies. That way the
232 : * background process goes away immediately in an emergency.
233 : */
234 6 : (void) WaitLatch(MyLatch,
235 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
236 : worker_spi_naptime * 1000L,
237 : worker_spi_wait_event_main);
238 6 : ResetLatch(MyLatch);
239 :
240 6 : CHECK_FOR_INTERRUPTS();
241 :
242 : /*
243 : * In case of a SIGHUP, just reload the configuration.
244 : */
245 4 : if (ConfigReloadPending)
246 : {
247 2 : ConfigReloadPending = false;
248 2 : ProcessConfigFile(PGC_SIGHUP);
249 : }
250 :
251 : /*
252 : * Start a transaction on which we can run queries. Note that each
253 : * StartTransactionCommand() call should be preceded by a
254 : * SetCurrentStatementStartTimestamp() call, which sets both the time
255 : * for the statement we're about the run, and also the transaction
256 : * start time. Also, each other query sent to SPI should probably be
257 : * preceded by SetCurrentStatementStartTimestamp(), so that statement
258 : * start time is always up to date.
259 : *
260 : * The SPI_connect() call lets us run queries through the SPI manager,
261 : * and the PushActiveSnapshot() call creates an "active" snapshot
262 : * which is necessary for queries to have MVCC data to work on.
263 : *
264 : * The pgstat_report_activity() call makes our activity visible
265 : * through the pgstat views.
266 : */
267 4 : SetCurrentStatementStartTimestamp();
268 4 : StartTransactionCommand();
269 4 : SPI_connect();
270 4 : PushActiveSnapshot(GetTransactionSnapshot());
271 4 : debug_query_string = buf.data;
272 4 : pgstat_report_activity(STATE_RUNNING, buf.data);
273 :
274 : /* We can now execute queries via SPI */
275 4 : ret = SPI_execute(buf.data, false, 0);
276 :
277 4 : if (ret != SPI_OK_UPDATE_RETURNING)
278 0 : elog(FATAL, "cannot select from table %s.%s: error code %d",
279 : table->schema, table->name, ret);
280 :
281 4 : if (SPI_processed > 0)
282 : {
283 : bool isnull;
284 : int32 val;
285 :
286 2 : val = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
287 2 : SPI_tuptable->tupdesc,
288 : 1, &isnull));
289 2 : if (!isnull)
290 2 : elog(LOG, "%s: count in %s.%s is now %d",
291 : MyBgworkerEntry->bgw_name,
292 : table->schema, table->name, val);
293 : }
294 :
295 : /*
296 : * And finish our transaction.
297 : */
298 4 : SPI_finish();
299 4 : PopActiveSnapshot();
300 4 : CommitTransactionCommand();
301 4 : debug_query_string = NULL;
302 4 : pgstat_report_stat(true);
303 4 : pgstat_report_activity(STATE_IDLE, NULL);
304 : }
305 :
306 : /* Not reachable */
307 : }
308 :
309 : /*
310 : * Entrypoint of this module.
311 : *
312 : * We register more than one worker process here, to demonstrate how that can
313 : * be done.
314 : */
315 : void
316 10 : _PG_init(void)
317 : {
318 : BackgroundWorker worker;
319 :
320 : /* get the configuration */
321 :
322 : /*
323 : * These GUCs are defined even if this library is not loaded with
324 : * shared_preload_libraries, for worker_spi_launch().
325 : */
326 10 : DefineCustomIntVariable("worker_spi.naptime",
327 : "Duration between each check (in seconds).",
328 : NULL,
329 : &worker_spi_naptime,
330 : 10,
331 : 1,
332 : INT_MAX,
333 : PGC_SIGHUP,
334 : 0,
335 : NULL,
336 : NULL,
337 : NULL);
338 :
339 10 : DefineCustomStringVariable("worker_spi.database",
340 : "Database to connect to.",
341 : NULL,
342 : &worker_spi_database,
343 : "postgres",
344 : PGC_SIGHUP,
345 : 0,
346 : NULL, NULL, NULL);
347 :
348 10 : DefineCustomStringVariable("worker_spi.role",
349 : "Role to connect with.",
350 : NULL,
351 : &worker_spi_role,
352 : NULL,
353 : PGC_SIGHUP,
354 : 0,
355 : NULL, NULL, NULL);
356 :
357 10 : if (!process_shared_preload_libraries_in_progress)
358 8 : return;
359 :
360 2 : DefineCustomIntVariable("worker_spi.total_workers",
361 : "Number of workers.",
362 : NULL,
363 : &worker_spi_total_workers,
364 : 2,
365 : 1,
366 : 100,
367 : PGC_POSTMASTER,
368 : 0,
369 : NULL,
370 : NULL,
371 : NULL);
372 :
373 2 : MarkGUCPrefixReserved("worker_spi");
374 :
375 : /* set up common data for all our workers */
376 2 : memset(&worker, 0, sizeof(worker));
377 2 : worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
378 : BGWORKER_BACKEND_DATABASE_CONNECTION;
379 2 : worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
380 2 : worker.bgw_restart_time = BGW_NEVER_RESTART;
381 2 : sprintf(worker.bgw_library_name, "worker_spi");
382 2 : sprintf(worker.bgw_function_name, "worker_spi_main");
383 2 : worker.bgw_notify_pid = 0;
384 :
385 : /*
386 : * Now fill in worker-specific data, and do the actual registrations.
387 : *
388 : * bgw_extra can optionally include a database OID, a role OID and a set
389 : * of flags. This is left empty here to fallback to the related GUCs at
390 : * startup (0 for the bgworker flags).
391 : */
392 8 : for (int i = 1; i <= worker_spi_total_workers; i++)
393 : {
394 6 : snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi worker %d", i);
395 6 : snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi");
396 6 : worker.bgw_main_arg = Int32GetDatum(i);
397 :
398 6 : RegisterBackgroundWorker(&worker);
399 : }
400 : }
401 :
402 : /*
403 : * Dynamically launch an SPI worker.
404 : */
405 : Datum
406 14 : worker_spi_launch(PG_FUNCTION_ARGS)
407 : {
408 14 : int32 i = PG_GETARG_INT32(0);
409 14 : Oid dboid = PG_GETARG_OID(1);
410 14 : Oid roleoid = PG_GETARG_OID(2);
411 : BackgroundWorker worker;
412 : BackgroundWorkerHandle *handle;
413 : BgwHandleStatus status;
414 : pid_t pid;
415 : char *p;
416 14 : bits32 flags = 0;
417 14 : ArrayType *arr = PG_GETARG_ARRAYTYPE_P(3);
418 : Size ndim;
419 : int nelems;
420 : Datum *datum_flags;
421 :
422 14 : memset(&worker, 0, sizeof(worker));
423 14 : worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
424 : BGWORKER_BACKEND_DATABASE_CONNECTION;
425 14 : worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
426 14 : worker.bgw_restart_time = BGW_NEVER_RESTART;
427 14 : sprintf(worker.bgw_library_name, "worker_spi");
428 14 : sprintf(worker.bgw_function_name, "worker_spi_main");
429 14 : snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi dynamic worker %d", i);
430 14 : snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi dynamic");
431 14 : worker.bgw_main_arg = Int32GetDatum(i);
432 : /* set bgw_notify_pid so that we can use WaitForBackgroundWorkerStartup */
433 14 : worker.bgw_notify_pid = MyProcPid;
434 :
435 : /* extract flags, if any */
436 14 : ndim = ARR_NDIM(arr);
437 14 : if (ndim > 1)
438 0 : ereport(ERROR,
439 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
440 : errmsg("flags array must be one-dimensional")));
441 :
442 14 : if (array_contains_nulls(arr))
443 0 : ereport(ERROR,
444 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
445 : errmsg("flags array must not contain nulls")));
446 :
447 : Assert(ARR_ELEMTYPE(arr) == TEXTOID);
448 14 : deconstruct_array_builtin(arr, TEXTOID, &datum_flags, NULL, &nelems);
449 :
450 18 : for (i = 0; i < nelems; i++)
451 : {
452 4 : char *optname = TextDatumGetCString(datum_flags[i]);
453 :
454 4 : if (strcmp(optname, "ALLOWCONN") == 0)
455 2 : flags |= BGWORKER_BYPASS_ALLOWCONN;
456 2 : else if (strcmp(optname, "ROLELOGINCHECK") == 0)
457 2 : flags |= BGWORKER_BYPASS_ROLELOGINCHECK;
458 : else
459 0 : ereport(ERROR,
460 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
461 : errmsg("incorrect flag value found in array")));
462 : }
463 :
464 : /*
465 : * Register database and role to use for the worker started in bgw_extra.
466 : * If none have been provided, this will fall back to the GUCs at startup.
467 : */
468 14 : if (!OidIsValid(dboid))
469 2 : dboid = get_database_oid(worker_spi_database, false);
470 :
471 : /*
472 : * worker_spi_role is NULL by default, so this gives to worker_spi_main()
473 : * an invalid OID in this case.
474 : */
475 14 : if (!OidIsValid(roleoid) && worker_spi_role)
476 0 : roleoid = get_role_oid(worker_spi_role, false);
477 :
478 14 : p = worker.bgw_extra;
479 14 : memcpy(p, &dboid, sizeof(Oid));
480 14 : p += sizeof(Oid);
481 14 : memcpy(p, &roleoid, sizeof(Oid));
482 14 : p += sizeof(Oid);
483 14 : memcpy(p, &flags, sizeof(bits32));
484 :
485 14 : if (!RegisterDynamicBackgroundWorker(&worker, &handle))
486 0 : PG_RETURN_NULL();
487 :
488 14 : status = WaitForBackgroundWorkerStartup(handle, &pid);
489 :
490 14 : if (status == BGWH_STOPPED)
491 0 : ereport(ERROR,
492 : (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
493 : errmsg("could not start background process"),
494 : errhint("More details may be available in the server log.")));
495 14 : if (status == BGWH_POSTMASTER_DIED)
496 0 : ereport(ERROR,
497 : (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
498 : errmsg("cannot start background processes without postmaster"),
499 : errhint("Kill all remaining database processes and restart the database.")));
500 : Assert(status == BGWH_STARTED);
501 :
502 14 : PG_RETURN_INT32(pid);
503 : }
|