LCOV - code coverage report
Current view: top level - contrib/pg_prewarm - autoprewarm.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 75.3 % 312 235
Test Date: 2026-03-25 22:15:52 Functions: 93.8 % 16 15
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * autoprewarm.c
       4              :  *      Periodically dump information about the blocks present in
       5              :  *      shared_buffers, and reload them on server restart.
       6              :  *
       7              :  *      Due to locking considerations, we can't actually begin prewarming
       8              :  *      until the server reaches a consistent state.  We need the catalogs
       9              :  *      to be consistent so that we can figure out which relation to lock,
      10              :  *      and we need to lock the relations so that we don't try to prewarm
      11              :  *      pages from a relation that is in the process of being dropped.
      12              :  *
      13              :  *      While prewarming, autoprewarm will use two workers.  There's a
      14              :  *      leader worker that reads and sorts the list of blocks to be
      15              :  *      prewarmed and then launches a per-database worker for each
      16              :  *      relevant database in turn.  The former keeps running after the
      17              :  *      initial prewarm is complete to update the dump file periodically.
      18              :  *
      19              :  *  Copyright (c) 2016-2026, PostgreSQL Global Development Group
      20              :  *
      21              :  *  IDENTIFICATION
      22              :  *      contrib/pg_prewarm/autoprewarm.c
      23              :  *
      24              :  *-------------------------------------------------------------------------
      25              :  */
      26              : 
      27              : #include "postgres.h"
      28              : 
      29              : #include <unistd.h>
      30              : 
      31              : #include "access/relation.h"
      32              : #include "access/xact.h"
      33              : #include "pgstat.h"
      34              : #include "postmaster/bgworker.h"
      35              : #include "postmaster/interrupt.h"
      36              : #include "storage/buf_internals.h"
      37              : #include "storage/dsm.h"
      38              : #include "storage/dsm_registry.h"
      39              : #include "storage/fd.h"
      40              : #include "storage/ipc.h"
      41              : #include "storage/latch.h"
      42              : #include "storage/lwlock.h"
      43              : #include "storage/procsignal.h"
      44              : #include "storage/read_stream.h"
      45              : #include "storage/smgr.h"
      46              : #include "tcop/tcopprot.h"
      47              : #include "utils/guc.h"
      48              : #include "utils/rel.h"
      49              : #include "utils/relfilenumbermap.h"
      50              : #include "utils/timestamp.h"
      51              : #include "utils/wait_event.h"
      52              : 
      53              : #define AUTOPREWARM_FILE "autoprewarm.blocks"
      54              : 
      55              : /* Metadata for each block we dump. */
      56              : typedef struct BlockInfoRecord
      57              : {
      58              :     Oid         database;
      59              :     Oid         tablespace;
      60              :     RelFileNumber filenumber;
      61              :     ForkNumber  forknum;
      62              :     BlockNumber blocknum;
      63              : } BlockInfoRecord;
      64              : 
      65              : /* Shared state information for autoprewarm bgworker. */
      66              : typedef struct AutoPrewarmSharedState
      67              : {
      68              :     LWLock      lock;           /* mutual exclusion */
      69              :     pid_t       bgworker_pid;   /* for main bgworker */
      70              :     pid_t       pid_using_dumpfile; /* for autoprewarm or block dump */
      71              : 
      72              :     /* Following items are for communication with per-database worker */
      73              :     dsm_handle  block_info_handle;
      74              :     Oid         database;
      75              :     int         prewarm_start_idx;
      76              :     int         prewarm_stop_idx;
      77              :     int         prewarmed_blocks;
      78              : } AutoPrewarmSharedState;
      79              : 
      80              : /*
      81              :  * Private data passed through the read stream API for our use in the
      82              :  * callback.
      83              :  */
      84              : typedef struct AutoPrewarmReadStreamData
      85              : {
      86              :     /* The array of records containing the blocks we should prewarm. */
      87              :     BlockInfoRecord *block_info;
      88              : 
      89              :     /*
      90              :      * pos is the read stream callback's index into block_info. Because the
      91              :      * read stream may read ahead, pos is likely to be ahead of the index in
      92              :      * the main loop in autoprewarm_database_main().
      93              :      */
      94              :     int         pos;
      95              :     Oid         tablespace;
      96              :     RelFileNumber filenumber;
      97              :     ForkNumber  forknum;
      98              :     BlockNumber nblocks;
      99              : } AutoPrewarmReadStreamData;
     100              : 
     101              : 
     102              : PGDLLEXPORT void autoprewarm_main(Datum main_arg);
     103              : PGDLLEXPORT void autoprewarm_database_main(Datum main_arg);
     104              : 
     105            3 : PG_FUNCTION_INFO_V1(autoprewarm_start_worker);
     106            4 : PG_FUNCTION_INFO_V1(autoprewarm_dump_now);
     107              : 
     108              : static void apw_load_buffers(void);
     109              : static int  apw_dump_now(bool is_bgworker, bool dump_unlogged);
     110              : static void apw_start_leader_worker(void);
     111              : static void apw_start_database_worker(void);
     112              : static bool apw_init_shmem(void);
     113              : static void apw_detach_shmem(int code, Datum arg);
     114              : static int  apw_compare_blockinfo(const void *p, const void *q);
     115              : 
     116              : /* Pointer to shared-memory state. */
     117              : static AutoPrewarmSharedState *apw_state = NULL;
     118              : 
     119              : /* GUC variables. */
     120              : static bool autoprewarm = true; /* start worker? */
     121              : static int  autoprewarm_interval = 300; /* dump interval */
     122              : 
     123              : /*
     124              :  * Module load callback.
     125              :  */
     126              : void
     127            6 : _PG_init(void)
     128              : {
     129            6 :     DefineCustomIntVariable("pg_prewarm.autoprewarm_interval",
     130              :                             "Sets the interval between dumps of shared buffers",
     131              :                             "If set to zero, time-based dumping is disabled.",
     132              :                             &autoprewarm_interval,
     133              :                             300,
     134              :                             0, INT_MAX / 1000,
     135              :                             PGC_SIGHUP,
     136              :                             GUC_UNIT_S,
     137              :                             NULL,
     138              :                             NULL,
     139              :                             NULL);
     140              : 
     141            6 :     if (!process_shared_preload_libraries_in_progress)
     142            4 :         return;
     143              : 
     144              :     /* can't define PGC_POSTMASTER variable after startup */
     145            2 :     DefineCustomBoolVariable("pg_prewarm.autoprewarm",
     146              :                              "Starts the autoprewarm worker.",
     147              :                              NULL,
     148              :                              &autoprewarm,
     149              :                              true,
     150              :                              PGC_POSTMASTER,
     151              :                              0,
     152              :                              NULL,
     153              :                              NULL,
     154              :                              NULL);
     155              : 
     156            2 :     MarkGUCPrefixReserved("pg_prewarm");
     157              : 
     158              :     /* Register autoprewarm worker, if enabled. */
     159            2 :     if (autoprewarm)
     160            2 :         apw_start_leader_worker();
     161              : }
     162              : 
     163              : /*
     164              :  * Main entry point for the leader autoprewarm process.  Per-database workers
     165              :  * have a separate entry point.
     166              :  */
     167              : void
     168            2 : autoprewarm_main(Datum main_arg)
     169              : {
     170            2 :     bool        first_time = true;
     171            2 :     bool        final_dump_allowed = true;
     172            2 :     TimestampTz last_dump_time = 0;
     173              : 
     174              :     /* Establish signal handlers; once that's done, unblock signals. */
     175            2 :     pqsignal(SIGTERM, SignalHandlerForShutdownRequest);
     176            2 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
     177            2 :     pqsignal(SIGUSR1, procsignal_sigusr1_handler);
     178            2 :     BackgroundWorkerUnblockSignals();
     179              : 
     180              :     /* Create (if necessary) and attach to our shared memory area. */
     181            2 :     if (apw_init_shmem())
     182            0 :         first_time = false;
     183              : 
     184              :     /*
     185              :      * Set on-detach hook so that our PID will be cleared on exit.
     186              :      *
     187              :      * NB: Autoprewarm's state is stored in a DSM segment, and DSM segments
     188              :      * are detached before calling the on_shmem_exit callbacks, so we must put
     189              :      * apw_detach_shmem in the before_shmem_exit callback list.
     190              :      */
     191            2 :     before_shmem_exit(apw_detach_shmem, 0);
     192              : 
     193              :     /*
     194              :      * Store our PID in the shared memory area --- unless there's already
     195              :      * another worker running, in which case just exit.
     196              :      */
     197            2 :     LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE);
     198            2 :     if (apw_state->bgworker_pid != InvalidPid)
     199              :     {
     200            0 :         LWLockRelease(&apw_state->lock);
     201            0 :         ereport(LOG,
     202              :                 (errmsg("autoprewarm worker is already running under PID %d",
     203              :                         (int) apw_state->bgworker_pid)));
     204            0 :         return;
     205              :     }
     206            2 :     apw_state->bgworker_pid = MyProcPid;
     207            2 :     LWLockRelease(&apw_state->lock);
     208              : 
     209              :     /*
     210              :      * Preload buffers from the dump file only if we just created the shared
     211              :      * memory region.  Otherwise, it's either already been done or shouldn't
     212              :      * be done - e.g. because the old dump file has been overwritten since the
     213              :      * server was started.
     214              :      *
     215              :      * There's not much point in performing a dump immediately after we finish
     216              :      * preloading; so, if we do end up preloading, consider the last dump time
     217              :      * to be equal to the current time.
     218              :      *
     219              :      * If apw_load_buffers() is terminated early by a shutdown request,
     220              :      * prevent dumping out our state below the loop, because we'd effectively
     221              :      * just truncate the saved state to however much we'd managed to preload.
     222              :      */
     223            2 :     if (first_time)
     224              :     {
     225            2 :         apw_load_buffers();
     226            2 :         final_dump_allowed = !ShutdownRequestPending;
     227            2 :         last_dump_time = GetCurrentTimestamp();
     228              :     }
     229              : 
     230              :     /* Periodically dump buffers until terminated. */
     231            5 :     while (!ShutdownRequestPending)
     232              :     {
     233              :         /* In case of a SIGHUP, just reload the configuration. */
     234            3 :         if (ConfigReloadPending)
     235              :         {
     236            0 :             ConfigReloadPending = false;
     237            0 :             ProcessConfigFile(PGC_SIGHUP);
     238              :         }
     239              : 
     240            3 :         if (autoprewarm_interval <= 0)
     241              :         {
     242              :             /* We're only dumping at shutdown, so just wait forever. */
     243            3 :             (void) WaitLatch(MyLatch,
     244              :                              WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
     245              :                              -1L,
     246              :                              PG_WAIT_EXTENSION);
     247              :         }
     248              :         else
     249              :         {
     250              :             TimestampTz next_dump_time;
     251              :             long        delay_in_ms;
     252              : 
     253              :             /* Compute the next dump time. */
     254            0 :             next_dump_time =
     255            0 :                 TimestampTzPlusMilliseconds(last_dump_time,
     256              :                                             autoprewarm_interval * 1000);
     257              :             delay_in_ms =
     258            0 :                 TimestampDifferenceMilliseconds(GetCurrentTimestamp(),
     259              :                                                 next_dump_time);
     260              : 
     261              :             /* Perform a dump if it's time. */
     262            0 :             if (delay_in_ms <= 0)
     263              :             {
     264            0 :                 last_dump_time = GetCurrentTimestamp();
     265            0 :                 apw_dump_now(true, false);
     266            0 :                 continue;
     267              :             }
     268              : 
     269              :             /* Sleep until the next dump time. */
     270            0 :             (void) WaitLatch(MyLatch,
     271              :                              WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     272              :                              delay_in_ms,
     273              :                              PG_WAIT_EXTENSION);
     274              :         }
     275              : 
     276              :         /* Reset the latch, loop. */
     277            3 :         ResetLatch(MyLatch);
     278              :     }
     279              : 
     280              :     /*
     281              :      * Dump one last time.  We assume this is probably the result of a system
     282              :      * shutdown, although it's possible that we've merely been terminated.
     283              :      */
     284            2 :     if (final_dump_allowed)
     285            2 :         apw_dump_now(true, true);
     286              : }
     287              : 
     288              : /*
     289              :  * Read the dump file and launch per-database workers one at a time to
     290              :  * prewarm the buffers found there.
     291              :  */
     292              : static void
     293            2 : apw_load_buffers(void)
     294              : {
     295            2 :     FILE       *file = NULL;
     296              :     int         num_elements,
     297              :                 i;
     298              :     BlockInfoRecord *blkinfo;
     299              :     dsm_segment *seg;
     300              : 
     301              :     /*
     302              :      * Skip the prewarm if the dump file is in use; otherwise, prevent any
     303              :      * other process from writing it while we're using it.
     304              :      */
     305            2 :     LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE);
     306            2 :     if (apw_state->pid_using_dumpfile == InvalidPid)
     307            2 :         apw_state->pid_using_dumpfile = MyProcPid;
     308              :     else
     309              :     {
     310            0 :         LWLockRelease(&apw_state->lock);
     311            0 :         ereport(LOG,
     312              :                 (errmsg("skipping prewarm because block dump file is being written by PID %d",
     313              :                         (int) apw_state->pid_using_dumpfile)));
     314            1 :         return;
     315              :     }
     316            2 :     LWLockRelease(&apw_state->lock);
     317              : 
     318              :     /*
     319              :      * Open the block dump file.  Exit quietly if it doesn't exist, but report
     320              :      * any other error.
     321              :      */
     322            2 :     file = AllocateFile(AUTOPREWARM_FILE, "r");
     323            2 :     if (!file)
     324              :     {
     325            1 :         if (errno == ENOENT)
     326              :         {
     327            1 :             LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE);
     328            1 :             apw_state->pid_using_dumpfile = InvalidPid;
     329            1 :             LWLockRelease(&apw_state->lock);
     330            1 :             return;             /* No file to load. */
     331              :         }
     332            0 :         ereport(ERROR,
     333              :                 (errcode_for_file_access(),
     334              :                  errmsg("could not read file \"%s\": %m",
     335              :                         AUTOPREWARM_FILE)));
     336              :     }
     337              : 
     338              :     /* First line of the file is a record count. */
     339            1 :     if (fscanf(file, "<<%d>>\n", &num_elements) != 1)
     340            0 :         ereport(ERROR,
     341              :                 (errcode_for_file_access(),
     342              :                  errmsg("could not read from file \"%s\": %m",
     343              :                         AUTOPREWARM_FILE)));
     344              : 
     345              :     /* Allocate a dynamic shared memory segment to store the record data. */
     346            1 :     seg = dsm_create(sizeof(BlockInfoRecord) * num_elements, 0);
     347            1 :     blkinfo = (BlockInfoRecord *) dsm_segment_address(seg);
     348              : 
     349              :     /* Read records, one per line. */
     350          238 :     for (i = 0; i < num_elements; i++)
     351              :     {
     352              :         unsigned    forknum;
     353              : 
     354          237 :         if (fscanf(file, "%u,%u,%u,%u,%u\n", &blkinfo[i].database,
     355          237 :                    &blkinfo[i].tablespace, &blkinfo[i].filenumber,
     356          237 :                    &forknum, &blkinfo[i].blocknum) != 5)
     357            0 :             ereport(ERROR,
     358              :                     (errmsg("autoprewarm block dump file is corrupted at line %d",
     359              :                             i + 1)));
     360          237 :         blkinfo[i].forknum = forknum;
     361              :     }
     362              : 
     363            1 :     FreeFile(file);
     364              : 
     365              :     /* Sort the blocks to be loaded. */
     366            1 :     qsort(blkinfo, num_elements, sizeof(BlockInfoRecord),
     367              :           apw_compare_blockinfo);
     368              : 
     369              :     /* Populate shared memory state. */
     370            1 :     apw_state->block_info_handle = dsm_segment_handle(seg);
     371            1 :     apw_state->prewarm_start_idx = apw_state->prewarm_stop_idx = 0;
     372            1 :     apw_state->prewarmed_blocks = 0;
     373              : 
     374              :     /* Don't prewarm more than we can fit. */
     375            1 :     if (num_elements > NBuffers)
     376              :     {
     377            0 :         num_elements = NBuffers;
     378            0 :         ereport(LOG,
     379              :                 (errmsg("autoprewarm capping prewarmed blocks to %d (shared_buffers size)",
     380              :                         NBuffers)));
     381              :     }
     382              : 
      383              :     /* Handle one database per iteration, starting at prewarm_start_idx. */
     384            2 :     while (apw_state->prewarm_start_idx < num_elements)
     385              :     {
     386            1 :         int         j = apw_state->prewarm_start_idx;
     387            1 :         Oid         current_db = blkinfo[j].database;
     388              : 
     389              :         /*
     390              :          * Advance the prewarm_stop_idx to the first BlockInfoRecord that does
     391              :          * not belong to this database.
     392              :          */
     393            1 :         j++;
     394          237 :         while (j < num_elements)
     395              :         {
     396          236 :             if (current_db != blkinfo[j].database)
     397              :             {
     398              :                 /*
     399              :                  * Combine BlockInfoRecords for global objects with those of
     400              :                  * the database.
     401              :                  */
     402            1 :                 if (current_db != InvalidOid)
     403            0 :                     break;
     404            1 :                 current_db = blkinfo[j].database;
     405              :             }
     406              : 
     407          236 :             j++;
     408              :         }
     409              : 
     410              :         /*
     411              :          * If we reach this point with current_db == InvalidOid, then only
     412              :          * BlockInfoRecords belonging to global objects exist.  We can't
     413              :          * prewarm without a database connection, so just bail out.
     414              :          */
     415            1 :         if (current_db == InvalidOid)
     416            0 :             break;
     417              : 
     418              :         /* Configure stop point and database for next per-database worker. */
     419            1 :         apw_state->prewarm_stop_idx = j;
     420            1 :         apw_state->database = current_db;
     421              :         Assert(apw_state->prewarm_start_idx < apw_state->prewarm_stop_idx);
     422              : 
     423              :         /*
      424              :          * Don't launch a worker if we've already been told to shut down.
     425              :          * (The launch would fail anyway, but we might as well skip it.)
     426              :          */
     427            1 :         if (ShutdownRequestPending)
     428            0 :             break;
     429              : 
     430              :         /*
     431              :          * Start a per-database worker to load blocks for this database; this
     432              :          * function will return once the per-database worker exits.
     433              :          */
     434            1 :         apw_start_database_worker();
     435              : 
     436              :         /* Prepare for next database. */
     437            1 :         apw_state->prewarm_start_idx = apw_state->prewarm_stop_idx;
     438              :     }
     439              : 
     440              :     /* Clean up. */
     441            1 :     dsm_detach(seg);
     442            1 :     LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE);
     443            1 :     apw_state->block_info_handle = DSM_HANDLE_INVALID;
     444            1 :     apw_state->pid_using_dumpfile = InvalidPid;
     445            1 :     LWLockRelease(&apw_state->lock);
     446              : 
     447              :     /* Report our success, if we were able to finish. */
     448            1 :     if (!ShutdownRequestPending)
     449            1 :         ereport(LOG,
     450              :                 (errmsg("autoprewarm successfully prewarmed %d of %d previously-loaded blocks",
     451              :                         apw_state->prewarmed_blocks, num_elements)));
     452              : }
     453              : 
     454              : /*
      455              :  * Read stream callback: return the next block to read for the current
      456              :  * relation fork from the BlockInfoRecord array, or InvalidBlockNumber.
     457              :  */
     458              : static BlockNumber
     459          312 : apw_read_stream_next_block(ReadStream *stream,
     460              :                            void *callback_private_data,
     461              :                            void *per_buffer_data)
     462              : {
     463          312 :     AutoPrewarmReadStreamData *p = callback_private_data;
     464              : 
     465          312 :     CHECK_FOR_INTERRUPTS();
     466              : 
     467          312 :     while (p->pos < apw_state->prewarm_stop_idx)
     468              :     {
     469          311 :         BlockInfoRecord blk = p->block_info[p->pos];
     470              : 
     471          311 :         if (blk.tablespace != p->tablespace)
     472          311 :             return InvalidBlockNumber;
     473              : 
     474          310 :         if (blk.filenumber != p->filenumber)
     475           55 :             return InvalidBlockNumber;
     476              : 
     477          255 :         if (blk.forknum != p->forknum)
     478           18 :             return InvalidBlockNumber;
     479              : 
     480          237 :         p->pos++;
     481              : 
     482              :         /*
     483              :          * Check whether blocknum is valid and within fork file size.
     484              :          * Fast-forward through any invalid blocks. We want p->pos to reflect
     485              :          * the location of the next relation or fork before ending the stream.
     486              :          */
     487          237 :         if (blk.blocknum >= p->nblocks)
     488            0 :             continue;
     489              : 
     490          237 :         return blk.blocknum;
     491              :     }
     492              : 
     493            1 :     return InvalidBlockNumber;
     494              : }
     495              : 
     496              : /*
     497              :  * Prewarm all blocks for one database (and possibly also global objects, if
     498              :  * those got grouped with this database).
     499              :  */
     500              : void
     501            1 : autoprewarm_database_main(Datum main_arg)
     502              : {
     503              :     BlockInfoRecord *block_info;
     504              :     int         i;
     505              :     BlockInfoRecord blk;
     506              :     dsm_segment *seg;
     507              : 
     508              :     /* Establish signal handlers; once that's done, unblock signals. */
     509            1 :     pqsignal(SIGTERM, die);
     510            1 :     BackgroundWorkerUnblockSignals();
     511              : 
     512              :     /* Connect to correct database and get block information. */
     513            1 :     apw_init_shmem();
     514            1 :     seg = dsm_attach(apw_state->block_info_handle);
     515            1 :     if (seg == NULL)
     516            0 :         ereport(ERROR,
     517              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     518              :                  errmsg("could not map dynamic shared memory segment")));
     519            1 :     BackgroundWorkerInitializeConnectionByOid(apw_state->database, InvalidOid, 0);
     520            1 :     block_info = (BlockInfoRecord *) dsm_segment_address(seg);
     521              : 
     522            1 :     i = apw_state->prewarm_start_idx;
     523            1 :     blk = block_info[i];
     524              : 
     525              :     /*
     526              :      * Loop until we run out of blocks to prewarm or until we run out of
     527              :      * buffers.
     528              :      */
     529           58 :     while (i < apw_state->prewarm_stop_idx)
     530              :     {
     531           57 :         Oid         tablespace = blk.tablespace;
     532           57 :         RelFileNumber filenumber = blk.filenumber;
     533              :         Oid         reloid;
     534              :         Relation    rel;
     535              : 
     536              :         /*
     537              :          * All blocks between prewarm_start_idx and prewarm_stop_idx should
     538              :          * belong either to global objects or the same database.
     539              :          */
     540              :         Assert(blk.database == apw_state->database || blk.database == 0);
     541              : 
     542           57 :         StartTransactionCommand();
     543              : 
     544           57 :         reloid = RelidByRelfilenumber(blk.tablespace, blk.filenumber);
     545          114 :         if (!OidIsValid(reloid) ||
     546           57 :             (rel = try_relation_open(reloid, AccessShareLock)) == NULL)
     547              :         {
     548              :             /* We failed to open the relation, so there is nothing to close. */
     549            0 :             CommitTransactionCommand();
     550              : 
     551              :             /*
     552              :              * Fast-forward to the next relation. We want to skip all of the
     553              :              * other records referencing this relation since we know we can't
     554              :              * open it. That way, we avoid repeatedly trying and failing to
     555              :              * open the same relation.
     556              :              */
     557            0 :             for (; i < apw_state->prewarm_stop_idx; i++)
     558              :             {
     559            0 :                 blk = block_info[i];
     560            0 :                 if (blk.tablespace != tablespace ||
     561            0 :                     blk.filenumber != filenumber)
     562              :                     break;
     563              :             }
     564              : 
     565              :             /* Time to try and open our newfound relation */
     566            0 :             continue;
     567              :         }
     568              : 
     569              :         /*
     570              :          * We have a relation; now let's loop until we find a valid fork of
     571              :          * the relation or we run out of buffers. Once we've read from all
     572              :          * valid forks or run out of options, we'll close the relation and
     573              :          * move on.
     574              :          */
     575           57 :         while (i < apw_state->prewarm_stop_idx &&
     576          132 :                blk.tablespace == tablespace &&
     577          130 :                blk.filenumber == filenumber)
     578              :         {
     579           75 :             ForkNumber  forknum = blk.forknum;
     580              :             BlockNumber nblocks;
     581              :             struct AutoPrewarmReadStreamData p;
     582              :             ReadStream *stream;
     583              :             Buffer      buf;
     584              : 
     585              :             /*
     586              :              * smgrexists is not safe for illegal forknum, hence check whether
     587              :              * the passed forknum is valid before using it in smgrexists.
     588              :              */
     589           75 :             if (blk.forknum <= InvalidForkNumber ||
     590           75 :                 blk.forknum > MAX_FORKNUM ||
     591           75 :                 !smgrexists(RelationGetSmgr(rel), blk.forknum))
     592              :             {
     593              :                 /*
     594              :                  * Fast-forward to the next fork. We want to skip all of the
     595              :                  * other records referencing this fork since we already know
     596              :                  * it's not valid.
     597              :                  */
     598            0 :                 for (; i < apw_state->prewarm_stop_idx; i++)
     599              :                 {
     600            0 :                     blk = block_info[i];
     601            0 :                     if (blk.tablespace != tablespace ||
     602            0 :                         blk.filenumber != filenumber ||
     603            0 :                         blk.forknum != forknum)
     604              :                         break;
     605              :                 }
     606              : 
     607              :                 /* Time to check if this newfound fork is valid */
     608            0 :                 continue;
     609              :             }
     610              : 
     611           75 :             nblocks = RelationGetNumberOfBlocksInFork(rel, blk.forknum);
     612              : 
     613           75 :             p = (struct AutoPrewarmReadStreamData)
     614              :             {
     615              :                 .block_info = block_info,
     616              :                     .pos = i,
     617              :                     .tablespace = tablespace,
     618              :                     .filenumber = filenumber,
     619              :                     .forknum = forknum,
     620              :                     .nblocks = nblocks,
     621              :             };
     622              : 
     623           75 :             stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE |
     624              :                                                 READ_STREAM_DEFAULT |
     625              :                                                 READ_STREAM_USE_BATCHING,
     626              :                                                 NULL,
     627              :                                                 rel,
     628              :                                                 p.forknum,
     629              :                                                 apw_read_stream_next_block,
     630              :                                                 &p,
     631              :                                                 0);
     632              : 
     633              :             /*
     634              :              * Loop until we've prewarmed all the blocks from this fork. The
     635              :              * read stream callback will check that we still have free buffers
     636              :              * before requesting each block from the read stream API.
     637              :              */
     638          312 :             while ((buf = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
     639              :             {
     640          237 :                 apw_state->prewarmed_blocks++;
     641          237 :                 ReleaseBuffer(buf);
     642              :             }
     643              : 
     644           75 :             read_stream_end(stream);
     645              : 
     646              :             /* Advance i past all the blocks just prewarmed. */
     647           75 :             i = p.pos;
     648           75 :             blk = block_info[i];
     649              :         }
     650              : 
     651           57 :         relation_close(rel, AccessShareLock);
     652           57 :         CommitTransactionCommand();
     653              :     }
     654              : 
     655            1 :     dsm_detach(seg);
     656            1 : }
     657              : 
     658              : /*
     659              :  * Dump information on blocks in shared buffers.  We use a text format here
     660              :  * so that it's easy to understand and even change the file contents if
     661              :  * necessary.
     662              :  * Returns the number of blocks dumped.
     663              :  */
     664              : static int
     665            3 : apw_dump_now(bool is_bgworker, bool dump_unlogged)
     666              : {
     667              :     int         num_blocks;
     668              :     int         i;
     669              :     int         ret;
     670              :     BlockInfoRecord *block_info_array;
     671              :     BufferDesc *bufHdr;
     672              :     FILE       *file;
     673              :     char        transient_dump_file_path[MAXPGPATH];
     674              :     pid_t       pid;
     675              : 
     676            3 :     LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE);
     677            3 :     pid = apw_state->pid_using_dumpfile;
     678            3 :     if (apw_state->pid_using_dumpfile == InvalidPid)
     679            3 :         apw_state->pid_using_dumpfile = MyProcPid;
     680            3 :     LWLockRelease(&apw_state->lock);
     681              : 
     682            3 :     if (pid != InvalidPid)
     683              :     {
     684            0 :         if (!is_bgworker)
     685            0 :             ereport(ERROR,
     686              :                     (errmsg("could not perform block dump because dump file is being used by PID %d",
     687              :                             (int) apw_state->pid_using_dumpfile)));
     688              : 
     689            0 :         ereport(LOG,
     690              :                 (errmsg("skipping block dump because it is already being performed by PID %d",
     691              :                         (int) apw_state->pid_using_dumpfile)));
     692            0 :         return 0;
     693              :     }
     694              : 
     695              :     /*
     696              :      * With sufficiently large shared_buffers, allocation will exceed 1GB, so
     697              :      * allow for a huge allocation to prevent outright failure.
     698              :      *
     699              :      * (In the future, it might be a good idea to redesign this to use a more
     700              :      * memory-efficient data structure.)
     701              :      */
     702              :     block_info_array = (BlockInfoRecord *)
     703            3 :         palloc_extended((sizeof(BlockInfoRecord) * NBuffers), MCXT_ALLOC_HUGE);
     704              : 
     705        49155 :     for (num_blocks = 0, i = 0; i < NBuffers; i++)
     706              :     {
     707              :         uint64      buf_state;
     708              : 
     709        49152 :         CHECK_FOR_INTERRUPTS();
     710              : 
     711        49152 :         bufHdr = GetBufferDescriptor(i);
     712              : 
     713              :         /* Lock each buffer header before inspecting. */
     714        49152 :         buf_state = LockBufHdr(bufHdr);
     715              : 
     716              :         /*
     717              :          * Unlogged tables will be automatically truncated after a crash or
     718              :          * unclean shutdown. In such cases we need not prewarm them. Dump them
     719              :          * only if requested by caller.
     720              :          */
     721        49152 :         if (buf_state & BM_TAG_VALID &&
     722          711 :             ((buf_state & BM_PERMANENT) || dump_unlogged))
     723              :         {
     724          711 :             block_info_array[num_blocks].database = bufHdr->tag.dbOid;
     725          711 :             block_info_array[num_blocks].tablespace = bufHdr->tag.spcOid;
     726         1422 :             block_info_array[num_blocks].filenumber =
     727          711 :                 BufTagGetRelNumber(&bufHdr->tag);
     728         1422 :             block_info_array[num_blocks].forknum =
     729          711 :                 BufTagGetForkNum(&bufHdr->tag);
     730          711 :             block_info_array[num_blocks].blocknum = bufHdr->tag.blockNum;
     731          711 :             ++num_blocks;
     732              :         }
     733              : 
     734        49152 :         UnlockBufHdr(bufHdr);
     735              :     }
     736              : 
     737            3 :     snprintf(transient_dump_file_path, MAXPGPATH, "%s.tmp", AUTOPREWARM_FILE);
     738            3 :     file = AllocateFile(transient_dump_file_path, "w");
     739            3 :     if (!file)
     740            0 :         ereport(ERROR,
     741              :                 (errcode_for_file_access(),
     742              :                  errmsg("could not open file \"%s\": %m",
     743              :                         transient_dump_file_path)));
     744              : 
     745            3 :     ret = fprintf(file, "<<%d>>\n", num_blocks);
     746            3 :     if (ret < 0)
     747              :     {
     748            0 :         int         save_errno = errno;
     749              : 
     750            0 :         FreeFile(file);
     751            0 :         unlink(transient_dump_file_path);
     752            0 :         errno = save_errno;
     753            0 :         ereport(ERROR,
     754              :                 (errcode_for_file_access(),
     755              :                  errmsg("could not write to file \"%s\": %m",
     756              :                         transient_dump_file_path)));
     757              :     }
     758              : 
     759          714 :     for (i = 0; i < num_blocks; i++)
     760              :     {
     761          711 :         CHECK_FOR_INTERRUPTS();
     762              : 
     763          711 :         ret = fprintf(file, "%u,%u,%u,%u,%u\n",
     764          711 :                       block_info_array[i].database,
     765          711 :                       block_info_array[i].tablespace,
     766          711 :                       block_info_array[i].filenumber,
     767          711 :                       (uint32) block_info_array[i].forknum,
     768          711 :                       block_info_array[i].blocknum);
     769          711 :         if (ret < 0)
     770              :         {
     771            0 :             int         save_errno = errno;
     772              : 
     773            0 :             FreeFile(file);
     774            0 :             unlink(transient_dump_file_path);
     775            0 :             errno = save_errno;
     776            0 :             ereport(ERROR,
     777              :                     (errcode_for_file_access(),
     778              :                      errmsg("could not write to file \"%s\": %m",
     779              :                             transient_dump_file_path)));
     780              :         }
     781              :     }
     782              : 
     783            3 :     pfree(block_info_array);
     784              : 
     785              :     /*
     786              :      * Rename transient_dump_file_path to AUTOPREWARM_FILE to make things
     787              :      * permanent.
     788              :      */
     789            3 :     ret = FreeFile(file);
     790            3 :     if (ret != 0)
     791              :     {
     792            0 :         int         save_errno = errno;
     793              : 
     794            0 :         unlink(transient_dump_file_path);
     795            0 :         errno = save_errno;
     796            0 :         ereport(ERROR,
     797              :                 (errcode_for_file_access(),
     798              :                  errmsg("could not close file \"%s\": %m",
     799              :                         transient_dump_file_path)));
     800              :     }
     801              : 
     802            3 :     (void) durable_rename(transient_dump_file_path, AUTOPREWARM_FILE, ERROR);
     803            3 :     apw_state->pid_using_dumpfile = InvalidPid;
     804              : 
     805            3 :     ereport(DEBUG1,
     806              :             (errmsg_internal("wrote block details for %d blocks", num_blocks)));
     807            3 :     return num_blocks;
     808              : }
     809              : 
     810              : /*
     811              :  * SQL-callable function to launch autoprewarm.
     812              :  */
     813              : Datum
     814            0 : autoprewarm_start_worker(PG_FUNCTION_ARGS)
     815              : {
     816              :     pid_t       pid;
     817              : 
     818            0 :     if (!autoprewarm)
     819            0 :         ereport(ERROR,
     820              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     821              :                  errmsg("autoprewarm is disabled")));
     822              : 
     823            0 :     apw_init_shmem();
     824            0 :     LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE);
     825            0 :     pid = apw_state->bgworker_pid;
     826            0 :     LWLockRelease(&apw_state->lock);
     827              : 
     828            0 :     if (pid != InvalidPid)
     829            0 :         ereport(ERROR,
     830              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     831              :                  errmsg("autoprewarm worker is already running under PID %d",
     832              :                         (int) pid)));
     833              : 
     834            0 :     apw_start_leader_worker();
     835              : 
     836            0 :     PG_RETURN_VOID();
     837              : }
     838              : 
     839              : /*
     840              :  * SQL-callable function to perform an immediate block dump.
     841              :  *
     842              :  * Note: this is declared to return int8, as insurance against some
     843              :  * very distant day when we might make NBuffers wider than int.
     844              :  */
     845              : Datum
     846            1 : autoprewarm_dump_now(PG_FUNCTION_ARGS)
     847              : {
     848              :     int         num_blocks;
     849              : 
     850            1 :     apw_init_shmem();
     851              : 
     852            1 :     PG_ENSURE_ERROR_CLEANUP(apw_detach_shmem, 0);
     853              :     {
     854            1 :         num_blocks = apw_dump_now(false, true);
     855              :     }
     856            1 :     PG_END_ENSURE_ERROR_CLEANUP(apw_detach_shmem, 0);
     857              : 
     858            1 :     PG_RETURN_INT64((int64) num_blocks);
     859              : }
     860              : 
     861              : static void
     862            2 : apw_init_state(void *ptr, void *arg)
     863              : {
     864            2 :     AutoPrewarmSharedState *state = (AutoPrewarmSharedState *) ptr;
     865              : 
     866            2 :     LWLockInitialize(&state->lock, LWLockNewTrancheId("autoprewarm"));
     867            2 :     state->bgworker_pid = InvalidPid;
     868            2 :     state->pid_using_dumpfile = InvalidPid;
     869            2 : }
     870              : 
     871              : /*
     872              :  * Allocate and initialize autoprewarm related shared memory, if not already
     873              :  * done, and set up backend-local pointer to that state.  Returns true if an
     874              :  * existing shared memory segment was found.
     875              :  */
     876              : static bool
     877            4 : apw_init_shmem(void)
     878              : {
     879              :     bool        found;
     880              : 
     881            4 :     apw_state = GetNamedDSMSegment("autoprewarm",
     882              :                                    sizeof(AutoPrewarmSharedState),
     883              :                                    apw_init_state,
     884              :                                    &found, NULL);
     885              : 
     886            4 :     return found;
     887              : }
     888              : 
     889              : /*
     890              :  * Clear our PID from autoprewarm shared state.
     891              :  */
     892              : static void
     893            2 : apw_detach_shmem(int code, Datum arg)
     894              : {
     895            2 :     LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE);
     896            2 :     if (apw_state->pid_using_dumpfile == MyProcPid)
     897            0 :         apw_state->pid_using_dumpfile = InvalidPid;
     898            2 :     if (apw_state->bgworker_pid == MyProcPid)
     899            2 :         apw_state->bgworker_pid = InvalidPid;
     900            2 :     LWLockRelease(&apw_state->lock);
     901            2 : }
     902              : 
     903              : /*
     904              :  * Start autoprewarm leader worker process.
     905              :  */
     906              : static void
     907            2 : apw_start_leader_worker(void)
     908              : {
     909            2 :     BackgroundWorker worker = {0};
     910              :     BackgroundWorkerHandle *handle;
     911              :     BgwHandleStatus status;
     912              :     pid_t       pid;
     913              : 
     914            2 :     worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
     915            2 :     worker.bgw_start_time = BgWorkerStart_ConsistentState;
     916            2 :     strcpy(worker.bgw_library_name, "pg_prewarm");
     917            2 :     strcpy(worker.bgw_function_name, "autoprewarm_main");
     918            2 :     strcpy(worker.bgw_name, "autoprewarm leader");
     919            2 :     strcpy(worker.bgw_type, "autoprewarm leader");
     920              : 
     921            2 :     if (process_shared_preload_libraries_in_progress)
     922              :     {
     923            2 :         RegisterBackgroundWorker(&worker);
     924            2 :         return;
     925              :     }
     926              : 
     927              :     /* must set notify PID to wait for startup */
     928            0 :     worker.bgw_notify_pid = MyProcPid;
     929              : 
     930            0 :     if (!RegisterDynamicBackgroundWorker(&worker, &handle))
     931            0 :         ereport(ERROR,
     932              :                 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
     933              :                  errmsg("could not register background process"),
     934              :                  errhint("You may need to increase \"max_worker_processes\".")));
     935              : 
     936            0 :     status = WaitForBackgroundWorkerStartup(handle, &pid);
     937            0 :     if (status != BGWH_STARTED)
     938            0 :         ereport(ERROR,
     939              :                 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
     940              :                  errmsg("could not start background process"),
     941              :                  errhint("More details may be available in the server log.")));
     942              : }
     943              : 
     944              : /*
     945              :  * Start autoprewarm per-database worker process.
     946              :  */
     947              : static void
     948            1 : apw_start_database_worker(void)
     949              : {
     950            1 :     BackgroundWorker worker = {0};
     951              :     BackgroundWorkerHandle *handle;
     952              : 
     953            1 :     worker.bgw_flags =
     954              :         BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
     955            1 :     worker.bgw_start_time = BgWorkerStart_ConsistentState;
     956            1 :     worker.bgw_restart_time = BGW_NEVER_RESTART;
     957            1 :     strcpy(worker.bgw_library_name, "pg_prewarm");
     958            1 :     strcpy(worker.bgw_function_name, "autoprewarm_database_main");
     959            1 :     strcpy(worker.bgw_name, "autoprewarm worker");
     960            1 :     strcpy(worker.bgw_type, "autoprewarm worker");
     961              : 
     962              :     /* must set notify PID to wait for shutdown */
     963            1 :     worker.bgw_notify_pid = MyProcPid;
     964              : 
     965            1 :     if (!RegisterDynamicBackgroundWorker(&worker, &handle))
     966            0 :         ereport(ERROR,
     967              :                 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
     968              :                  errmsg("registering dynamic bgworker autoprewarm failed"),
     969              :                  errhint("Consider increasing the configuration parameter \"%s\".", "max_worker_processes")));
     970              : 
     971              :     /*
     972              :      * Ignore return value; if it fails, postmaster has died, but we have
     973              :      * checks for that elsewhere.
     974              :      */
     975            1 :     WaitForBackgroundWorkerShutdown(handle);
     976            1 : }
     977              : 
     978              : /* Compare member elements to check whether they are not equal. */
     979              : #define cmp_member_elem(fld)    \
     980              : do { \
     981              :     if (a->fld < b->fld)       \
     982              :         return -1;              \
     983              :     else if (a->fld > b->fld)  \
     984              :         return 1;               \
     985              : } while(0)
     986              : 
     987              : /*
     988              :  * apw_compare_blockinfo
     989              :  *
     990              :  * We depend on all records for a particular database being consecutive
     991              :  * in the dump file; each per-database worker will preload blocks until
     992              :  * it sees a block for some other database.  Sorting by tablespace,
     993              :  * filenumber, forknum, and blocknum isn't critical for correctness, but
     994              :  * helps us get a sequential I/O pattern.
     995              :  */
     996              : static int
     997         2032 : apw_compare_blockinfo(const void *p, const void *q)
     998              : {
     999         2032 :     const BlockInfoRecord *a = (const BlockInfoRecord *) p;
    1000         2032 :     const BlockInfoRecord *b = (const BlockInfoRecord *) q;
    1001              : 
    1002         2032 :     cmp_member_elem(database);
    1003         1967 :     cmp_member_elem(tablespace);
    1004         1967 :     cmp_member_elem(filenumber);
    1005          719 :     cmp_member_elem(forknum);
    1006          609 :     cmp_member_elem(blocknum);
    1007              : 
    1008            0 :     return 0;
    1009              : }
        

Generated by: LCOV version 2.0-1