LCOV - code coverage report
Current view: top level - src/test/modules/worker_spi - worker_spi.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 91.9 % 160 147
Test Date: 2026-03-17 15:16:44 Functions: 100.0 % 6 6
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /* -------------------------------------------------------------------------
       2              :  *
       3              :  * worker_spi.c
       4              :  *      Sample background worker code that demonstrates various coding
       5              :  *      patterns: establishing a database connection; starting and committing
       6              :  *      transactions; using GUC variables, and heeding SIGHUP to reread
       7              :  *      the configuration file; reporting to pg_stat_activity; using the
       8              :  *      process latch to sleep and exit in case of postmaster death.
       9              :  *
      10              :  * This code connects to a database, creates a schema and table, and summarizes
      11              :  * the numbers contained therein.  To see it working, insert an initial value
      12              :  * with "total" type and some initial value; then insert some other rows with
      13              :  * "delta" type.  Delta rows will be deleted by this worker and their values
      14              :  * aggregated into the total.
      15              :  *
      16              :  * Copyright (c) 2013-2026, PostgreSQL Global Development Group
      17              :  *
      18              :  * IDENTIFICATION
      19              :  *      src/test/modules/worker_spi/worker_spi.c
      20              :  *
      21              :  * -------------------------------------------------------------------------
      22              :  */
      23              : #include "postgres.h"
      24              : 
      25              : /* These are always necessary for a bgworker */
      26              : #include "miscadmin.h"
      27              : #include "postmaster/bgworker.h"
      28              : #include "postmaster/interrupt.h"
      29              : #include "storage/latch.h"
      30              : 
      31              : /* these headers are used by this particular worker's code */
      32              : #include "access/xact.h"
      33              : #include "catalog/pg_database.h"
      34              : #include "executor/spi.h"
      35              : #include "fmgr.h"
      36              : #include "lib/stringinfo.h"
      37              : #include "pgstat.h"
      38              : #include "tcop/utility.h"
      39              : #include "utils/acl.h"
      40              : #include "utils/builtins.h"
      41              : #include "utils/snapmgr.h"
      42              : #include "utils/wait_event.h"
      43              : 
      44           16 : PG_MODULE_MAGIC;
      45              : 
      46           15 : PG_FUNCTION_INFO_V1(worker_spi_launch);
      47              : 
      48              : PGDLLEXPORT pg_noreturn void worker_spi_main(Datum main_arg);
      49              : 
      50              : /* GUC variables */
      51              : static int  worker_spi_naptime = 10;
      52              : static int  worker_spi_total_workers = 2;
      53              : static char *worker_spi_database = NULL;
      54              : static char *worker_spi_role = NULL;
      55              : 
      56              : /* value cached, fetched from shared memory */
      57              : static uint32 worker_spi_wait_event_main = 0;
      58              : 
      59              : typedef struct worktable
      60              : {
      61              :     const char *schema;
      62              :     const char *name;
      63              : } worktable;
      64              : 
      65              : /*
      66              :  * Initialize workspace for a worker process: create the schema if it doesn't
      67              :  * already exist.
      68              :  */
      69              : static void
      70            6 : initialize_worker_spi(worktable *table)
      71              : {
      72              :     int         ret;
      73              :     int         ntup;
      74              :     bool        isnull;
      75              :     StringInfoData buf;
      76              : 
      77            6 :     SetCurrentStatementStartTimestamp();
      78            6 :     StartTransactionCommand();
      79            6 :     SPI_connect();
      80            6 :     PushActiveSnapshot(GetTransactionSnapshot());
      81            6 :     pgstat_report_activity(STATE_RUNNING, "initializing worker_spi schema");
      82              : 
      83              :     /* XXX could we use CREATE SCHEMA IF NOT EXISTS? */
      84            6 :     initStringInfo(&buf);
      85            6 :     appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'",
      86              :                      table->schema);
      87              : 
      88            6 :     debug_query_string = buf.data;
      89            6 :     ret = SPI_execute(buf.data, true, 0);
      90            6 :     if (ret != SPI_OK_SELECT)
      91            0 :         elog(FATAL, "SPI_execute failed: error code %d", ret);
      92              : 
      93            6 :     if (SPI_processed != 1)
      94            0 :         elog(FATAL, "not a singleton result");
      95              : 
      96            6 :     ntup = DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[0],
      97            6 :                                        SPI_tuptable->tupdesc,
      98              :                                        1, &isnull));
      99            6 :     if (isnull)
     100            0 :         elog(FATAL, "null result");
     101              : 
     102            6 :     if (ntup == 0)
     103              :     {
     104            6 :         debug_query_string = NULL;
     105            6 :         resetStringInfo(&buf);
     106            6 :         appendStringInfo(&buf,
     107              :                          "CREATE SCHEMA \"%s\" "
     108              :                          "CREATE TABLE \"%s\" ("
     109              :                          "     type text CHECK (type IN ('total', 'delta')), "
     110              :                          "     value   integer)"
     111              :                          "CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
     112              :                          "WHERE type = 'total'",
     113              :                          table->schema, table->name, table->name, table->name);
     114              : 
     115              :         /* set statement start time */
     116            6 :         SetCurrentStatementStartTimestamp();
     117              : 
     118            6 :         debug_query_string = buf.data;
     119            6 :         ret = SPI_execute(buf.data, false, 0);
     120              : 
     121            6 :         if (ret != SPI_OK_UTILITY)
     122            0 :             elog(FATAL, "failed to create my schema");
     123              : 
     124            6 :         debug_query_string = NULL;  /* rest is not statement-specific */
     125              :     }
     126              : 
     127            6 :     SPI_finish();
     128            6 :     PopActiveSnapshot();
     129            6 :     CommitTransactionCommand();
     130            6 :     debug_query_string = NULL;
     131            6 :     pgstat_report_activity(STATE_IDLE, NULL);
     132            6 : }
     133              : 
     134              : void
     135            8 : worker_spi_main(Datum main_arg)
     136              : {
     137            8 :     int         index = DatumGetInt32(main_arg);
     138              :     worktable  *table;
     139              :     StringInfoData buf;
     140              :     char        name[20];
     141              :     Oid         dboid;
     142              :     Oid         roleoid;
     143              :     char       *p;
     144            8 :     bits32      flags = 0;
     145              : 
     146            8 :     table = palloc_object(worktable);
     147            8 :     sprintf(name, "schema%d", index);
     148            8 :     table->schema = pstrdup(name);
     149            8 :     table->name = pstrdup("counted");
     150              : 
     151              :     /* fetch database and role OIDs, these are set for a dynamic worker */
     152            8 :     p = MyBgworkerEntry->bgw_extra;
     153            8 :     memcpy(&dboid, p, sizeof(Oid));
     154            8 :     p += sizeof(Oid);
     155            8 :     memcpy(&roleoid, p, sizeof(Oid));
     156            8 :     p += sizeof(Oid);
     157            8 :     memcpy(&flags, p, sizeof(bits32));
     158              : 
     159              :     /* Establish signal handlers before unblocking signals. */
     160            8 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
     161            8 :     pqsignal(SIGTERM, die);
     162              : 
     163              :     /* We're now ready to receive signals */
     164            8 :     BackgroundWorkerUnblockSignals();
     165              : 
     166              :     /* Connect to our database */
     167            8 :     if (OidIsValid(dboid))
     168            8 :         BackgroundWorkerInitializeConnectionByOid(dboid, roleoid, flags);
     169              :     else
     170            0 :         BackgroundWorkerInitializeConnection(worker_spi_database,
     171              :                                              worker_spi_role, flags);
     172              : 
     173            6 :     elog(LOG, "%s initialized with %s.%s",
     174              :          MyBgworkerEntry->bgw_name, table->schema, table->name);
     175            6 :     initialize_worker_spi(table);
     176              : 
     177              :     /*
     178              :      * Quote identifiers passed to us.  Note that this must be done after
     179              :      * initialize_worker_spi, because that routine assumes the names are not
     180              :      * quoted.
     181              :      *
     182              :      * Note some memory might be leaked here.
     183              :      */
     184            6 :     table->schema = quote_identifier(table->schema);
     185            6 :     table->name = quote_identifier(table->name);
     186              : 
     187            6 :     initStringInfo(&buf);
     188            6 :     appendStringInfo(&buf,
     189              :                      "WITH deleted AS (DELETE "
     190              :                      "FROM %s.%s "
     191              :                      "WHERE type = 'delta' RETURNING value), "
     192              :                      "total AS (SELECT coalesce(sum(value), 0) as sum "
     193              :                      "FROM deleted) "
     194              :                      "UPDATE %s.%s "
     195              :                      "SET value = %s.value + total.sum "
     196              :                      "FROM total WHERE type = 'total' "
     197              :                      "RETURNING %s.value",
     198              :                      table->schema, table->name,
     199              :                      table->schema, table->name,
     200              :                      table->name,
     201              :                      table->name);
     202              : 
     203              :     /*
     204              :      * Main loop: do this until SIGTERM is received and processed by
     205              :      * ProcessInterrupts.
     206              :      */
     207              :     for (;;)
     208            7 :     {
     209              :         int         ret;
     210              : 
     211              :         /* First time, allocate or get the custom wait event */
     212           13 :         if (worker_spi_wait_event_main == 0)
     213            6 :             worker_spi_wait_event_main = WaitEventExtensionNew("WorkerSpiMain");
     214              : 
     215              :         /*
     216              :          * Background workers mustn't call usleep() or any direct equivalent:
     217              :          * instead, they may wait on their process latch, which sleeps as
     218              :          * necessary, but is awakened if postmaster dies.  That way the
     219              :          * background process goes away immediately in an emergency.
     220              :          */
     221           13 :         (void) WaitLatch(MyLatch,
     222              :                          WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     223              :                          worker_spi_naptime * 1000L,
     224              :                          worker_spi_wait_event_main);
     225           13 :         ResetLatch(MyLatch);
     226              : 
     227           13 :         CHECK_FOR_INTERRUPTS();
     228              : 
     229              :         /*
     230              :          * In case of a SIGHUP, just reload the configuration.
     231              :          */
     232            7 :         if (ConfigReloadPending)
     233              :         {
     234            1 :             ConfigReloadPending = false;
     235            1 :             ProcessConfigFile(PGC_SIGHUP);
     236              :         }
     237              : 
     238              :         /*
     239              :          * Start a transaction on which we can run queries.  Note that each
     240              :          * StartTransactionCommand() call should be preceded by a
     241              :          * SetCurrentStatementStartTimestamp() call, which sets both the time
     242              :          * for the statement we're about the run, and also the transaction
     243              :          * start time.  Also, each other query sent to SPI should probably be
     244              :          * preceded by SetCurrentStatementStartTimestamp(), so that statement
     245              :          * start time is always up to date.
     246              :          *
     247              :          * The SPI_connect() call lets us run queries through the SPI manager,
     248              :          * and the PushActiveSnapshot() call creates an "active" snapshot
     249              :          * which is necessary for queries to have MVCC data to work on.
     250              :          *
     251              :          * The pgstat_report_activity() call makes our activity visible
     252              :          * through the pgstat views.
     253              :          */
     254            7 :         SetCurrentStatementStartTimestamp();
     255            7 :         StartTransactionCommand();
     256            7 :         SPI_connect();
     257            7 :         PushActiveSnapshot(GetTransactionSnapshot());
     258            7 :         debug_query_string = buf.data;
     259            7 :         pgstat_report_activity(STATE_RUNNING, buf.data);
     260              : 
     261              :         /* We can now execute queries via SPI */
     262            7 :         ret = SPI_execute(buf.data, false, 0);
     263              : 
     264            7 :         if (ret != SPI_OK_UPDATE_RETURNING)
     265            0 :             elog(FATAL, "cannot select from table %s.%s: error code %d",
     266              :                  table->schema, table->name, ret);
     267              : 
     268            7 :         if (SPI_processed > 0)
     269              :         {
     270              :             bool        isnull;
     271              :             int32       val;
     272              : 
     273            1 :             val = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
     274            1 :                                               SPI_tuptable->tupdesc,
     275              :                                               1, &isnull));
     276            1 :             if (!isnull)
     277            1 :                 elog(LOG, "%s: count in %s.%s is now %d",
     278              :                      MyBgworkerEntry->bgw_name,
     279              :                      table->schema, table->name, val);
     280              :         }
     281              : 
     282              :         /*
     283              :          * And finish our transaction.
     284              :          */
     285            7 :         SPI_finish();
     286            7 :         PopActiveSnapshot();
     287            7 :         CommitTransactionCommand();
     288            7 :         debug_query_string = NULL;
     289            7 :         pgstat_report_stat(true);
     290            7 :         pgstat_report_activity(STATE_IDLE, NULL);
     291              :     }
     292              : 
     293              :     /* Not reachable */
     294              : }
     295              : 
     296              : /*
     297              :  * Entrypoint of this module.
     298              :  *
     299              :  * We register more than one worker process here, to demonstrate how that can
     300              :  * be done.
     301              :  */
     302              : void
     303           16 : _PG_init(void)
     304              : {
     305              :     BackgroundWorker worker;
     306              : 
     307              :     /* get the configuration */
     308              : 
     309              :     /*
     310              :      * These GUCs are defined even if this library is not loaded with
     311              :      * shared_preload_libraries, for worker_spi_launch().
     312              :      */
     313           16 :     DefineCustomIntVariable("worker_spi.naptime",
     314              :                             "Duration between each check (in seconds).",
     315              :                             NULL,
     316              :                             &worker_spi_naptime,
     317              :                             10,
     318              :                             1,
     319              :                             INT_MAX,
     320              :                             PGC_SIGHUP,
     321              :                             0,
     322              :                             NULL,
     323              :                             NULL,
     324              :                             NULL);
     325              : 
     326           16 :     DefineCustomStringVariable("worker_spi.database",
     327              :                                "Database to connect to.",
     328              :                                NULL,
     329              :                                &worker_spi_database,
     330              :                                "postgres",
     331              :                                PGC_SIGHUP,
     332              :                                0,
     333              :                                NULL, NULL, NULL);
     334              : 
     335           16 :     DefineCustomStringVariable("worker_spi.role",
     336              :                                "Role to connect with.",
     337              :                                NULL,
     338              :                                &worker_spi_role,
     339              :                                NULL,
     340              :                                PGC_SIGHUP,
     341              :                                0,
     342              :                                NULL, NULL, NULL);
     343              : 
     344           16 :     if (!process_shared_preload_libraries_in_progress)
     345           15 :         return;
     346              : 
     347            1 :     DefineCustomIntVariable("worker_spi.total_workers",
     348              :                             "Number of workers.",
     349              :                             NULL,
     350              :                             &worker_spi_total_workers,
     351              :                             2,
     352              :                             1,
     353              :                             100,
     354              :                             PGC_POSTMASTER,
     355              :                             0,
     356              :                             NULL,
     357              :                             NULL,
     358              :                             NULL);
     359              : 
     360            1 :     MarkGUCPrefixReserved("worker_spi");
     361              : 
     362              :     /* set up common data for all our workers */
     363            1 :     memset(&worker, 0, sizeof(worker));
     364            1 :     worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
     365              :         BGWORKER_BACKEND_DATABASE_CONNECTION;
     366            1 :     worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
     367            1 :     worker.bgw_restart_time = BGW_NEVER_RESTART;
     368            1 :     sprintf(worker.bgw_library_name, "worker_spi");
     369            1 :     sprintf(worker.bgw_function_name, "worker_spi_main");
     370            1 :     worker.bgw_notify_pid = 0;
     371              : 
     372              :     /*
     373              :      * Now fill in worker-specific data, and do the actual registrations.
     374              :      *
     375              :      * bgw_extra can optionally include a database OID, a role OID and a set
     376              :      * of flags.  This is left empty here to fallback to the related GUCs at
     377              :      * startup (0 for the bgworker flags).
     378              :      */
     379            4 :     for (int i = 1; i <= worker_spi_total_workers; i++)
     380              :     {
     381            3 :         snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi worker %d", i);
     382            3 :         snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi");
     383            3 :         worker.bgw_main_arg = Int32GetDatum(i);
     384              : 
     385            3 :         RegisterBackgroundWorker(&worker);
     386              :     }
     387              : }
     388              : 
     389              : /*
     390              :  * Dynamically launch an SPI worker.
     391              :  */
     392              : Datum
     393           12 : worker_spi_launch(PG_FUNCTION_ARGS)
     394              : {
     395           12 :     int32       i = PG_GETARG_INT32(0);
     396           12 :     Oid         dboid = PG_GETARG_OID(1);
     397           12 :     Oid         roleoid = PG_GETARG_OID(2);
     398              :     BackgroundWorker worker;
     399              :     BackgroundWorkerHandle *handle;
     400              :     BgwHandleStatus status;
     401              :     pid_t       pid;
     402              :     char       *p;
     403           12 :     bits32      flags = 0;
     404           12 :     ArrayType  *arr = PG_GETARG_ARRAYTYPE_P(3);
     405              :     Size        ndim;
     406              :     int         nelems;
     407              :     Datum      *datum_flags;
     408           12 :     bool        interruptible = PG_GETARG_BOOL(4);
     409              : 
     410           12 :     memset(&worker, 0, sizeof(worker));
     411           12 :     worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
     412              :         BGWORKER_BACKEND_DATABASE_CONNECTION;
     413              : 
     414           12 :     if (interruptible)
     415            4 :         worker.bgw_flags |= BGWORKER_INTERRUPTIBLE;
     416              : 
     417           12 :     worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
     418           12 :     worker.bgw_restart_time = BGW_NEVER_RESTART;
     419           12 :     sprintf(worker.bgw_library_name, "worker_spi");
     420           12 :     sprintf(worker.bgw_function_name, "worker_spi_main");
     421           12 :     snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi dynamic worker %d", i);
     422           12 :     snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi dynamic");
     423           12 :     worker.bgw_main_arg = Int32GetDatum(i);
     424              :     /* set bgw_notify_pid so that we can use WaitForBackgroundWorkerStartup */
     425           12 :     worker.bgw_notify_pid = MyProcPid;
     426              : 
     427              :     /* extract flags, if any */
     428           12 :     ndim = ARR_NDIM(arr);
     429           12 :     if (ndim > 1)
     430            0 :         ereport(ERROR,
     431              :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     432              :                  errmsg("flags array must be one-dimensional")));
     433              : 
     434           12 :     if (array_contains_nulls(arr))
     435            0 :         ereport(ERROR,
     436              :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     437              :                  errmsg("flags array must not contain nulls")));
     438              : 
     439              :     Assert(ARR_ELEMTYPE(arr) == TEXTOID);
     440           12 :     deconstruct_array_builtin(arr, TEXTOID, &datum_flags, NULL, &nelems);
     441              : 
     442           14 :     for (i = 0; i < nelems; i++)
     443              :     {
     444            2 :         char       *optname = TextDatumGetCString(datum_flags[i]);
     445              : 
     446            2 :         if (strcmp(optname, "ALLOWCONN") == 0)
     447            1 :             flags |= BGWORKER_BYPASS_ALLOWCONN;
     448            1 :         else if (strcmp(optname, "ROLELOGINCHECK") == 0)
     449            1 :             flags |= BGWORKER_BYPASS_ROLELOGINCHECK;
     450              :         else
     451            0 :             ereport(ERROR,
     452              :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     453              :                      errmsg("incorrect flag value found in array")));
     454              :     }
     455              : 
     456              :     /*
     457              :      * Register database and role to use for the worker started in bgw_extra.
     458              :      * If none have been provided, this will fall back to the GUCs at startup.
     459              :      */
     460           12 :     if (!OidIsValid(dboid))
     461            1 :         dboid = get_database_oid(worker_spi_database, false);
     462              : 
     463              :     /*
     464              :      * worker_spi_role is NULL by default, so this gives to worker_spi_main()
     465              :      * an invalid OID in this case.
     466              :      */
     467           12 :     if (!OidIsValid(roleoid) && worker_spi_role)
     468            0 :         roleoid = get_role_oid(worker_spi_role, false);
     469              : 
     470           12 :     p = worker.bgw_extra;
     471           12 :     memcpy(p, &dboid, sizeof(Oid));
     472           12 :     p += sizeof(Oid);
     473           12 :     memcpy(p, &roleoid, sizeof(Oid));
     474           12 :     p += sizeof(Oid);
     475           12 :     memcpy(p, &flags, sizeof(bits32));
     476              : 
     477           12 :     if (!RegisterDynamicBackgroundWorker(&worker, &handle))
     478            0 :         PG_RETURN_NULL();
     479              : 
     480           12 :     status = WaitForBackgroundWorkerStartup(handle, &pid);
     481              : 
     482           12 :     if (status == BGWH_STOPPED)
     483            0 :         ereport(ERROR,
     484              :                 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
     485              :                  errmsg("could not start background process"),
     486              :                  errhint("More details may be available in the server log.")));
     487           12 :     if (status == BGWH_POSTMASTER_DIED)
     488            0 :         ereport(ERROR,
     489              :                 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
     490              :                  errmsg("cannot start background processes without postmaster"),
     491              :                  errhint("Kill all remaining database processes and restart the database.")));
     492              :     Assert(status == BGWH_STARTED);
     493              : 
     494           12 :     PG_RETURN_INT32(pid);
     495              : }
        

Generated by: LCOV version 2.0-1