LCOV - code coverage report
Current view: top level - src/test/modules/worker_spi - worker_spi.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13devel Lines: 112 121 92.6 %
Date: 2019-11-15 23:07:02 Functions: 8 8 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /* -------------------------------------------------------------------------
       2             :  *
       3             :  * worker_spi.c
       4             :  *      Sample background worker code that demonstrates various coding
       5             :  *      patterns: establishing a database connection; starting and committing
       6             :  *      transactions; using GUC variables, and heeding SIGHUP to reread
       7             :  *      the configuration file; reporting to pg_stat_activity; using the
       8             :  *      process latch to sleep and exit in case of postmaster death.
       9             :  *
      10             :  * This code connects to a database, creates a schema and table, and summarizes
      11             :  * the numbers contained therein.  To see it working, insert an initial value
      12             :  * with "total" type and some initial value; then insert some other rows with
      13             :  * "delta" type.  Delta rows will be deleted by this worker and their values
      14             :  * aggregated into the total.
      15             :  *
      16             :  * Copyright (c) 2013-2019, PostgreSQL Global Development Group
      17             :  *
      18             :  * IDENTIFICATION
      19             :  *      src/test/modules/worker_spi/worker_spi.c
      20             :  *
      21             :  * -------------------------------------------------------------------------
      22             :  */
      23             : #include "postgres.h"
      24             : 
      25             : /* These are always necessary for a bgworker */
      26             : #include "miscadmin.h"
      27             : #include "postmaster/bgworker.h"
      28             : #include "storage/ipc.h"
      29             : #include "storage/latch.h"
      30             : #include "storage/lwlock.h"
      31             : #include "storage/proc.h"
      32             : #include "storage/shmem.h"
      33             : 
      34             : /* these headers are used by this particular worker's code */
      35             : #include "access/xact.h"
      36             : #include "executor/spi.h"
      37             : #include "fmgr.h"
      38             : #include "lib/stringinfo.h"
      39             : #include "pgstat.h"
      40             : #include "utils/builtins.h"
      41             : #include "utils/snapmgr.h"
      42             : #include "tcop/utility.h"
      43             : 
      44           2 : PG_MODULE_MAGIC;
      45             : 
      46           4 : PG_FUNCTION_INFO_V1(worker_spi_launch);
      47             : 
      48             : void        _PG_init(void);
      49             : void        worker_spi_main(Datum) pg_attribute_noreturn();
      50             : 
      51             : /* flags set by signal handlers */
      52             : static volatile sig_atomic_t got_sighup = false;
      53             : static volatile sig_atomic_t got_sigterm = false;
      54             : 
      55             : /* GUC variables */
      56             : static int  worker_spi_naptime = 10;
      57             : static int  worker_spi_total_workers = 2;
      58             : static char *worker_spi_database = NULL;
      59             : 
      60             : 
      61             : typedef struct worktable
      62             : {
      63             :     const char *schema;
      64             :     const char *name;
      65             : } worktable;
      66             : 
      67             : /*
      68             :  * Signal handler for SIGTERM
      69             :  *      Set a flag to let the main loop to terminate, and set our latch to wake
      70             :  *      it up.
      71             :  */
      72             : static void
      73           2 : worker_spi_sigterm(SIGNAL_ARGS)
      74             : {
      75           2 :     int         save_errno = errno;
      76             : 
      77           2 :     got_sigterm = true;
      78           2 :     SetLatch(MyLatch);
      79             : 
      80           2 :     errno = save_errno;
      81           2 : }
      82             : 
      83             : /*
      84             :  * Signal handler for SIGHUP
      85             :  *      Set a flag to tell the main loop to reread the config file, and set
      86             :  *      our latch to wake it up.
      87             :  */
      88             : static void
      89           2 : worker_spi_sighup(SIGNAL_ARGS)
      90             : {
      91           2 :     int         save_errno = errno;
      92             : 
      93           2 :     got_sighup = true;
      94           2 :     SetLatch(MyLatch);
      95             : 
      96           2 :     errno = save_errno;
      97           2 : }
      98             : 
      99             : /*
     100             :  * Initialize workspace for a worker process: create the schema if it doesn't
     101             :  * already exist.
     102             :  */
     103             : static void
     104           2 : initialize_worker_spi(worktable *table)
     105             : {
     106             :     int         ret;
     107             :     int         ntup;
     108             :     bool        isnull;
     109             :     StringInfoData buf;
     110             : 
     111           2 :     SetCurrentStatementStartTimestamp();
     112           2 :     StartTransactionCommand();
     113           2 :     SPI_connect();
     114           2 :     PushActiveSnapshot(GetTransactionSnapshot());
     115           2 :     pgstat_report_activity(STATE_RUNNING, "initializing worker_spi schema");
     116             : 
     117             :     /* XXX could we use CREATE SCHEMA IF NOT EXISTS? */
     118           2 :     initStringInfo(&buf);
     119           2 :     appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'",
     120             :                      table->schema);
     121             : 
     122           2 :     ret = SPI_execute(buf.data, true, 0);
     123           2 :     if (ret != SPI_OK_SELECT)
     124           0 :         elog(FATAL, "SPI_execute failed: error code %d", ret);
     125             : 
     126           2 :     if (SPI_processed != 1)
     127           0 :         elog(FATAL, "not a singleton result");
     128             : 
     129           2 :     ntup = DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[0],
     130             :                                        SPI_tuptable->tupdesc,
     131             :                                        1, &isnull));
     132           2 :     if (isnull)
     133           0 :         elog(FATAL, "null result");
     134             : 
     135           2 :     if (ntup == 0)
     136             :     {
     137           2 :         resetStringInfo(&buf);
     138           2 :         appendStringInfo(&buf,
     139             :                          "CREATE SCHEMA \"%s\" "
     140             :                          "CREATE TABLE \"%s\" ("
     141             :                          "     type text CHECK (type IN ('total', 'delta')), "
     142             :                          "     value   integer)"
     143             :                          "CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
     144             :                          "WHERE type = 'total'",
     145             :                          table->schema, table->name, table->name, table->name);
     146             : 
     147             :         /* set statement start time */
     148           2 :         SetCurrentStatementStartTimestamp();
     149             : 
     150           2 :         ret = SPI_execute(buf.data, false, 0);
     151             : 
     152           2 :         if (ret != SPI_OK_UTILITY)
     153           0 :             elog(FATAL, "failed to create my schema");
     154             :     }
     155             : 
     156           2 :     SPI_finish();
     157           2 :     PopActiveSnapshot();
     158           2 :     CommitTransactionCommand();
     159           2 :     pgstat_report_activity(STATE_IDLE, NULL);
     160           2 : }
     161             : 
     162             : void
     163           6 : worker_spi_main(Datum main_arg)
     164             : {
     165           6 :     int         index = DatumGetInt32(main_arg);
     166             :     worktable  *table;
     167             :     StringInfoData buf;
     168             :     char        name[20];
     169             : 
     170           6 :     table = palloc(sizeof(worktable));
     171           6 :     sprintf(name, "schema%d", index);
     172           6 :     table->schema = pstrdup(name);
     173           6 :     table->name = pstrdup("counted");
     174             : 
     175             :     /* Establish signal handlers before unblocking signals. */
     176           6 :     pqsignal(SIGHUP, worker_spi_sighup);
     177           6 :     pqsignal(SIGTERM, worker_spi_sigterm);
     178             : 
     179             :     /* We're now ready to receive signals */
     180           6 :     BackgroundWorkerUnblockSignals();
     181             : 
     182             :     /* Connect to our database */
     183           6 :     BackgroundWorkerInitializeConnection(worker_spi_database, NULL, 0);
     184             : 
     185           2 :     elog(LOG, "%s initialized with %s.%s",
     186             :          MyBgworkerEntry->bgw_name, table->schema, table->name);
     187           2 :     initialize_worker_spi(table);
     188             : 
     189             :     /*
     190             :      * Quote identifiers passed to us.  Note that this must be done after
     191             :      * initialize_worker_spi, because that routine assumes the names are not
     192             :      * quoted.
     193             :      *
     194             :      * Note some memory might be leaked here.
     195             :      */
     196           2 :     table->schema = quote_identifier(table->schema);
     197           2 :     table->name = quote_identifier(table->name);
     198             : 
     199           2 :     initStringInfo(&buf);
     200           2 :     appendStringInfo(&buf,
     201             :                      "WITH deleted AS (DELETE "
     202             :                      "FROM %s.%s "
     203             :                      "WHERE type = 'delta' RETURNING value), "
     204             :                      "total AS (SELECT coalesce(sum(value), 0) as sum "
     205             :                      "FROM deleted) "
     206             :                      "UPDATE %s.%s "
     207             :                      "SET value = %s.value + total.sum "
     208             :                      "FROM total WHERE type = 'total' "
     209             :                      "RETURNING %s.value",
     210             :                      table->schema, table->name,
     211             :                      table->schema, table->name,
     212             :                      table->name,
     213             :                      table->name);
     214             : 
     215             :     /*
     216             :      * Main loop: do this until the SIGTERM handler tells us to terminate
     217             :      */
     218          10 :     while (!got_sigterm)
     219             :     {
     220             :         int         ret;
     221             : 
     222             :         /*
     223             :          * Background workers mustn't call usleep() or any direct equivalent:
     224             :          * instead, they may wait on their process latch, which sleeps as
     225             :          * necessary, but is awakened if postmaster dies.  That way the
     226             :          * background process goes away immediately in an emergency.
     227             :          */
     228           6 :         (void) WaitLatch(MyLatch,
     229             :                          WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     230             :                          worker_spi_naptime * 1000L,
     231             :                          PG_WAIT_EXTENSION);
     232           6 :         ResetLatch(MyLatch);
     233             : 
     234           6 :         CHECK_FOR_INTERRUPTS();
     235             : 
     236             :         /*
     237             :          * In case of a SIGHUP, just reload the configuration.
     238             :          */
     239           6 :         if (got_sighup)
     240             :         {
     241           2 :             got_sighup = false;
     242           2 :             ProcessConfigFile(PGC_SIGHUP);
     243             :         }
     244             : 
     245             :         /*
     246             :          * Start a transaction on which we can run queries.  Note that each
     247             :          * StartTransactionCommand() call should be preceded by a
     248             :          * SetCurrentStatementStartTimestamp() call, which sets both the time
     249             :          * for the statement we're about the run, and also the transaction
     250             :          * start time.  Also, each other query sent to SPI should probably be
     251             :          * preceded by SetCurrentStatementStartTimestamp(), so that statement
     252             :          * start time is always up to date.
     253             :          *
     254             :          * The SPI_connect() call lets us run queries through the SPI manager,
     255             :          * and the PushActiveSnapshot() call creates an "active" snapshot
     256             :          * which is necessary for queries to have MVCC data to work on.
     257             :          *
     258             :          * The pgstat_report_activity() call makes our activity visible
     259             :          * through the pgstat views.
     260             :          */
     261           6 :         SetCurrentStatementStartTimestamp();
     262           6 :         StartTransactionCommand();
     263           6 :         SPI_connect();
     264           6 :         PushActiveSnapshot(GetTransactionSnapshot());
     265           6 :         pgstat_report_activity(STATE_RUNNING, buf.data);
     266             : 
     267             :         /* We can now execute queries via SPI */
     268           6 :         ret = SPI_execute(buf.data, false, 0);
     269             : 
     270           6 :         if (ret != SPI_OK_UPDATE_RETURNING)
     271           0 :             elog(FATAL, "cannot select from table %s.%s: error code %d",
     272             :                  table->schema, table->name, ret);
     273             : 
     274           6 :         if (SPI_processed > 0)
     275             :         {
     276             :             bool        isnull;
     277             :             int32       val;
     278             : 
     279           4 :             val = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
     280             :                                               SPI_tuptable->tupdesc,
     281             :                                               1, &isnull));
     282           4 :             if (!isnull)
     283           4 :                 elog(LOG, "%s: count in %s.%s is now %d",
     284             :                      MyBgworkerEntry->bgw_name,
     285             :                      table->schema, table->name, val);
     286             :         }
     287             : 
     288             :         /*
     289             :          * And finish our transaction.
     290             :          */
     291           6 :         SPI_finish();
     292           6 :         PopActiveSnapshot();
     293           6 :         CommitTransactionCommand();
     294           6 :         pgstat_report_stat(false);
     295           6 :         pgstat_report_activity(STATE_IDLE, NULL);
     296             :     }
     297             : 
     298           2 :     proc_exit(1);
     299             : }
     300             : 
     301             : /*
     302             :  * Entrypoint of this module.
     303             :  *
     304             :  * We register more than one worker process here, to demonstrate how that can
     305             :  * be done.
     306             :  */
     307             : void
     308           2 : _PG_init(void)
     309             : {
     310             :     BackgroundWorker worker;
     311             :     unsigned int i;
     312             : 
     313             :     /* get the configuration */
     314           2 :     DefineCustomIntVariable("worker_spi.naptime",
     315             :                             "Duration between each check (in seconds).",
     316             :                             NULL,
     317             :                             &worker_spi_naptime,
     318             :                             10,
     319             :                             1,
     320             :                             INT_MAX,
     321             :                             PGC_SIGHUP,
     322             :                             0,
     323             :                             NULL,
     324             :                             NULL,
     325             :                             NULL);
     326             : 
     327           2 :     if (!process_shared_preload_libraries_in_progress)
     328           0 :         return;
     329             : 
     330           2 :     DefineCustomIntVariable("worker_spi.total_workers",
     331             :                             "Number of workers.",
     332             :                             NULL,
     333             :                             &worker_spi_total_workers,
     334             :                             2,
     335             :                             1,
     336             :                             100,
     337             :                             PGC_POSTMASTER,
     338             :                             0,
     339             :                             NULL,
     340             :                             NULL,
     341             :                             NULL);
     342             : 
     343           2 :     DefineCustomStringVariable("worker_spi.database",
     344             :                                "Database to connect to.",
     345             :                                NULL,
     346             :                                &worker_spi_database,
     347             :                                "postgres",
     348             :                                PGC_POSTMASTER,
     349             :                                0,
     350             :                                NULL, NULL, NULL);
     351             : 
     352             :     /* set up common data for all our workers */
     353           2 :     memset(&worker, 0, sizeof(worker));
     354           2 :     worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
     355             :         BGWORKER_BACKEND_DATABASE_CONNECTION;
     356           2 :     worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
     357           2 :     worker.bgw_restart_time = BGW_NEVER_RESTART;
     358           2 :     sprintf(worker.bgw_library_name, "worker_spi");
     359           2 :     sprintf(worker.bgw_function_name, "worker_spi_main");
     360           2 :     worker.bgw_notify_pid = 0;
     361             : 
     362             :     /*
     363             :      * Now fill in worker-specific data, and do the actual registrations.
     364             :      */
     365           6 :     for (i = 1; i <= worker_spi_total_workers; i++)
     366             :     {
     367           4 :         snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi worker %d", i);
     368           4 :         snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi");
     369           4 :         worker.bgw_main_arg = Int32GetDatum(i);
     370             : 
     371           4 :         RegisterBackgroundWorker(&worker);
     372             :     }
     373             : }
     374             : 
     375             : /*
     376             :  * Dynamically launch an SPI worker.
     377             :  */
     378             : Datum
     379           2 : worker_spi_launch(PG_FUNCTION_ARGS)
     380             : {
     381           2 :     int32       i = PG_GETARG_INT32(0);
     382             :     BackgroundWorker worker;
     383             :     BackgroundWorkerHandle *handle;
     384             :     BgwHandleStatus status;
     385             :     pid_t       pid;
     386             : 
     387           2 :     memset(&worker, 0, sizeof(worker));
     388           2 :     worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
     389             :         BGWORKER_BACKEND_DATABASE_CONNECTION;
     390           2 :     worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
     391           2 :     worker.bgw_restart_time = BGW_NEVER_RESTART;
     392           2 :     sprintf(worker.bgw_library_name, "worker_spi");
     393           2 :     sprintf(worker.bgw_function_name, "worker_spi_main");
     394           2 :     snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi worker %d", i);
     395           2 :     snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi");
     396           2 :     worker.bgw_main_arg = Int32GetDatum(i);
     397             :     /* set bgw_notify_pid so that we can use WaitForBackgroundWorkerStartup */
     398           2 :     worker.bgw_notify_pid = MyProcPid;
     399             : 
     400           2 :     if (!RegisterDynamicBackgroundWorker(&worker, &handle))
     401           0 :         PG_RETURN_NULL();
     402             : 
     403           2 :     status = WaitForBackgroundWorkerStartup(handle, &pid);
     404             : 
     405           2 :     if (status == BGWH_STOPPED)
     406           0 :         ereport(ERROR,
     407             :                 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
     408             :                  errmsg("could not start background process"),
     409             :                  errhint("More details may be available in the server log.")));
     410           2 :     if (status == BGWH_POSTMASTER_DIED)
     411           0 :         ereport(ERROR,
     412             :                 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
     413             :                  errmsg("cannot start background processes without postmaster"),
     414             :                  errhint("Kill all remaining database processes and restart the database.")));
     415             :     Assert(status == BGWH_STARTED);
     416             : 
     417           2 :     PG_RETURN_INT32(pid);
     418             : }

Generated by: LCOV version 1.13