LCOV - code coverage report
Current view: top level - src/test/modules/worker_spi - worker_spi.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 144 157 91.7 %
Date: 2025-01-18 04:15:08 Functions: 6 6 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /* -------------------------------------------------------------------------
       2             :  *
       3             :  * worker_spi.c
       4             :  *      Sample background worker code that demonstrates various coding
       5             :  *      patterns: establishing a database connection; starting and committing
       6             :  *      transactions; using GUC variables, and heeding SIGHUP to reread
       7             :  *      the configuration file; reporting to pg_stat_activity; using the
       8             :  *      process latch to sleep and exit in case of postmaster death.
       9             :  *
      10             :  * This code connects to a database, creates a schema and table, and summarizes
      11             :  * the numbers contained therein.  To see it working, insert an initial value
      12             :  * with "total" type and some initial value; then insert some other rows with
      13             :  * "delta" type.  Delta rows will be deleted by this worker and their values
      14             :  * aggregated into the total.
      15             :  *
      16             :  * Copyright (c) 2013-2025, PostgreSQL Global Development Group
      17             :  *
      18             :  * IDENTIFICATION
      19             :  *      src/test/modules/worker_spi/worker_spi.c
      20             :  *
      21             :  * -------------------------------------------------------------------------
      22             :  */
      23             : #include "postgres.h"
      24             : 
      25             : /* These are always necessary for a bgworker */
      26             : #include "miscadmin.h"
      27             : #include "postmaster/bgworker.h"
      28             : #include "postmaster/interrupt.h"
      29             : #include "storage/latch.h"
      30             : 
      31             : /* these headers are used by this particular worker's code */
      32             : #include "access/xact.h"
      33             : #include "commands/dbcommands.h"
      34             : #include "executor/spi.h"
      35             : #include "fmgr.h"
      36             : #include "lib/stringinfo.h"
      37             : #include "pgstat.h"
      38             : #include "tcop/utility.h"
      39             : #include "utils/acl.h"
      40             : #include "utils/builtins.h"
      41             : #include "utils/snapmgr.h"
      42             : 
      43          10 : PG_MODULE_MAGIC;
      44             : 
      45          18 : PG_FUNCTION_INFO_V1(worker_spi_launch); /* SQL-callable, defined below */
      46             : 
                       /*
                        * Entry point of every worker.  _PG_init() and worker_spi_launch() both
                        * refer to it by name through bgw_function_name.
                        */
      47             : PGDLLEXPORT void worker_spi_main(Datum main_arg) pg_attribute_noreturn();
      48             : 
      49             : /* GUC variables, registered in _PG_init() */
      50             : static int  worker_spi_naptime = 10;    /* seconds between two checks */
      51             : static int  worker_spi_total_workers = 2;   /* workers registered at preload */
      52             : static char *worker_spi_database = NULL;    /* used when no database OID is given */
      53             : static char *worker_spi_role = NULL;    /* used when no role OID is given */
      54             : 
      55             : /* value cached, fetched from shared memory; 0 means "not fetched yet" */
      56             : static uint32 worker_spi_wait_event_main = 0;
      57             : 
      58             : typedef struct worktable
      59             : {
      60             :     const char *schema;     /* schema name, built as "schema%d" in worker_spi_main() */
      61             :     const char *name;       /* table name, "counted" in worker_spi_main() */
      62             : } worktable;
      63             : 
      64             : /*
      65             :  * Initialize workspace for a worker process: create the schema if it doesn't
      66             :  * already exist.
                        *
                        * The whole routine runs in one transaction of its own.  It counts the
                        * pg_namespace rows named table->schema and, only when there are none,
                        * issues a single CREATE SCHEMA that also creates table->name and a unique
                        * index restricted to the 'total' row.  The names are interpolated exactly
                        * as passed in, so they must not be quoted yet.  Unexpected SPI results
                        * are reported with elog(FATAL).
      67             :  */
      68             : static void
      69           2 : initialize_worker_spi(worktable *table)
      70             : {
      71             :     int         ret;
      72             :     int         ntup;
      73             :     bool        isnull;
      74             :     StringInfoData buf;
      75             : 
      76           2 :     SetCurrentStatementStartTimestamp();
      77           2 :     StartTransactionCommand();
      78           2 :     SPI_connect();
      79           2 :     PushActiveSnapshot(GetTransactionSnapshot());
      80           2 :     pgstat_report_activity(STATE_RUNNING, "initializing worker_spi schema");
      81             : 
      82             :     /* XXX could we use CREATE SCHEMA IF NOT EXISTS? */
      83           2 :     initStringInfo(&buf);
      84           2 :     appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'",
      85             :                      table->schema);
      86             : 
      87           2 :     debug_query_string = buf.data;
      88           2 :     ret = SPI_execute(buf.data, true, 0);   /* read_only = true */
      89           2 :     if (ret != SPI_OK_SELECT)
      90           0 :         elog(FATAL, "SPI_execute failed: error code %d", ret);
      91             : 
      92           2 :     if (SPI_processed != 1)
      93           0 :         elog(FATAL, "not a singleton result");
      94             : 
                           /*
                            * The 64-bit datum is narrowed to int here; only whether the count is
                            * zero matters below.
                            */
      95           2 :     ntup = DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[0],
      96           2 :                                        SPI_tuptable->tupdesc,
      97             :                                        1, &isnull));
      98           2 :     if (isnull)
      99           0 :         elog(FATAL, "null result");
     100             : 
     101           2 :     if (ntup == 0)          /* no schema of that name yet */
     102             :     {
     103           2 :         debug_query_string = NULL;
     104           2 :         resetStringInfo(&buf);
     105           2 :         appendStringInfo(&buf,
     106             :                          "CREATE SCHEMA \"%s\" "
     107             :                          "CREATE TABLE \"%s\" ("
     108             :                          "     type text CHECK (type IN ('total', 'delta')), "
     109             :                          "     value   integer)"
     110             :                          "CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
     111             :                          "WHERE type = 'total'",
     112             :                          table->schema, table->name, table->name, table->name);
     113             : 
     114             :         /* set statement start time */
     115           2 :         SetCurrentStatementStartTimestamp();
     116             : 
     117           2 :         debug_query_string = buf.data;
     118           2 :         ret = SPI_execute(buf.data, false, 0);  /* read_only = false */
     119             : 
     120           2 :         if (ret != SPI_OK_UTILITY)
     121           0 :             elog(FATAL, "failed to create my schema");
     122             : 
     123           2 :         debug_query_string = NULL;  /* rest is not statement-specific */
     124             :     }
     125             : 
     126           2 :     SPI_finish();
     127           2 :     PopActiveSnapshot();
     128           2 :     CommitTransactionCommand();
     129           2 :     debug_query_string = NULL;
     130           2 :     pgstat_report_activity(STATE_IDLE, NULL);
     131           2 : }
     132             : 
                       /*
                        * Main function of a worker process; it never returns.
                        *
                        * main_arg carries the int32 worker index, used to build the schema name
                        * with the "schema%d" format; the table is always named "counted".  A
                        * database OID, a role OID and connection flags are read back-to-back from
                        * bgw_extra: with a valid database OID the connection is made by OID,
                        * otherwise the worker_spi_database and worker_spi_role GUC values are
                        * used.
                        *
                        * After initialize_worker_spi(), the worker loops: wait on the process
                        * latch for up to worker_spi_naptime seconds, process interrupts, reload
                        * the configuration if requested, then run one transaction that deletes
                        * the 'delta' rows and adds their sum to the 'total' row.
                        */
     133             : void
     134           6 : worker_spi_main(Datum main_arg)
     135             : {
     136           6 :     int         index = DatumGetInt32(main_arg);
     137             :     worktable  *table;
     138             :     StringInfoData buf;
     139             :     char        name[20];   /* receives "schema%d" for the worker index */
     140             :     Oid         dboid;
     141             :     Oid         roleoid;
     142             :     char       *p;
     143           6 :     bits32      flags = 0;
     144             : 
     145           6 :     table = palloc(sizeof(worktable));
     146           6 :     sprintf(name, "schema%d", index);
     147           6 :     table->schema = pstrdup(name);
     148           6 :     table->name = pstrdup("counted");
     149             : 
     150             :     /* fetch database and role OIDs, these are set for a dynamic worker */
     151           6 :     p = MyBgworkerEntry->bgw_extra;
     152           6 :     memcpy(&dboid, p, sizeof(Oid));
     153           6 :     p += sizeof(Oid);
     154           6 :     memcpy(&roleoid, p, sizeof(Oid));
     155           6 :     p += sizeof(Oid);
     156           6 :     memcpy(&flags, p, sizeof(bits32));
     157             : 
     158             :     /* Establish signal handlers before unblocking signals. */
     159           6 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
     160           6 :     pqsignal(SIGTERM, die);
     161             : 
     162             :     /* We're now ready to receive signals */
     163           6 :     BackgroundWorkerUnblockSignals();
     164             : 
     165             :     /* Connect to our database */
     166           6 :     if (OidIsValid(dboid))
     167           6 :         BackgroundWorkerInitializeConnectionByOid(dboid, roleoid, flags);
     168             :     else
     169           0 :         BackgroundWorkerInitializeConnection(worker_spi_database,
     170             :                                              worker_spi_role, flags);
     171             : 
     172           2 :     elog(LOG, "%s initialized with %s.%s",
     173             :          MyBgworkerEntry->bgw_name, table->schema, table->name);
     174           2 :     initialize_worker_spi(table);
     175             : 
     176             :     /*
     177             :      * Quote identifiers passed to us.  Note that this must be done after
     178             :      * initialize_worker_spi, because that routine assumes the names are not
     179             :      * quoted.
     180             :      *
     181             :      * Note some memory might be leaked here.
     182             :      */
     183           2 :     table->schema = quote_identifier(table->schema);
     184           2 :     table->name = quote_identifier(table->name);
     185             : 
                           /* The query text is built once and reused by every loop iteration. */
     186           2 :     initStringInfo(&buf);
     187           2 :     appendStringInfo(&buf,
     188             :                      "WITH deleted AS (DELETE "
     189             :                      "FROM %s.%s "
     190             :                      "WHERE type = 'delta' RETURNING value), "
     191             :                      "total AS (SELECT coalesce(sum(value), 0) as sum "
     192             :                      "FROM deleted) "
     193             :                      "UPDATE %s.%s "
     194             :                      "SET value = %s.value + total.sum "
     195             :                      "FROM total WHERE type = 'total' "
     196             :                      "RETURNING %s.value",
     197             :                      table->schema, table->name,
     198             :                      table->schema, table->name,
     199             :                      table->name,
     200             :                      table->name);
     201             : 
     202             :     /*
     203             :      * Main loop: do this until SIGTERM is received and processed by
     204             :      * ProcessInterrupts.
     205             :      */
     206             :     for (;;)
     207           4 :     {
     208             :         int         ret;
     209             : 
     210             :         /* First time, allocate or get the custom wait event */
     211           6 :         if (worker_spi_wait_event_main == 0)
     212           2 :             worker_spi_wait_event_main = WaitEventExtensionNew("WorkerSpiMain");
     213             : 
     214             :         /*
     215             :          * Background workers mustn't call usleep() or any direct equivalent:
     216             :          * instead, they may wait on their process latch, which sleeps as
     217             :          * necessary, but is awakened if postmaster dies.  That way the
     218             :          * background process goes away immediately in an emergency.
     219             :          */
     220           6 :         (void) WaitLatch(MyLatch,
     221             :                          WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     222             :                          worker_spi_naptime * 1000L,
     223             :                          worker_spi_wait_event_main);
     224           6 :         ResetLatch(MyLatch);
     225             : 
     226           6 :         CHECK_FOR_INTERRUPTS();
     227             : 
     228             :         /*
     229             :          * In case of a SIGHUP, just reload the configuration.
     230             :          */
     231           4 :         if (ConfigReloadPending)
     232             :         {
     233           2 :             ConfigReloadPending = false;
     234           2 :             ProcessConfigFile(PGC_SIGHUP);
     235             :         }
     236             : 
     237             :         /*
     238             :          * Start a transaction on which we can run queries.  Note that each
     239             :          * StartTransactionCommand() call should be preceded by a
     240             :          * SetCurrentStatementStartTimestamp() call, which sets both the time
     241             :          * for the statement we're about to run, and also the transaction
     242             :          * start time.  Also, each other query sent to SPI should probably be
     243             :          * preceded by SetCurrentStatementStartTimestamp(), so that statement
     244             :          * start time is always up to date.
     245             :          *
     246             :          * The SPI_connect() call lets us run queries through the SPI manager,
     247             :          * and the PushActiveSnapshot() call creates an "active" snapshot
     248             :          * which is necessary for queries to have MVCC data to work on.
     249             :          *
     250             :          * The pgstat_report_activity() call makes our activity visible
     251             :          * through the pgstat views.
     252             :          */
     253           4 :         SetCurrentStatementStartTimestamp();
     254           4 :         StartTransactionCommand();
     255           4 :         SPI_connect();
     256           4 :         PushActiveSnapshot(GetTransactionSnapshot());
     257           4 :         debug_query_string = buf.data;
     258           4 :         pgstat_report_activity(STATE_RUNNING, buf.data);
     259             : 
     260             :         /* We can now execute queries via SPI */
     261           4 :         ret = SPI_execute(buf.data, false, 0);
     262             : 
                               /*
                                * The statement is an UPDATE ... RETURNING, hence the expected code;
                                * the message below nevertheless says "select".
                                */
     263           4 :         if (ret != SPI_OK_UPDATE_RETURNING)
     264           0 :             elog(FATAL, "cannot select from table %s.%s: error code %d",
     265             :                  table->schema, table->name, ret);
     266             : 
                               /* No row comes back when the table has no 'total' row to update. */
     267           4 :         if (SPI_processed > 0)
     268             :         {
     269             :             bool        isnull;
     270             :             int32       val;
     271             : 
     272           2 :             val = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
     273           2 :                                               SPI_tuptable->tupdesc,
     274             :                                               1, &isnull));
     275           2 :             if (!isnull)
     276           2 :                 elog(LOG, "%s: count in %s.%s is now %d",
     277             :                      MyBgworkerEntry->bgw_name,
     278             :                      table->schema, table->name, val);
     279             :         }
     280             : 
     281             :         /*
     282             :          * And finish our transaction.
     283             :          */
     284           4 :         SPI_finish();
     285           4 :         PopActiveSnapshot();
     286           4 :         CommitTransactionCommand();
     287           4 :         debug_query_string = NULL;
     288           4 :         pgstat_report_stat(true);
     289           4 :         pgstat_report_activity(STATE_IDLE, NULL);
     290             :     }
     291             : 
     292             :     /* Not reachable */
     293             : }
     294             : 
     295             : /*
     296             :  * Entrypoint of this module.
     297             :  *
     298             :  * We register more than one worker process here, to demonstrate how that can
     299             :  * be done.
                        *
                        * The naptime, database and role GUCs are defined on every load.  The
                        * total_workers GUC, the reservation of the "worker_spi" prefix and the
                        * registration of the static workers only happen while
                        * process_shared_preload_libraries_in_progress is set.
     300             :  */
     301             : void
     302          10 : _PG_init(void)
     303             : {
     304             :     BackgroundWorker worker;
     305             : 
     306             :     /* get the configuration */
     307             : 
     308             :     /*
     309             :      * These GUCs are defined even if this library is not loaded with
     310             :      * shared_preload_libraries, for worker_spi_launch().
     311             :      */
     312          10 :     DefineCustomIntVariable("worker_spi.naptime",
     313             :                             "Duration between each check (in seconds).",
     314             :                             NULL,
     315             :                             &worker_spi_naptime,
     316             :                             10,
     317             :                             1,
     318             :                             INT_MAX,
     319             :                             PGC_SIGHUP,
     320             :                             0,
     321             :                             NULL,
     322             :                             NULL,
     323             :                             NULL);
     324             : 
     325          10 :     DefineCustomStringVariable("worker_spi.database",
     326             :                                "Database to connect to.",
     327             :                                NULL,
     328             :                                &worker_spi_database,
     329             :                                "postgres",
     330             :                                PGC_SIGHUP,
     331             :                                0,
     332             :                                NULL, NULL, NULL);
     333             : 
     334          10 :     DefineCustomStringVariable("worker_spi.role",
     335             :                                "Role to connect with.",
     336             :                                NULL,
     337             :                                &worker_spi_role,
     338             :                                NULL,
     339             :                                PGC_SIGHUP,
     340             :                                0,
     341             :                                NULL, NULL, NULL);
     342             : 
                           /* Everything below is only done when preloaded at server start. */
     343          10 :     if (!process_shared_preload_libraries_in_progress)
     344           8 :         return;
     345             : 
     346           2 :     DefineCustomIntVariable("worker_spi.total_workers",
     347             :                             "Number of workers.",
     348             :                             NULL,
     349             :                             &worker_spi_total_workers,
     350             :                             2,
     351             :                             1,
     352             :                             100,
     353             :                             PGC_POSTMASTER,
     354             :                             0,
     355             :                             NULL,
     356             :                             NULL,
     357             :                             NULL);
     358             : 
     359           2 :     MarkGUCPrefixReserved("worker_spi");
     360             : 
     361             :     /* set up common data for all our workers */
     362           2 :     memset(&worker, 0, sizeof(worker));
     363           2 :     worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
     364             :         BGWORKER_BACKEND_DATABASE_CONNECTION;
     365           2 :     worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
     366           2 :     worker.bgw_restart_time = BGW_NEVER_RESTART;
     367           2 :     sprintf(worker.bgw_library_name, "worker_spi");
     368           2 :     sprintf(worker.bgw_function_name, "worker_spi_main");
     369           2 :     worker.bgw_notify_pid = 0;
     370             : 
     371             :     /*
     372             :      * Now fill in worker-specific data, and do the actual registrations.
     373             :      *
     374             :      * bgw_extra can optionally include a database OID, a role OID and a set
     375             :      * of flags.  This is left empty here to fall back to the related GUCs at
     376             :      * startup (0 for the bgworker flags).
     377             :      */
     378           8 :     for (int i = 1; i <= worker_spi_total_workers; i++)     /* indexes start at 1 */
     379             :     {
     380           6 :         snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi worker %d", i);
     381           6 :         snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi");
     382           6 :         worker.bgw_main_arg = Int32GetDatum(i);
     383             : 
     384           6 :         RegisterBackgroundWorker(&worker);
     385             :     }
     386             : }
     387             : 
     388             : /*
     389             :  * Dynamically launch an SPI worker.
                        *
                        * Arguments: (0) int32 worker index, (1) database OID, (2) role OID,
                        * (3) text array of flags, each either "ALLOWCONN" or "ROLELOGINCHECK".
                        * An invalid database OID is replaced by looking up worker_spi_database;
                        * an invalid role OID is replaced by looking up worker_spi_role when that
                        * GUC is set, and stays invalid otherwise.
                        *
                        * Returns the PID of the started worker as an int32, or NULL when
                        * RegisterDynamicBackgroundWorker() fails.  Raises an error when
                        * WaitForBackgroundWorkerStartup() reports that the worker stopped or that
                        * the postmaster died.
     390             :  */
     391             : Datum
     392          14 : worker_spi_launch(PG_FUNCTION_ARGS)
     393             : {
     394          14 :     int32       i = PG_GETARG_INT32(0);
     395          14 :     Oid         dboid = PG_GETARG_OID(1);
     396          14 :     Oid         roleoid = PG_GETARG_OID(2);
     397             :     BackgroundWorker worker;
     398             :     BackgroundWorkerHandle *handle;
     399             :     BgwHandleStatus status;
     400             :     pid_t       pid;
     401             :     char       *p;
     402          14 :     bits32      flags = 0;
     403          14 :     ArrayType  *arr = PG_GETARG_ARRAYTYPE_P(3);
     404             :     Size        ndim;
     405             :     int         nelems;
     406             :     Datum      *datum_flags;
     407             : 
     408          14 :     memset(&worker, 0, sizeof(worker));
     409          14 :     worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
     410             :         BGWORKER_BACKEND_DATABASE_CONNECTION;
     411          14 :     worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
     412          14 :     worker.bgw_restart_time = BGW_NEVER_RESTART;
     413          14 :     sprintf(worker.bgw_library_name, "worker_spi");
     414          14 :     sprintf(worker.bgw_function_name, "worker_spi_main");
     415          14 :     snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi dynamic worker %d", i);
     416          14 :     snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi dynamic");
     417          14 :     worker.bgw_main_arg = Int32GetDatum(i);
     418             :     /* set bgw_notify_pid so that we can use WaitForBackgroundWorkerStartup */
     419          14 :     worker.bgw_notify_pid = MyProcPid;
     420             : 
     421             :     /* extract flags, if any */
     422          14 :     ndim = ARR_NDIM(arr);
     423          14 :     if (ndim > 1)
     424           0 :         ereport(ERROR,
     425             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     426             :                  errmsg("flags array must be one-dimensional")));
     427             : 
     428          14 :     if (array_contains_nulls(arr))
     429           0 :         ereport(ERROR,
     430             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     431             :                  errmsg("flags array must not contain nulls")));
     432             : 
     433             :     Assert(ARR_ELEMTYPE(arr) == TEXTOID);
     434          14 :     deconstruct_array_builtin(arr, TEXTOID, &datum_flags, NULL, &nelems);
     435             : 
                           /*
                            * "i" is reused as the loop index from here on; the worker index it held
                            * has already been stored in bgw_name and bgw_main_arg above.
                            */
     436          18 :     for (i = 0; i < nelems; i++)
     437             :     {
     438           4 :         char       *optname = TextDatumGetCString(datum_flags[i]);
     439             : 
     440           4 :         if (strcmp(optname, "ALLOWCONN") == 0)
     441           2 :             flags |= BGWORKER_BYPASS_ALLOWCONN;
     442           2 :         else if (strcmp(optname, "ROLELOGINCHECK") == 0)
     443           2 :             flags |= BGWORKER_BYPASS_ROLELOGINCHECK;
     444             :         else
     445           0 :             ereport(ERROR,
     446             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     447             :                      errmsg("incorrect flag value found in array")));
     448             :     }
     449             : 
     450             :     /*
     451             :      * Register database and role to use for the worker started in bgw_extra.
     452             :      * If none have been provided, this will fall back to the GUCs at startup.
     453             :      */
     454          14 :     if (!OidIsValid(dboid))
     455           2 :         dboid = get_database_oid(worker_spi_database, false);
     456             : 
     457             :     /*
     458             :      * worker_spi_role is NULL by default, so this gives to worker_spi_main()
     459             :      * an invalid OID in this case.
     460             :      */
     461          14 :     if (!OidIsValid(roleoid) && worker_spi_role)
     462           0 :         roleoid = get_role_oid(worker_spi_role, false);
     463             : 
                           /* Same layout as read back in worker_spi_main(): dboid, roleoid, flags. */
     464          14 :     p = worker.bgw_extra;
     465          14 :     memcpy(p, &dboid, sizeof(Oid));
     466          14 :     p += sizeof(Oid);
     467          14 :     memcpy(p, &roleoid, sizeof(Oid));
     468          14 :     p += sizeof(Oid);
     469          14 :     memcpy(p, &flags, sizeof(bits32));
     470             : 
     471          14 :     if (!RegisterDynamicBackgroundWorker(&worker, &handle))
     472           0 :         PG_RETURN_NULL();
     473             : 
     474          14 :     status = WaitForBackgroundWorkerStartup(handle, &pid);
     475             : 
     476          14 :     if (status == BGWH_STOPPED)
     477           0 :         ereport(ERROR,
     478             :                 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
     479             :                  errmsg("could not start background process"),
     480             :                  errhint("More details may be available in the server log.")));
     481          14 :     if (status == BGWH_POSTMASTER_DIED)
     482           0 :         ereport(ERROR,
     483             :                 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
     484             :                  errmsg("cannot start background processes without postmaster"),
     485             :                  errhint("Kill all remaining database processes and restart the database.")));
     486             :     Assert(status == BGWH_STARTED);
     487             : 
     488          14 :     PG_RETURN_INT32(pid);
     489             : }

Generated by: LCOV version 1.14