LCOV - code coverage report
Current view: top level - src/test/modules/worker_spi - worker_spi.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 145 159 91.2 %
Date: 2024-09-16 13:12:04 Functions: 6 6 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /* -------------------------------------------------------------------------
       2             :  *
       3             :  * worker_spi.c
       4             :  *      Sample background worker code that demonstrates various coding
       5             :  *      patterns: establishing a database connection; starting and committing
       6             :  *      transactions; using GUC variables, and heeding SIGHUP to reread
       7             :  *      the configuration file; reporting to pg_stat_activity; using the
       8             :  *      process latch to sleep and exit in case of postmaster death.
       9             :  *
      10             :  * This code connects to a database, creates a schema and table, and summarizes
      11             :  * the numbers contained therein.  To see it working, insert an initial value
      12             :  * with "total" type and some initial value; then insert some other rows with
      13             :  * "delta" type.  Delta rows will be deleted by this worker and their values
      14             :  * aggregated into the total.
      15             :  *
      16             :  * Copyright (c) 2013-2024, PostgreSQL Global Development Group
      17             :  *
      18             :  * IDENTIFICATION
      19             :  *      src/test/modules/worker_spi/worker_spi.c
      20             :  *
      21             :  * -------------------------------------------------------------------------
      22             :  */
      23             : #include "postgres.h"
      24             : 
      25             : /* These are always necessary for a bgworker */
      26             : #include "miscadmin.h"
      27             : #include "postmaster/bgworker.h"
      28             : #include "postmaster/interrupt.h"
      29             : #include "storage/ipc.h"
      30             : #include "storage/latch.h"
      31             : #include "storage/lwlock.h"
      32             : #include "storage/proc.h"
      33             : #include "storage/shmem.h"
      34             : 
      35             : /* these headers are used by this particular worker's code */
      36             : #include "access/xact.h"
      37             : #include "commands/dbcommands.h"
      38             : #include "executor/spi.h"
      39             : #include "fmgr.h"
      40             : #include "lib/stringinfo.h"
      41             : #include "pgstat.h"
      42             : #include "tcop/utility.h"
      43             : #include "utils/acl.h"
      44             : #include "utils/builtins.h"
      45             : #include "utils/snapmgr.h"
      46             : 
      47          10 : PG_MODULE_MAGIC;
      48             : 
      49          18 : PG_FUNCTION_INFO_V1(worker_spi_launch);
      50             : 
      51             : PGDLLEXPORT void worker_spi_main(Datum main_arg) pg_attribute_noreturn();
      52             : 
      53             : /* GUC variables */
      54             : static int  worker_spi_naptime = 10;
      55             : static int  worker_spi_total_workers = 2;
      56             : static char *worker_spi_database = NULL;
      57             : static char *worker_spi_role = NULL;
      58             : 
      59             : /* value cached, fetched from shared memory */
      60             : static uint32 worker_spi_wait_event_main = 0;
      61             : 
      62             : typedef struct worktable
      63             : {
      64             :     const char *schema;
      65             :     const char *name;
      66             : } worktable;
      67             : 
      68             : /*
      69             :  * Initialize workspace for a worker process: create the schema if it doesn't
      70             :  * already exist.
      71             :  */
      72             : static void
      73           2 : initialize_worker_spi(worktable *table)
      74             : {
      75             :     int         ret;
      76             :     int         ntup;
      77             :     bool        isnull;
      78             :     StringInfoData buf;
      79             : 
      80           2 :     SetCurrentStatementStartTimestamp();
      81           2 :     StartTransactionCommand();
      82           2 :     SPI_connect();
      83           2 :     PushActiveSnapshot(GetTransactionSnapshot());
      84           2 :     pgstat_report_activity(STATE_RUNNING, "initializing worker_spi schema");
      85             : 
      86             :     /* XXX could we use CREATE SCHEMA IF NOT EXISTS? */
      87           2 :     initStringInfo(&buf);
      88           2 :     appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'",
      89             :                      table->schema);
      90             : 
      91           2 :     debug_query_string = buf.data;
      92           2 :     ret = SPI_execute(buf.data, true, 0);
      93           2 :     if (ret != SPI_OK_SELECT)
      94           0 :         elog(FATAL, "SPI_execute failed: error code %d", ret);
      95             : 
      96           2 :     if (SPI_processed != 1)
      97           0 :         elog(FATAL, "not a singleton result");
      98             : 
      99           2 :     ntup = DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[0],
     100           2 :                                        SPI_tuptable->tupdesc,
     101             :                                        1, &isnull));
     102           2 :     if (isnull)
     103           0 :         elog(FATAL, "null result");
     104             : 
     105           2 :     if (ntup == 0)
     106             :     {
     107           2 :         debug_query_string = NULL;
     108           2 :         resetStringInfo(&buf);
     109           2 :         appendStringInfo(&buf,
     110             :                          "CREATE SCHEMA \"%s\" "
     111             :                          "CREATE TABLE \"%s\" ("
     112             :                          "     type text CHECK (type IN ('total', 'delta')), "
     113             :                          "     value   integer)"
     114             :                          "CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
     115             :                          "WHERE type = 'total'",
     116             :                          table->schema, table->name, table->name, table->name);
     117             : 
     118             :         /* set statement start time */
     119           2 :         SetCurrentStatementStartTimestamp();
     120             : 
     121           2 :         debug_query_string = buf.data;
     122           2 :         ret = SPI_execute(buf.data, false, 0);
     123             : 
     124           2 :         if (ret != SPI_OK_UTILITY)
     125           0 :             elog(FATAL, "failed to create my schema");
     126             : 
     127           2 :         debug_query_string = NULL;  /* rest is not statement-specific */
     128             :     }
     129             : 
     130           2 :     SPI_finish();
     131           2 :     PopActiveSnapshot();
     132           2 :     CommitTransactionCommand();
     133           2 :     debug_query_string = NULL;
     134           2 :     pgstat_report_activity(STATE_IDLE, NULL);
     135           2 : }
     136             : 
     137             : void
     138           6 : worker_spi_main(Datum main_arg)
     139             : {
     140           6 :     int         index = DatumGetInt32(main_arg);
     141             :     worktable  *table;
     142             :     StringInfoData buf;
     143             :     char        name[20];
     144             :     Oid         dboid;
     145             :     Oid         roleoid;
     146             :     char       *p;
     147           6 :     bits32      flags = 0;
     148             : 
     149           6 :     table = palloc(sizeof(worktable));
     150           6 :     sprintf(name, "schema%d", index);
     151           6 :     table->schema = pstrdup(name);
     152           6 :     table->name = pstrdup("counted");
     153             : 
     154             :     /* fetch database and role OIDs, these are set for a dynamic worker */
     155           6 :     p = MyBgworkerEntry->bgw_extra;
     156           6 :     memcpy(&dboid, p, sizeof(Oid));
     157           6 :     p += sizeof(Oid);
     158           6 :     memcpy(&roleoid, p, sizeof(Oid));
     159           6 :     p += sizeof(Oid);
     160           6 :     memcpy(&flags, p, sizeof(bits32));
     161             : 
     162             :     /* Establish signal handlers before unblocking signals. */
     163           6 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
     164           6 :     pqsignal(SIGTERM, die);
     165             : 
     166             :     /* We're now ready to receive signals */
     167           6 :     BackgroundWorkerUnblockSignals();
     168             : 
     169             :     /* Connect to our database */
     170           6 :     if (OidIsValid(dboid))
     171           6 :         BackgroundWorkerInitializeConnectionByOid(dboid, roleoid, flags);
     172             :     else
     173           0 :         BackgroundWorkerInitializeConnection(worker_spi_database,
     174             :                                              worker_spi_role, flags);
     175             : 
     176             :     /*
     177             :      * Disable parallel query for workers started with
     178             :      * BGWORKER_BYPASS_ALLOWCONN or BGWORKER_BYPASS_ROLELOGINCHECK so as these
     179             :      * don't attempt connections using a database or a role that may not allow
     180             :      * that.
     181             :      */
     182           2 :     if ((flags & (BGWORKER_BYPASS_ALLOWCONN | BGWORKER_BYPASS_ROLELOGINCHECK)))
     183           0 :         SetConfigOption("max_parallel_workers_per_gather", "0",
     184             :                         PGC_USERSET, PGC_S_OVERRIDE);
     185             : 
     186           2 :     elog(LOG, "%s initialized with %s.%s",
     187             :          MyBgworkerEntry->bgw_name, table->schema, table->name);
     188           2 :     initialize_worker_spi(table);
     189             : 
     190             :     /*
     191             :      * Quote identifiers passed to us.  Note that this must be done after
     192             :      * initialize_worker_spi, because that routine assumes the names are not
     193             :      * quoted.
     194             :      *
     195             :      * Note some memory might be leaked here.
     196             :      */
     197           2 :     table->schema = quote_identifier(table->schema);
     198           2 :     table->name = quote_identifier(table->name);
     199             : 
     200           2 :     initStringInfo(&buf);
     201           2 :     appendStringInfo(&buf,
     202             :                      "WITH deleted AS (DELETE "
     203             :                      "FROM %s.%s "
     204             :                      "WHERE type = 'delta' RETURNING value), "
     205             :                      "total AS (SELECT coalesce(sum(value), 0) as sum "
     206             :                      "FROM deleted) "
     207             :                      "UPDATE %s.%s "
     208             :                      "SET value = %s.value + total.sum "
     209             :                      "FROM total WHERE type = 'total' "
     210             :                      "RETURNING %s.value",
     211             :                      table->schema, table->name,
     212             :                      table->schema, table->name,
     213             :                      table->name,
     214             :                      table->name);
     215             : 
     216             :     /*
     217             :      * Main loop: do this until SIGTERM is received and processed by
     218             :      * ProcessInterrupts.
     219             :      */
     220             :     for (;;)
     221           4 :     {
     222             :         int         ret;
     223             : 
     224             :         /* First time, allocate or get the custom wait event */
     225           6 :         if (worker_spi_wait_event_main == 0)
     226           2 :             worker_spi_wait_event_main = WaitEventExtensionNew("WorkerSpiMain");
     227             : 
     228             :         /*
     229             :          * Background workers mustn't call usleep() or any direct equivalent:
     230             :          * instead, they may wait on their process latch, which sleeps as
     231             :          * necessary, but is awakened if postmaster dies.  That way the
     232             :          * background process goes away immediately in an emergency.
     233             :          */
     234           6 :         (void) WaitLatch(MyLatch,
     235             :                          WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     236             :                          worker_spi_naptime * 1000L,
     237             :                          worker_spi_wait_event_main);
     238           6 :         ResetLatch(MyLatch);
     239             : 
     240           6 :         CHECK_FOR_INTERRUPTS();
     241             : 
     242             :         /*
     243             :          * In case of a SIGHUP, just reload the configuration.
     244             :          */
     245           4 :         if (ConfigReloadPending)
     246             :         {
     247           2 :             ConfigReloadPending = false;
     248           2 :             ProcessConfigFile(PGC_SIGHUP);
     249             :         }
     250             : 
     251             :         /*
     252             :          * Start a transaction on which we can run queries.  Note that each
     253             :          * StartTransactionCommand() call should be preceded by a
     254             :          * SetCurrentStatementStartTimestamp() call, which sets both the time
     255             :          * for the statement we're about the run, and also the transaction
     256             :          * start time.  Also, each other query sent to SPI should probably be
     257             :          * preceded by SetCurrentStatementStartTimestamp(), so that statement
     258             :          * start time is always up to date.
     259             :          *
     260             :          * The SPI_connect() call lets us run queries through the SPI manager,
     261             :          * and the PushActiveSnapshot() call creates an "active" snapshot
     262             :          * which is necessary for queries to have MVCC data to work on.
     263             :          *
     264             :          * The pgstat_report_activity() call makes our activity visible
     265             :          * through the pgstat views.
     266             :          */
     267           4 :         SetCurrentStatementStartTimestamp();
     268           4 :         StartTransactionCommand();
     269           4 :         SPI_connect();
     270           4 :         PushActiveSnapshot(GetTransactionSnapshot());
     271           4 :         debug_query_string = buf.data;
     272           4 :         pgstat_report_activity(STATE_RUNNING, buf.data);
     273             : 
     274             :         /* We can now execute queries via SPI */
     275           4 :         ret = SPI_execute(buf.data, false, 0);
     276             : 
     277           4 :         if (ret != SPI_OK_UPDATE_RETURNING)
     278           0 :             elog(FATAL, "cannot select from table %s.%s: error code %d",
     279             :                  table->schema, table->name, ret);
     280             : 
     281           4 :         if (SPI_processed > 0)
     282             :         {
     283             :             bool        isnull;
     284             :             int32       val;
     285             : 
     286           2 :             val = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
     287           2 :                                               SPI_tuptable->tupdesc,
     288             :                                               1, &isnull));
     289           2 :             if (!isnull)
     290           2 :                 elog(LOG, "%s: count in %s.%s is now %d",
     291             :                      MyBgworkerEntry->bgw_name,
     292             :                      table->schema, table->name, val);
     293             :         }
     294             : 
     295             :         /*
     296             :          * And finish our transaction.
     297             :          */
     298           4 :         SPI_finish();
     299           4 :         PopActiveSnapshot();
     300           4 :         CommitTransactionCommand();
     301           4 :         debug_query_string = NULL;
     302           4 :         pgstat_report_stat(true);
     303           4 :         pgstat_report_activity(STATE_IDLE, NULL);
     304             :     }
     305             : 
     306             :     /* Not reachable */
     307             : }
     308             : 
     309             : /*
     310             :  * Entrypoint of this module.
     311             :  *
     312             :  * We register more than one worker process here, to demonstrate how that can
     313             :  * be done.
     314             :  */
     315             : void
     316          10 : _PG_init(void)
     317             : {
     318             :     BackgroundWorker worker;
     319             : 
     320             :     /* get the configuration */
     321             : 
     322             :     /*
     323             :      * These GUCs are defined even if this library is not loaded with
     324             :      * shared_preload_libraries, for worker_spi_launch().
     325             :      */
     326          10 :     DefineCustomIntVariable("worker_spi.naptime",
     327             :                             "Duration between each check (in seconds).",
     328             :                             NULL,
     329             :                             &worker_spi_naptime,
     330             :                             10,
     331             :                             1,
     332             :                             INT_MAX,
     333             :                             PGC_SIGHUP,
     334             :                             0,
     335             :                             NULL,
     336             :                             NULL,
     337             :                             NULL);
     338             : 
     339          10 :     DefineCustomStringVariable("worker_spi.database",
     340             :                                "Database to connect to.",
     341             :                                NULL,
     342             :                                &worker_spi_database,
     343             :                                "postgres",
     344             :                                PGC_SIGHUP,
     345             :                                0,
     346             :                                NULL, NULL, NULL);
     347             : 
     348          10 :     DefineCustomStringVariable("worker_spi.role",
     349             :                                "Role to connect with.",
     350             :                                NULL,
     351             :                                &worker_spi_role,
     352             :                                NULL,
     353             :                                PGC_SIGHUP,
     354             :                                0,
     355             :                                NULL, NULL, NULL);
     356             : 
     357          10 :     if (!process_shared_preload_libraries_in_progress)
     358           8 :         return;
     359             : 
     360           2 :     DefineCustomIntVariable("worker_spi.total_workers",
     361             :                             "Number of workers.",
     362             :                             NULL,
     363             :                             &worker_spi_total_workers,
     364             :                             2,
     365             :                             1,
     366             :                             100,
     367             :                             PGC_POSTMASTER,
     368             :                             0,
     369             :                             NULL,
     370             :                             NULL,
     371             :                             NULL);
     372             : 
     373           2 :     MarkGUCPrefixReserved("worker_spi");
     374             : 
     375             :     /* set up common data for all our workers */
     376           2 :     memset(&worker, 0, sizeof(worker));
     377           2 :     worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
     378             :         BGWORKER_BACKEND_DATABASE_CONNECTION;
     379           2 :     worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
     380           2 :     worker.bgw_restart_time = BGW_NEVER_RESTART;
     381           2 :     sprintf(worker.bgw_library_name, "worker_spi");
     382           2 :     sprintf(worker.bgw_function_name, "worker_spi_main");
     383           2 :     worker.bgw_notify_pid = 0;
     384             : 
     385             :     /*
     386             :      * Now fill in worker-specific data, and do the actual registrations.
     387             :      *
     388             :      * bgw_extra can optionally include a database OID, a role OID and a set
     389             :      * of flags.  This is left empty here to fallback to the related GUCs at
     390             :      * startup (0 for the bgworker flags).
     391             :      */
     392           8 :     for (int i = 1; i <= worker_spi_total_workers; i++)
     393             :     {
     394           6 :         snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi worker %d", i);
     395           6 :         snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi");
     396           6 :         worker.bgw_main_arg = Int32GetDatum(i);
     397             : 
     398           6 :         RegisterBackgroundWorker(&worker);
     399             :     }
     400             : }
     401             : 
     402             : /*
     403             :  * Dynamically launch an SPI worker.
     404             :  */
     405             : Datum
     406          14 : worker_spi_launch(PG_FUNCTION_ARGS)
     407             : {
     408          14 :     int32       i = PG_GETARG_INT32(0);
     409          14 :     Oid         dboid = PG_GETARG_OID(1);
     410          14 :     Oid         roleoid = PG_GETARG_OID(2);
     411             :     BackgroundWorker worker;
     412             :     BackgroundWorkerHandle *handle;
     413             :     BgwHandleStatus status;
     414             :     pid_t       pid;
     415             :     char       *p;
     416          14 :     bits32      flags = 0;
     417          14 :     ArrayType  *arr = PG_GETARG_ARRAYTYPE_P(3);
     418             :     Size        ndim;
     419             :     int         nelems;
     420             :     Datum      *datum_flags;
     421             : 
     422          14 :     memset(&worker, 0, sizeof(worker));
     423          14 :     worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
     424             :         BGWORKER_BACKEND_DATABASE_CONNECTION;
     425          14 :     worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
     426          14 :     worker.bgw_restart_time = BGW_NEVER_RESTART;
     427          14 :     sprintf(worker.bgw_library_name, "worker_spi");
     428          14 :     sprintf(worker.bgw_function_name, "worker_spi_main");
     429          14 :     snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi dynamic worker %d", i);
     430          14 :     snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi dynamic");
     431          14 :     worker.bgw_main_arg = Int32GetDatum(i);
     432             :     /* set bgw_notify_pid so that we can use WaitForBackgroundWorkerStartup */
     433          14 :     worker.bgw_notify_pid = MyProcPid;
     434             : 
     435             :     /* extract flags, if any */
     436          14 :     ndim = ARR_NDIM(arr);
     437          14 :     if (ndim > 1)
     438           0 :         ereport(ERROR,
     439             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     440             :                  errmsg("flags array must be one-dimensional")));
     441             : 
     442          14 :     if (array_contains_nulls(arr))
     443           0 :         ereport(ERROR,
     444             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     445             :                  errmsg("flags array must not contain nulls")));
     446             : 
     447             :     Assert(ARR_ELEMTYPE(arr) == TEXTOID);
     448          14 :     deconstruct_array_builtin(arr, TEXTOID, &datum_flags, NULL, &nelems);
     449             : 
     450          18 :     for (i = 0; i < nelems; i++)
     451             :     {
     452           4 :         char       *optname = TextDatumGetCString(datum_flags[i]);
     453             : 
     454           4 :         if (strcmp(optname, "ALLOWCONN") == 0)
     455           2 :             flags |= BGWORKER_BYPASS_ALLOWCONN;
     456           2 :         else if (strcmp(optname, "ROLELOGINCHECK") == 0)
     457           2 :             flags |= BGWORKER_BYPASS_ROLELOGINCHECK;
     458             :         else
     459           0 :             ereport(ERROR,
     460             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     461             :                      errmsg("incorrect flag value found in array")));
     462             :     }
     463             : 
     464             :     /*
     465             :      * Register database and role to use for the worker started in bgw_extra.
     466             :      * If none have been provided, this will fall back to the GUCs at startup.
     467             :      */
     468          14 :     if (!OidIsValid(dboid))
     469           2 :         dboid = get_database_oid(worker_spi_database, false);
     470             : 
     471             :     /*
     472             :      * worker_spi_role is NULL by default, so this gives to worker_spi_main()
     473             :      * an invalid OID in this case.
     474             :      */
     475          14 :     if (!OidIsValid(roleoid) && worker_spi_role)
     476           0 :         roleoid = get_role_oid(worker_spi_role, false);
     477             : 
     478          14 :     p = worker.bgw_extra;
     479          14 :     memcpy(p, &dboid, sizeof(Oid));
     480          14 :     p += sizeof(Oid);
     481          14 :     memcpy(p, &roleoid, sizeof(Oid));
     482          14 :     p += sizeof(Oid);
     483          14 :     memcpy(p, &flags, sizeof(bits32));
     484             : 
     485          14 :     if (!RegisterDynamicBackgroundWorker(&worker, &handle))
     486           0 :         PG_RETURN_NULL();
     487             : 
     488          14 :     status = WaitForBackgroundWorkerStartup(handle, &pid);
     489             : 
     490          14 :     if (status == BGWH_STOPPED)
     491           0 :         ereport(ERROR,
     492             :                 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
     493             :                  errmsg("could not start background process"),
     494             :                  errhint("More details may be available in the server log.")));
     495          14 :     if (status == BGWH_POSTMASTER_DIED)
     496           0 :         ereport(ERROR,
     497             :                 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
     498             :                  errmsg("cannot start background processes without postmaster"),
     499             :                  errhint("Kill all remaining database processes and restart the database.")));
     500             :     Assert(status == BGWH_STARTED);
     501             : 
     502          14 :     PG_RETURN_INT32(pid);
     503             : }

Generated by: LCOV version 1.14