LCOV - code coverage report
Current view: top level - src/test/modules/worker_spi - worker_spi.c (source / functions) Hit Total Coverage
Test: PostgreSQL 17devel Lines: 145 159 91.2 %
Date: 2024-04-23 11:10:57 Functions: 6 6 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /* -------------------------------------------------------------------------
       2             :  *
       3             :  * worker_spi.c
       4             :  *      Sample background worker code that demonstrates various coding
       5             :  *      patterns: establishing a database connection; starting and committing
       6             :  *      transactions; using GUC variables, and heeding SIGHUP to reread
       7             :  *      the configuration file; reporting to pg_stat_activity; using the
       8             :  *      process latch to sleep and exit in case of postmaster death.
       9             :  *
      10             :  * This code connects to a database, creates a schema and table, and summarizes
      11             :  * the numbers contained therein.  To see it working, insert an initial value
      12             :  * with "total" type and some initial value; then insert some other rows with
      13             :  * "delta" type.  Delta rows will be deleted by this worker and their values
      14             :  * aggregated into the total.
      15             :  *
      16             :  * Copyright (c) 2013-2024, PostgreSQL Global Development Group
      17             :  *
      18             :  * IDENTIFICATION
      19             :  *      src/test/modules/worker_spi/worker_spi.c
      20             :  *
      21             :  * -------------------------------------------------------------------------
      22             :  */
      23             : #include "postgres.h"
      24             : 
      25             : /* These are always necessary for a bgworker */
      26             : #include "miscadmin.h"
      27             : #include "postmaster/bgworker.h"
      28             : #include "postmaster/interrupt.h"
      29             : #include "storage/ipc.h"
      30             : #include "storage/latch.h"
      31             : #include "storage/lwlock.h"
      32             : #include "storage/proc.h"
      33             : #include "storage/shmem.h"
      34             : 
      35             : /* these headers are used by this particular worker's code */
      36             : #include "access/xact.h"
      37             : #include "commands/dbcommands.h"
      38             : #include "executor/spi.h"
      39             : #include "fmgr.h"
      40             : #include "lib/stringinfo.h"
      41             : #include "pgstat.h"
      42             : #include "tcop/utility.h"
      43             : #include "utils/acl.h"
      44             : #include "utils/builtins.h"
      45             : #include "utils/snapmgr.h"
      46             : 
      47          10 : PG_MODULE_MAGIC;
      48             : 
      49          18 : PG_FUNCTION_INFO_V1(worker_spi_launch);
      50             : 
      51             : PGDLLEXPORT void worker_spi_main(Datum main_arg) pg_attribute_noreturn();
      52             : 
      53             : /* GUC variables */
      54             : static int  worker_spi_naptime = 10;
      55             : static int  worker_spi_total_workers = 2;
      56             : static char *worker_spi_database = NULL;
      57             : static char *worker_spi_role = NULL;
      58             : 
      59             : /* value cached, fetched from shared memory */
      60             : static uint32 worker_spi_wait_event_main = 0;
      61             : 
      62             : typedef struct worktable
      63             : {
      64             :     const char *schema;
      65             :     const char *name;
      66             : } worktable;
      67             : 
      68             : /*
      69             :  * Initialize workspace for a worker process: create the schema if it doesn't
      70             :  * already exist.
      71             :  */
      72             : static void
      73           2 : initialize_worker_spi(worktable *table)
      74             : {
      75             :     int         ret;
      76             :     int         ntup;
      77             :     bool        isnull;
      78             :     StringInfoData buf;
      79             : 
      80           2 :     SetCurrentStatementStartTimestamp();
      81           2 :     StartTransactionCommand();
      82           2 :     SPI_connect();
      83           2 :     PushActiveSnapshot(GetTransactionSnapshot());
      84           2 :     pgstat_report_activity(STATE_RUNNING, "initializing worker_spi schema");
      85             : 
      86             :     /* XXX could we use CREATE SCHEMA IF NOT EXISTS? */
      87           2 :     initStringInfo(&buf);
      88           2 :     appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'",
      89             :                      table->schema);
      90             : 
      91           2 :     debug_query_string = buf.data;
      92           2 :     ret = SPI_execute(buf.data, true, 0);
      93           2 :     if (ret != SPI_OK_SELECT)
      94           0 :         elog(FATAL, "SPI_execute failed: error code %d", ret);
      95             : 
      96           2 :     if (SPI_processed != 1)
      97           0 :         elog(FATAL, "not a singleton result");
      98             : 
      99           2 :     ntup = DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[0],
     100           2 :                                        SPI_tuptable->tupdesc,
     101             :                                        1, &isnull));
     102           2 :     if (isnull)
     103           0 :         elog(FATAL, "null result");
     104             : 
     105           2 :     if (ntup == 0)
     106             :     {
     107           2 :         debug_query_string = NULL;
     108           2 :         resetStringInfo(&buf);
     109           2 :         appendStringInfo(&buf,
     110             :                          "CREATE SCHEMA \"%s\" "
     111             :                          "CREATE TABLE \"%s\" ("
     112             :                          "     type text CHECK (type IN ('total', 'delta')), "
     113             :                          "     value   integer)"
     114             :                          "CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
     115             :                          "WHERE type = 'total'",
     116             :                          table->schema, table->name, table->name, table->name);
     117             : 
     118             :         /* set statement start time */
     119           2 :         SetCurrentStatementStartTimestamp();
     120             : 
     121           2 :         debug_query_string = buf.data;
     122           2 :         ret = SPI_execute(buf.data, false, 0);
     123             : 
     124           2 :         if (ret != SPI_OK_UTILITY)
     125           0 :             elog(FATAL, "failed to create my schema");
     126             : 
     127           2 :         debug_query_string = NULL;  /* rest is not statement-specific */
     128             :     }
     129             : 
     130           2 :     SPI_finish();
     131           2 :     PopActiveSnapshot();
     132           2 :     CommitTransactionCommand();
     133           2 :     debug_query_string = NULL;
     134           2 :     pgstat_report_activity(STATE_IDLE, NULL);
     135           2 : }
     136             : 
     137             : void
     138           6 : worker_spi_main(Datum main_arg)
     139             : {
     140           6 :     int         index = DatumGetInt32(main_arg);
     141             :     worktable  *table;
     142             :     StringInfoData buf;
     143             :     char        name[20];
     144             :     Oid         dboid;
     145             :     Oid         roleoid;
     146             :     char       *p;
     147           6 :     bits32      flags = 0;
     148             : 
     149           6 :     table = palloc(sizeof(worktable));
     150           6 :     sprintf(name, "schema%d", index);
     151           6 :     table->schema = pstrdup(name);
     152           6 :     table->name = pstrdup("counted");
     153             : 
     154             :     /* fetch database and role OIDs, these are set for a dynamic worker */
     155           6 :     p = MyBgworkerEntry->bgw_extra;
     156           6 :     memcpy(&dboid, p, sizeof(Oid));
     157           6 :     p += sizeof(Oid);
     158           6 :     memcpy(&roleoid, p, sizeof(Oid));
     159           6 :     p += sizeof(Oid);
     160           6 :     memcpy(&flags, p, sizeof(bits32));
     161             : 
     162             :     /* Establish signal handlers before unblocking signals. */
     163           6 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
     164           6 :     pqsignal(SIGTERM, die);
     165             : 
     166             :     /* We're now ready to receive signals */
     167           6 :     BackgroundWorkerUnblockSignals();
     168             : 
     169             :     /* Connect to our database */
     170           6 :     if (OidIsValid(dboid))
     171           6 :         BackgroundWorkerInitializeConnectionByOid(dboid, roleoid, flags);
     172             :     else
     173           0 :         BackgroundWorkerInitializeConnection(worker_spi_database,
     174             :                                              worker_spi_role, flags);
     175             : 
     176             :     /*
     177             :      * Disable parallel query for workers started with BYPASS_ALLOWCONN or
     178             :      * BGWORKER_BYPASS_ALLOWCONN so as these don't attempt connections using a
     179             :      * database or a role that may not allow that.
     180             :      */
     181           2 :     if ((flags & (BGWORKER_BYPASS_ALLOWCONN | BGWORKER_BYPASS_ROLELOGINCHECK)))
     182           0 :         SetConfigOption("max_parallel_workers_per_gather", "0",
     183             :                         PGC_USERSET, PGC_S_OVERRIDE);
     184             : 
     185           2 :     elog(LOG, "%s initialized with %s.%s",
     186             :          MyBgworkerEntry->bgw_name, table->schema, table->name);
     187           2 :     initialize_worker_spi(table);
     188             : 
     189             :     /*
     190             :      * Quote identifiers passed to us.  Note that this must be done after
     191             :      * initialize_worker_spi, because that routine assumes the names are not
     192             :      * quoted.
     193             :      *
     194             :      * Note some memory might be leaked here.
     195             :      */
     196           2 :     table->schema = quote_identifier(table->schema);
     197           2 :     table->name = quote_identifier(table->name);
     198             : 
     199           2 :     initStringInfo(&buf);
     200           2 :     appendStringInfo(&buf,
     201             :                      "WITH deleted AS (DELETE "
     202             :                      "FROM %s.%s "
     203             :                      "WHERE type = 'delta' RETURNING value), "
     204             :                      "total AS (SELECT coalesce(sum(value), 0) as sum "
     205             :                      "FROM deleted) "
     206             :                      "UPDATE %s.%s "
     207             :                      "SET value = %s.value + total.sum "
     208             :                      "FROM total WHERE type = 'total' "
     209             :                      "RETURNING %s.value",
     210             :                      table->schema, table->name,
     211             :                      table->schema, table->name,
     212             :                      table->name,
     213             :                      table->name);
     214             : 
     215             :     /*
     216             :      * Main loop: do this until SIGTERM is received and processed by
     217             :      * ProcessInterrupts.
     218             :      */
     219             :     for (;;)
     220           4 :     {
     221             :         int         ret;
     222             : 
     223             :         /* First time, allocate or get the custom wait event */
     224           6 :         if (worker_spi_wait_event_main == 0)
     225           2 :             worker_spi_wait_event_main = WaitEventExtensionNew("WorkerSpiMain");
     226             : 
     227             :         /*
     228             :          * Background workers mustn't call usleep() or any direct equivalent:
     229             :          * instead, they may wait on their process latch, which sleeps as
     230             :          * necessary, but is awakened if postmaster dies.  That way the
     231             :          * background process goes away immediately in an emergency.
     232             :          */
     233           6 :         (void) WaitLatch(MyLatch,
     234             :                          WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     235             :                          worker_spi_naptime * 1000L,
     236             :                          worker_spi_wait_event_main);
     237           6 :         ResetLatch(MyLatch);
     238             : 
     239           6 :         CHECK_FOR_INTERRUPTS();
     240             : 
     241             :         /*
     242             :          * In case of a SIGHUP, just reload the configuration.
     243             :          */
     244           4 :         if (ConfigReloadPending)
     245             :         {
     246           2 :             ConfigReloadPending = false;
     247           2 :             ProcessConfigFile(PGC_SIGHUP);
     248             :         }
     249             : 
     250             :         /*
     251             :          * Start a transaction on which we can run queries.  Note that each
     252             :          * StartTransactionCommand() call should be preceded by a
     253             :          * SetCurrentStatementStartTimestamp() call, which sets both the time
     254             :          * for the statement we're about the run, and also the transaction
     255             :          * start time.  Also, each other query sent to SPI should probably be
     256             :          * preceded by SetCurrentStatementStartTimestamp(), so that statement
     257             :          * start time is always up to date.
     258             :          *
     259             :          * The SPI_connect() call lets us run queries through the SPI manager,
     260             :          * and the PushActiveSnapshot() call creates an "active" snapshot
     261             :          * which is necessary for queries to have MVCC data to work on.
     262             :          *
     263             :          * The pgstat_report_activity() call makes our activity visible
     264             :          * through the pgstat views.
     265             :          */
     266           4 :         SetCurrentStatementStartTimestamp();
     267           4 :         StartTransactionCommand();
     268           4 :         SPI_connect();
     269           4 :         PushActiveSnapshot(GetTransactionSnapshot());
     270           4 :         debug_query_string = buf.data;
     271           4 :         pgstat_report_activity(STATE_RUNNING, buf.data);
     272             : 
     273             :         /* We can now execute queries via SPI */
     274           4 :         ret = SPI_execute(buf.data, false, 0);
     275             : 
     276           4 :         if (ret != SPI_OK_UPDATE_RETURNING)
     277           0 :             elog(FATAL, "cannot select from table %s.%s: error code %d",
     278             :                  table->schema, table->name, ret);
     279             : 
     280           4 :         if (SPI_processed > 0)
     281             :         {
     282             :             bool        isnull;
     283             :             int32       val;
     284             : 
     285           2 :             val = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
     286           2 :                                               SPI_tuptable->tupdesc,
     287             :                                               1, &isnull));
     288           2 :             if (!isnull)
     289           2 :                 elog(LOG, "%s: count in %s.%s is now %d",
     290             :                      MyBgworkerEntry->bgw_name,
     291             :                      table->schema, table->name, val);
     292             :         }
     293             : 
     294             :         /*
     295             :          * And finish our transaction.
     296             :          */
     297           4 :         SPI_finish();
     298           4 :         PopActiveSnapshot();
     299           4 :         CommitTransactionCommand();
     300           4 :         debug_query_string = NULL;
     301           4 :         pgstat_report_stat(true);
     302           4 :         pgstat_report_activity(STATE_IDLE, NULL);
     303             :     }
     304             : 
     305             :     /* Not reachable */
     306             : }
     307             : 
     308             : /*
     309             :  * Entrypoint of this module.
     310             :  *
     311             :  * We register more than one worker process here, to demonstrate how that can
     312             :  * be done.
     313             :  */
     314             : void
     315          10 : _PG_init(void)
     316             : {
     317             :     BackgroundWorker worker;
     318             : 
     319             :     /* get the configuration */
     320             : 
     321             :     /*
     322             :      * These GUCs are defined even if this library is not loaded with
     323             :      * shared_preload_libraries, for worker_spi_launch().
     324             :      */
     325          10 :     DefineCustomIntVariable("worker_spi.naptime",
     326             :                             "Duration between each check (in seconds).",
     327             :                             NULL,
     328             :                             &worker_spi_naptime,
     329             :                             10,
     330             :                             1,
     331             :                             INT_MAX,
     332             :                             PGC_SIGHUP,
     333             :                             0,
     334             :                             NULL,
     335             :                             NULL,
     336             :                             NULL);
     337             : 
     338          10 :     DefineCustomStringVariable("worker_spi.database",
     339             :                                "Database to connect to.",
     340             :                                NULL,
     341             :                                &worker_spi_database,
     342             :                                "postgres",
     343             :                                PGC_SIGHUP,
     344             :                                0,
     345             :                                NULL, NULL, NULL);
     346             : 
     347          10 :     DefineCustomStringVariable("worker_spi.role",
     348             :                                "Role to connect with.",
     349             :                                NULL,
     350             :                                &worker_spi_role,
     351             :                                NULL,
     352             :                                PGC_SIGHUP,
     353             :                                0,
     354             :                                NULL, NULL, NULL);
     355             : 
     356          10 :     if (!process_shared_preload_libraries_in_progress)
     357           8 :         return;
     358             : 
     359           2 :     DefineCustomIntVariable("worker_spi.total_workers",
     360             :                             "Number of workers.",
     361             :                             NULL,
     362             :                             &worker_spi_total_workers,
     363             :                             2,
     364             :                             1,
     365             :                             100,
     366             :                             PGC_POSTMASTER,
     367             :                             0,
     368             :                             NULL,
     369             :                             NULL,
     370             :                             NULL);
     371             : 
     372           2 :     MarkGUCPrefixReserved("worker_spi");
     373             : 
     374             :     /* set up common data for all our workers */
     375           2 :     memset(&worker, 0, sizeof(worker));
     376           2 :     worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
     377             :         BGWORKER_BACKEND_DATABASE_CONNECTION;
     378           2 :     worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
     379           2 :     worker.bgw_restart_time = BGW_NEVER_RESTART;
     380           2 :     sprintf(worker.bgw_library_name, "worker_spi");
     381           2 :     sprintf(worker.bgw_function_name, "worker_spi_main");
     382           2 :     worker.bgw_notify_pid = 0;
     383             : 
     384             :     /*
     385             :      * Now fill in worker-specific data, and do the actual registrations.
     386             :      *
     387             :      * bgw_extra can optionally include a dabatase OID, a role OID and a set
     388             :      * of flags.  This is left empty here to fallback to the related GUCs at
     389             :      * startup (0 for the bgworker flags).
     390             :      */
     391           8 :     for (int i = 1; i <= worker_spi_total_workers; i++)
     392             :     {
     393           6 :         snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi worker %d", i);
     394           6 :         snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi");
     395           6 :         worker.bgw_main_arg = Int32GetDatum(i);
     396             : 
     397           6 :         RegisterBackgroundWorker(&worker);
     398             :     }
     399             : }
     400             : 
     401             : /*
     402             :  * Dynamically launch an SPI worker.
     403             :  */
     404             : Datum
     405          14 : worker_spi_launch(PG_FUNCTION_ARGS)
     406             : {
     407          14 :     int32       i = PG_GETARG_INT32(0);
     408          14 :     Oid         dboid = PG_GETARG_OID(1);
     409          14 :     Oid         roleoid = PG_GETARG_OID(2);
     410             :     BackgroundWorker worker;
     411             :     BackgroundWorkerHandle *handle;
     412             :     BgwHandleStatus status;
     413             :     pid_t       pid;
     414             :     char       *p;
     415          14 :     bits32      flags = 0;
     416          14 :     ArrayType  *arr = PG_GETARG_ARRAYTYPE_P(3);
     417             :     Size        ndim;
     418             :     int         nelems;
     419             :     Datum      *datum_flags;
     420             : 
     421          14 :     memset(&worker, 0, sizeof(worker));
     422          14 :     worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
     423             :         BGWORKER_BACKEND_DATABASE_CONNECTION;
     424          14 :     worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
     425          14 :     worker.bgw_restart_time = BGW_NEVER_RESTART;
     426          14 :     sprintf(worker.bgw_library_name, "worker_spi");
     427          14 :     sprintf(worker.bgw_function_name, "worker_spi_main");
     428          14 :     snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi dynamic worker %d", i);
     429          14 :     snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi dynamic");
     430          14 :     worker.bgw_main_arg = Int32GetDatum(i);
     431             :     /* set bgw_notify_pid so that we can use WaitForBackgroundWorkerStartup */
     432          14 :     worker.bgw_notify_pid = MyProcPid;
     433             : 
     434             :     /* extract flags, if any */
     435          14 :     ndim = ARR_NDIM(arr);
     436          14 :     if (ndim > 1)
     437           0 :         ereport(ERROR,
     438             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     439             :                  errmsg("flags array must be one-dimensional")));
     440             : 
     441          14 :     if (array_contains_nulls(arr))
     442           0 :         ereport(ERROR,
     443             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     444             :                  errmsg("flags array must not contain nulls")));
     445             : 
     446             :     Assert(ARR_ELEMTYPE(arr) == TEXTOID);
     447          14 :     deconstruct_array_builtin(arr, TEXTOID, &datum_flags, NULL, &nelems);
     448             : 
     449          18 :     for (i = 0; i < nelems; i++)
     450             :     {
     451           4 :         char       *optname = TextDatumGetCString(datum_flags[i]);
     452             : 
     453           4 :         if (strcmp(optname, "ALLOWCONN") == 0)
     454           2 :             flags |= BGWORKER_BYPASS_ALLOWCONN;
     455           2 :         else if (strcmp(optname, "ROLELOGINCHECK") == 0)
     456           2 :             flags |= BGWORKER_BYPASS_ROLELOGINCHECK;
     457             :         else
     458           0 :             ereport(ERROR,
     459             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     460             :                      errmsg("incorrect flag value found in array")));
     461             :     }
     462             : 
     463             :     /*
     464             :      * Register database and role to use for the worker started in bgw_extra.
     465             :      * If none have been provided, this will fall back to the GUCs at startup.
     466             :      */
     467          14 :     if (!OidIsValid(dboid))
     468           2 :         dboid = get_database_oid(worker_spi_database, false);
     469             : 
     470             :     /*
     471             :      * worker_spi_role is NULL by default, so this gives to worker_spi_main()
     472             :      * an invalid OID in this case.
     473             :      */
     474          14 :     if (!OidIsValid(roleoid) && worker_spi_role)
     475           0 :         roleoid = get_role_oid(worker_spi_role, false);
     476             : 
     477          14 :     p = worker.bgw_extra;
     478          14 :     memcpy(p, &dboid, sizeof(Oid));
     479          14 :     p += sizeof(Oid);
     480          14 :     memcpy(p, &roleoid, sizeof(Oid));
     481          14 :     p += sizeof(Oid);
     482          14 :     memcpy(p, &flags, sizeof(bits32));
     483             : 
     484          14 :     if (!RegisterDynamicBackgroundWorker(&worker, &handle))
     485           0 :         PG_RETURN_NULL();
     486             : 
     487          14 :     status = WaitForBackgroundWorkerStartup(handle, &pid);
     488             : 
     489          14 :     if (status == BGWH_STOPPED)
     490           0 :         ereport(ERROR,
     491             :                 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
     492             :                  errmsg("could not start background process"),
     493             :                  errhint("More details may be available in the server log.")));
     494          14 :     if (status == BGWH_POSTMASTER_DIED)
     495           0 :         ereport(ERROR,
     496             :                 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
     497             :                  errmsg("cannot start background processes without postmaster"),
     498             :                  errhint("Kill all remaining database processes and restart the database.")));
     499             :     Assert(status == BGWH_STARTED);
     500             : 
     501          14 :     PG_RETURN_INT32(pid);
     502             : }

Generated by: LCOV version 1.14