LCOV - code coverage report
Current view: top level - src/test/modules/worker_spi - worker_spi.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 90.6 % 160 145
Test Date: 2026-02-17 17:20:33 Functions: 100.0 % 6 6
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /* -------------------------------------------------------------------------
       2              :  *
       3              :  * worker_spi.c
       4              :  *      Sample background worker code that demonstrates various coding
       5              :  *      patterns: establishing a database connection; starting and committing
       6              :  *      transactions; using GUC variables, and heeding SIGHUP to reread
       7              :  *      the configuration file; reporting to pg_stat_activity; using the
       8              :  *      process latch to sleep and exit in case of postmaster death.
       9              :  *
      10              :  * This code connects to a database, creates a schema and table, and summarizes
      11              :  * the numbers contained therein.  To see it working, insert an initial value
      12              :  * with "total" type and some initial value; then insert some other rows with
      13              :  * "delta" type.  Delta rows will be deleted by this worker and their values
      14              :  * aggregated into the total.
      15              :  *
      16              :  * Copyright (c) 2013-2026, PostgreSQL Global Development Group
      17              :  *
      18              :  * IDENTIFICATION
      19              :  *      src/test/modules/worker_spi/worker_spi.c
      20              :  *
      21              :  * -------------------------------------------------------------------------
      22              :  */
      23              : #include "postgres.h"
      24              : 
      25              : /* These are always necessary for a bgworker */
      26              : #include "miscadmin.h"
      27              : #include "postmaster/bgworker.h"
      28              : #include "postmaster/interrupt.h"
      29              : #include "storage/latch.h"
      30              : 
      31              : /* these headers are used by this particular worker's code */
      32              : #include "access/xact.h"
      33              : #include "catalog/pg_database.h"
      34              : #include "executor/spi.h"
      35              : #include "fmgr.h"
      36              : #include "lib/stringinfo.h"
      37              : #include "pgstat.h"
      38              : #include "tcop/utility.h"
      39              : #include "utils/acl.h"
      40              : #include "utils/builtins.h"
      41              : #include "utils/snapmgr.h"
      42              : 
      43           15 : PG_MODULE_MAGIC;
      44              : 
      45           13 : PG_FUNCTION_INFO_V1(worker_spi_launch);
      46              : 
      47              : PGDLLEXPORT pg_noreturn void worker_spi_main(Datum main_arg);
      48              : 
      49              : /* GUC variables */
      50              : static int  worker_spi_naptime = 10;
      51              : static int  worker_spi_total_workers = 2;
      52              : static char *worker_spi_database = NULL;
      53              : static char *worker_spi_role = NULL;
      54              : 
      55              : /* value cached, fetched from shared memory */
      56              : static uint32 worker_spi_wait_event_main = 0;
      57              : 
      58              : typedef struct worktable
      59              : {
      60              :     const char *schema;
      61              :     const char *name;
      62              : } worktable;
      63              : 
      64              : /*
      65              :  * Initialize workspace for a worker process: create the schema if it doesn't
      66              :  * already exist.
      67              :  */
      68              : static void
      69            5 : initialize_worker_spi(worktable *table)
      70              : {
      71              :     int         ret;
      72              :     int         ntup;
      73              :     bool        isnull;
      74              :     StringInfoData buf;
      75              : 
      76            5 :     SetCurrentStatementStartTimestamp();
      77            5 :     StartTransactionCommand();
      78            5 :     SPI_connect();
      79            5 :     PushActiveSnapshot(GetTransactionSnapshot());
      80            5 :     pgstat_report_activity(STATE_RUNNING, "initializing worker_spi schema");
      81              : 
      82              :     /* XXX could we use CREATE SCHEMA IF NOT EXISTS? */
      83            5 :     initStringInfo(&buf);
      84            5 :     appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'",
      85              :                      table->schema);
      86              : 
      87            5 :     debug_query_string = buf.data;
      88            5 :     ret = SPI_execute(buf.data, true, 0);
      89            5 :     if (ret != SPI_OK_SELECT)
      90            0 :         elog(FATAL, "SPI_execute failed: error code %d", ret);
      91              : 
      92            5 :     if (SPI_processed != 1)
      93            0 :         elog(FATAL, "not a singleton result");
      94              : 
      95            5 :     ntup = DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[0],
      96            5 :                                        SPI_tuptable->tupdesc,
      97              :                                        1, &isnull));
      98            5 :     if (isnull)
      99            0 :         elog(FATAL, "null result");
     100              : 
     101            5 :     if (ntup == 0)
     102              :     {
     103            5 :         debug_query_string = NULL;
     104            5 :         resetStringInfo(&buf);
     105            5 :         appendStringInfo(&buf,
     106              :                          "CREATE SCHEMA \"%s\" "
     107              :                          "CREATE TABLE \"%s\" ("
     108              :                          "     type text CHECK (type IN ('total', 'delta')), "
     109              :                          "     value   integer)"
     110              :                          "CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
     111              :                          "WHERE type = 'total'",
     112              :                          table->schema, table->name, table->name, table->name);
     113              : 
     114              :         /* set statement start time */
     115            5 :         SetCurrentStatementStartTimestamp();
     116              : 
     117            5 :         debug_query_string = buf.data;
     118            5 :         ret = SPI_execute(buf.data, false, 0);
     119              : 
     120            5 :         if (ret != SPI_OK_UTILITY)
     121            0 :             elog(FATAL, "failed to create my schema");
     122              : 
     123            5 :         debug_query_string = NULL;  /* rest is not statement-specific */
     124              :     }
     125              : 
     126            5 :     SPI_finish();
     127            5 :     PopActiveSnapshot();
     128            5 :     CommitTransactionCommand();
     129            5 :     debug_query_string = NULL;
     130            5 :     pgstat_report_activity(STATE_IDLE, NULL);
     131            5 : }
     132              : 
     133              : void
     134            6 : worker_spi_main(Datum main_arg)
     135              : {
     136            6 :     int         index = DatumGetInt32(main_arg);
     137              :     worktable  *table;
     138              :     StringInfoData buf;
     139              :     char        name[20];
     140              :     Oid         dboid;
     141              :     Oid         roleoid;
     142              :     char       *p;
     143            6 :     bits32      flags = 0;
     144              : 
     145            6 :     table = palloc_object(worktable);
     146            6 :     sprintf(name, "schema%d", index);
     147            6 :     table->schema = pstrdup(name);
     148            6 :     table->name = pstrdup("counted");
     149              : 
     150              :     /* fetch database and role OIDs, these are set for a dynamic worker */
     151            6 :     p = MyBgworkerEntry->bgw_extra;
     152            6 :     memcpy(&dboid, p, sizeof(Oid));
     153            6 :     p += sizeof(Oid);
     154            6 :     memcpy(&roleoid, p, sizeof(Oid));
     155            6 :     p += sizeof(Oid);
     156            6 :     memcpy(&flags, p, sizeof(bits32));
     157              : 
     158              :     /* Establish signal handlers before unblocking signals. */
     159            6 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
     160            6 :     pqsignal(SIGTERM, die);
     161              : 
     162              :     /* We're now ready to receive signals */
     163            6 :     BackgroundWorkerUnblockSignals();
     164              : 
     165              :     /* Connect to our database */
     166            6 :     if (OidIsValid(dboid))
     167            6 :         BackgroundWorkerInitializeConnectionByOid(dboid, roleoid, flags);
     168              :     else
     169            0 :         BackgroundWorkerInitializeConnection(worker_spi_database,
     170              :                                              worker_spi_role, flags);
     171              : 
     172            5 :     elog(LOG, "%s initialized with %s.%s",
     173              :          MyBgworkerEntry->bgw_name, table->schema, table->name);
     174            5 :     initialize_worker_spi(table);
     175              : 
     176              :     /*
     177              :      * Quote identifiers passed to us.  Note that this must be done after
     178              :      * initialize_worker_spi, because that routine assumes the names are not
     179              :      * quoted.
     180              :      *
     181              :      * Note some memory might be leaked here.
     182              :      */
     183            5 :     table->schema = quote_identifier(table->schema);
     184            5 :     table->name = quote_identifier(table->name);
     185              : 
     186            5 :     initStringInfo(&buf);
     187            5 :     appendStringInfo(&buf,
     188              :                      "WITH deleted AS (DELETE "
     189              :                      "FROM %s.%s "
     190              :                      "WHERE type = 'delta' RETURNING value), "
     191              :                      "total AS (SELECT coalesce(sum(value), 0) as sum "
     192              :                      "FROM deleted) "
     193              :                      "UPDATE %s.%s "
     194              :                      "SET value = %s.value + total.sum "
     195              :                      "FROM total WHERE type = 'total' "
     196              :                      "RETURNING %s.value",
     197              :                      table->schema, table->name,
     198              :                      table->schema, table->name,
     199              :                      table->name,
     200              :                      table->name);
     201              : 
     202              :     /*
     203              :      * Main loop: do this until SIGTERM is received and processed by
     204              :      * ProcessInterrupts.
     205              :      */
     206              :     for (;;)
     207            6 :     {
     208              :         int         ret;
     209              : 
     210              :         /* First time, allocate or get the custom wait event */
     211           11 :         if (worker_spi_wait_event_main == 0)
     212            5 :             worker_spi_wait_event_main = WaitEventExtensionNew("WorkerSpiMain");
     213              : 
     214              :         /*
     215              :          * Background workers mustn't call usleep() or any direct equivalent:
     216              :          * instead, they may wait on their process latch, which sleeps as
     217              :          * necessary, but is awakened if postmaster dies.  That way the
     218              :          * background process goes away immediately in an emergency.
     219              :          */
     220           11 :         (void) WaitLatch(MyLatch,
     221              :                          WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     222              :                          worker_spi_naptime * 1000L,
     223              :                          worker_spi_wait_event_main);
     224           11 :         ResetLatch(MyLatch);
     225              : 
     226           11 :         CHECK_FOR_INTERRUPTS();
     227              : 
     228              :         /*
     229              :          * In case of a SIGHUP, just reload the configuration.
     230              :          */
     231            6 :         if (ConfigReloadPending)
     232              :         {
     233            1 :             ConfigReloadPending = false;
     234            1 :             ProcessConfigFile(PGC_SIGHUP);
     235              :         }
     236              : 
     237              :         /*
     238              :          * Start a transaction on which we can run queries.  Note that each
     239              :          * StartTransactionCommand() call should be preceded by a
     240              :          * SetCurrentStatementStartTimestamp() call, which sets both the time
     241              :          * for the statement we're about the run, and also the transaction
     242              :          * start time.  Also, each other query sent to SPI should probably be
     243              :          * preceded by SetCurrentStatementStartTimestamp(), so that statement
     244              :          * start time is always up to date.
     245              :          *
     246              :          * The SPI_connect() call lets us run queries through the SPI manager,
     247              :          * and the PushActiveSnapshot() call creates an "active" snapshot
     248              :          * which is necessary for queries to have MVCC data to work on.
     249              :          *
     250              :          * The pgstat_report_activity() call makes our activity visible
     251              :          * through the pgstat views.
     252              :          */
     253            6 :         SetCurrentStatementStartTimestamp();
     254            6 :         StartTransactionCommand();
     255            6 :         SPI_connect();
     256            6 :         PushActiveSnapshot(GetTransactionSnapshot());
     257            6 :         debug_query_string = buf.data;
     258            6 :         pgstat_report_activity(STATE_RUNNING, buf.data);
     259              : 
     260              :         /* We can now execute queries via SPI */
     261            6 :         ret = SPI_execute(buf.data, false, 0);
     262              : 
     263            6 :         if (ret != SPI_OK_UPDATE_RETURNING)
     264            0 :             elog(FATAL, "cannot select from table %s.%s: error code %d",
     265              :                  table->schema, table->name, ret);
     266              : 
     267            6 :         if (SPI_processed > 0)
     268              :         {
     269              :             bool        isnull;
     270              :             int32       val;
     271              : 
     272            1 :             val = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
     273            1 :                                               SPI_tuptable->tupdesc,
     274              :                                               1, &isnull));
     275            1 :             if (!isnull)
     276            1 :                 elog(LOG, "%s: count in %s.%s is now %d",
     277              :                      MyBgworkerEntry->bgw_name,
     278              :                      table->schema, table->name, val);
     279              :         }
     280              : 
     281              :         /*
     282              :          * And finish our transaction.
     283              :          */
     284            6 :         SPI_finish();
     285            6 :         PopActiveSnapshot();
     286            6 :         CommitTransactionCommand();
     287            6 :         debug_query_string = NULL;
     288            6 :         pgstat_report_stat(true);
     289            6 :         pgstat_report_activity(STATE_IDLE, NULL);
     290              :     }
     291              : 
     292              :     /* Not reachable */
     293              : }
     294              : 
     295              : /*
     296              :  * Entrypoint of this module.
     297              :  *
     298              :  * We register more than one worker process here, to demonstrate how that can
     299              :  * be done.
     300              :  */
     301              : void
     302           15 : _PG_init(void)
     303              : {
     304              :     BackgroundWorker worker;
     305              : 
     306              :     /* get the configuration */
     307              : 
     308              :     /*
     309              :      * These GUCs are defined even if this library is not loaded with
     310              :      * shared_preload_libraries, for worker_spi_launch().
     311              :      */
     312           15 :     DefineCustomIntVariable("worker_spi.naptime",
     313              :                             "Duration between each check (in seconds).",
     314              :                             NULL,
     315              :                             &worker_spi_naptime,
     316              :                             10,
     317              :                             1,
     318              :                             INT_MAX,
     319              :                             PGC_SIGHUP,
     320              :                             0,
     321              :                             NULL,
     322              :                             NULL,
     323              :                             NULL);
     324              : 
     325           15 :     DefineCustomStringVariable("worker_spi.database",
     326              :                                "Database to connect to.",
     327              :                                NULL,
     328              :                                &worker_spi_database,
     329              :                                "postgres",
     330              :                                PGC_SIGHUP,
     331              :                                0,
     332              :                                NULL, NULL, NULL);
     333              : 
     334           15 :     DefineCustomStringVariable("worker_spi.role",
     335              :                                "Role to connect with.",
     336              :                                NULL,
     337              :                                &worker_spi_role,
     338              :                                NULL,
     339              :                                PGC_SIGHUP,
     340              :                                0,
     341              :                                NULL, NULL, NULL);
     342              : 
     343           15 :     if (!process_shared_preload_libraries_in_progress)
     344           14 :         return;
     345              : 
     346            1 :     DefineCustomIntVariable("worker_spi.total_workers",
     347              :                             "Number of workers.",
     348              :                             NULL,
     349              :                             &worker_spi_total_workers,
     350              :                             2,
     351              :                             1,
     352              :                             100,
     353              :                             PGC_POSTMASTER,
     354              :                             0,
     355              :                             NULL,
     356              :                             NULL,
     357              :                             NULL);
     358              : 
     359            1 :     MarkGUCPrefixReserved("worker_spi");
     360              : 
     361              :     /* set up common data for all our workers */
     362            1 :     memset(&worker, 0, sizeof(worker));
     363            1 :     worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
     364              :         BGWORKER_BACKEND_DATABASE_CONNECTION;
     365            1 :     worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
     366            1 :     worker.bgw_restart_time = BGW_NEVER_RESTART;
     367            1 :     sprintf(worker.bgw_library_name, "worker_spi");
     368            1 :     sprintf(worker.bgw_function_name, "worker_spi_main");
     369            1 :     worker.bgw_notify_pid = 0;
     370              : 
     371              :     /*
     372              :      * Now fill in worker-specific data, and do the actual registrations.
     373              :      *
     374              :      * bgw_extra can optionally include a database OID, a role OID and a set
     375              :      * of flags.  This is left empty here to fallback to the related GUCs at
     376              :      * startup (0 for the bgworker flags).
     377              :      */
     378            4 :     for (int i = 1; i <= worker_spi_total_workers; i++)
     379              :     {
     380            3 :         snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi worker %d", i);
     381            3 :         snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi");
     382            3 :         worker.bgw_main_arg = Int32GetDatum(i);
     383              : 
     384            3 :         RegisterBackgroundWorker(&worker);
     385              :     }
     386              : }
     387              : 
     388              : /*
     389              :  * Dynamically launch an SPI worker.
     390              :  */
     391              : Datum
     392           10 : worker_spi_launch(PG_FUNCTION_ARGS)
     393              : {
     394           10 :     int32       i = PG_GETARG_INT32(0);
     395           10 :     Oid         dboid = PG_GETARG_OID(1);
     396           10 :     Oid         roleoid = PG_GETARG_OID(2);
     397              :     BackgroundWorker worker;
     398              :     BackgroundWorkerHandle *handle;
     399              :     BgwHandleStatus status;
     400              :     pid_t       pid;
     401              :     char       *p;
     402           10 :     bits32      flags = 0;
     403           10 :     ArrayType  *arr = PG_GETARG_ARRAYTYPE_P(3);
     404              :     Size        ndim;
     405              :     int         nelems;
     406              :     Datum      *datum_flags;
     407           10 :     bool        interruptible = PG_GETARG_BOOL(4);
     408              : 
     409           10 :     memset(&worker, 0, sizeof(worker));
     410           10 :     worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
     411              :         BGWORKER_BACKEND_DATABASE_CONNECTION;
     412              : 
     413           10 :     if (interruptible)
     414            4 :         worker.bgw_flags |= BGWORKER_INTERRUPTIBLE;
     415              : 
     416           10 :     worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
     417           10 :     worker.bgw_restart_time = BGW_NEVER_RESTART;
     418           10 :     sprintf(worker.bgw_library_name, "worker_spi");
     419           10 :     sprintf(worker.bgw_function_name, "worker_spi_main");
     420           10 :     snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi dynamic worker %d", i);
     421           10 :     snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi dynamic");
     422           10 :     worker.bgw_main_arg = Int32GetDatum(i);
     423              :     /* set bgw_notify_pid so that we can use WaitForBackgroundWorkerStartup */
     424           10 :     worker.bgw_notify_pid = MyProcPid;
     425              : 
     426              :     /* extract flags, if any */
     427           10 :     ndim = ARR_NDIM(arr);
     428           10 :     if (ndim > 1)
     429            0 :         ereport(ERROR,
     430              :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     431              :                  errmsg("flags array must be one-dimensional")));
     432              : 
     433           10 :     if (array_contains_nulls(arr))
     434            0 :         ereport(ERROR,
     435              :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     436              :                  errmsg("flags array must not contain nulls")));
     437              : 
     438              :     Assert(ARR_ELEMTYPE(arr) == TEXTOID);
     439           10 :     deconstruct_array_builtin(arr, TEXTOID, &datum_flags, NULL, &nelems);
     440              : 
     441           11 :     for (i = 0; i < nelems; i++)
     442              :     {
     443            1 :         char       *optname = TextDatumGetCString(datum_flags[i]);
     444              : 
     445            1 :         if (strcmp(optname, "ALLOWCONN") == 0)
     446            1 :             flags |= BGWORKER_BYPASS_ALLOWCONN;
     447            0 :         else if (strcmp(optname, "ROLELOGINCHECK") == 0)
     448            0 :             flags |= BGWORKER_BYPASS_ROLELOGINCHECK;
     449              :         else
     450            0 :             ereport(ERROR,
     451              :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     452              :                      errmsg("incorrect flag value found in array")));
     453              :     }
     454              : 
     455              :     /*
     456              :      * Register database and role to use for the worker started in bgw_extra.
     457              :      * If none have been provided, this will fall back to the GUCs at startup.
     458              :      */
     459           10 :     if (!OidIsValid(dboid))
     460            1 :         dboid = get_database_oid(worker_spi_database, false);
     461              : 
     462              :     /*
     463              :      * worker_spi_role is NULL by default, so this gives to worker_spi_main()
     464              :      * an invalid OID in this case.
     465              :      */
     466           10 :     if (!OidIsValid(roleoid) && worker_spi_role)
     467            0 :         roleoid = get_role_oid(worker_spi_role, false);
     468              : 
     469           10 :     p = worker.bgw_extra;
     470           10 :     memcpy(p, &dboid, sizeof(Oid));
     471           10 :     p += sizeof(Oid);
     472           10 :     memcpy(p, &roleoid, sizeof(Oid));
     473           10 :     p += sizeof(Oid);
     474           10 :     memcpy(p, &flags, sizeof(bits32));
     475              : 
     476           10 :     if (!RegisterDynamicBackgroundWorker(&worker, &handle))
     477            0 :         PG_RETURN_NULL();
     478              : 
     479           10 :     status = WaitForBackgroundWorkerStartup(handle, &pid);
     480              : 
     481           10 :     if (status == BGWH_STOPPED)
     482            0 :         ereport(ERROR,
     483              :                 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
     484              :                  errmsg("could not start background process"),
     485              :                  errhint("More details may be available in the server log.")));
     486           10 :     if (status == BGWH_POSTMASTER_DIED)
     487            0 :         ereport(ERROR,
     488              :                 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
     489              :                  errmsg("cannot start background processes without postmaster"),
     490              :                  errhint("Kill all remaining database processes and restart the database.")));
     491              :     Assert(status == BGWH_STARTED);
     492              : 
     493           10 :     PG_RETURN_INT32(pid);
     494              : }
        

Generated by: LCOV version 2.0-1