LCOV - code coverage report
Current view: top level - contrib/pg_prewarm - autoprewarm.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 75.3 % 312 235
Test Date: 2026-03-03 08:14:49 Functions: 93.8 % 16 15
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * autoprewarm.c
       4              :  *      Periodically dump information about the blocks present in
       5              :  *      shared_buffers, and reload them on server restart.
       6              :  *
       7              :  *      Due to locking considerations, we can't actually begin prewarming
       8              :  *      until the server reaches a consistent state.  We need the catalogs
       9              :  *      to be consistent so that we can figure out which relation to lock,
      10              :  *      and we need to lock the relations so that we don't try to prewarm
      11              :  *      pages from a relation that is in the process of being dropped.
      12              :  *
      13              :  *      While prewarming, autoprewarm will use two workers.  There's a
      14              :  *      leader worker that reads and sorts the list of blocks to be
      15              :  *      prewarmed and then launches a per-database worker for each
      16              :  *      relevant database in turn.  The former keeps running after the
      17              :  *      initial prewarm is complete to update the dump file periodically.
      18              :  *
      19              :  *  Copyright (c) 2016-2026, PostgreSQL Global Development Group
      20              :  *
      21              :  *  IDENTIFICATION
      22              :  *      contrib/pg_prewarm/autoprewarm.c
      23              :  *
      24              :  *-------------------------------------------------------------------------
      25              :  */
      26              : 
      27              : #include "postgres.h"
      28              : 
      29              : #include <unistd.h>
      30              : 
      31              : #include "access/relation.h"
      32              : #include "access/xact.h"
      33              : #include "pgstat.h"
      34              : #include "postmaster/bgworker.h"
      35              : #include "postmaster/interrupt.h"
      36              : #include "storage/buf_internals.h"
      37              : #include "storage/dsm.h"
      38              : #include "storage/dsm_registry.h"
      39              : #include "storage/fd.h"
      40              : #include "storage/ipc.h"
      41              : #include "storage/latch.h"
      42              : #include "storage/lwlock.h"
      43              : #include "storage/procsignal.h"
      44              : #include "storage/read_stream.h"
      45              : #include "storage/smgr.h"
      46              : #include "tcop/tcopprot.h"
      47              : #include "utils/guc.h"
      48              : #include "utils/rel.h"
      49              : #include "utils/relfilenumbermap.h"
      50              : #include "utils/timestamp.h"
      51              : 
/*
 * Name of the block dump file.  It is opened with a relative path, so it is
 * resolved against the server process's working directory (presumably the
 * data directory -- confirm).
 */
#define AUTOPREWARM_FILE "autoprewarm.blocks"

/*
 * Metadata for each block we dump.
 *
 * Each record corresponds to one "database,tablespace,filenumber,forknum,
 * blocknum" line of the dump file (see the fscanf() in apw_load_buffers()).
 * A database of InvalidOid (0) marks a block belonging to a global object;
 * such records are grouped with the next real database when prewarming.
 */
typedef struct BlockInfoRecord
{
    Oid         database;       /* owning database, or InvalidOid if global */
    Oid         tablespace;     /* tablespace of the relation file */
    RelFileNumber filenumber;   /* relation file number within tablespace */
    ForkNumber  forknum;        /* fork; read from the file as unsigned */
    BlockNumber blocknum;       /* block number within the fork */
} BlockInfoRecord;
      63              : 
/*
 * Shared state information for autoprewarm bgworker.
 *
 * Attached via apw_init_shmem() by both the leader worker and the
 * per-database workers.  The PID fields are read and updated with "lock"
 * held.  The communication fields describe the slice of work handed to the
 * per-database worker that the leader is about to launch.
 */
typedef struct AutoPrewarmSharedState
{
    LWLock      lock;           /* mutual exclusion */
    pid_t       bgworker_pid;   /* for main bgworker; InvalidPid if none */
    pid_t       pid_using_dumpfile; /* for autoprewarm or block dump;
                                     * InvalidPid if the file is free */

    /* Following items are for communication with per-database worker */
    dsm_handle  block_info_handle;  /* DSM segment holding the sorted
                                     * BlockInfoRecord array;
                                     * DSM_HANDLE_INVALID once loading ends */
    Oid         database;       /* database the worker should connect to */
    int         prewarm_start_idx;  /* first block_info index to prewarm */
    int         prewarm_stop_idx;   /* one past the last index (half-open) */
    int         prewarmed_blocks;   /* running count reported at the end of
                                     * the load; presumably incremented by
                                     * the per-database workers */
} AutoPrewarmSharedState;
      78              : 
/*
 * Private data passed through the read stream API for our use in the
 * callback.
 *
 * One instance describes a single relation fork being streamed: the
 * callback hands out block numbers from block_info for as long as the
 * records match tablespace/filenumber/forknum.
 */
typedef struct AutoPrewarmReadStreamData
{
    /* The array of records containing the blocks we should prewarm. */
    BlockInfoRecord *block_info;

    /*
     * pos is the read stream callback's index into block_info. Because the
     * read stream may read ahead, pos is likely to be ahead of the index in
     * the main loop in autoprewarm_database_main().
     */
    int         pos;
    Oid         tablespace;     /* tablespace of the fork being streamed */
    RelFileNumber filenumber;   /* relation file number being streamed */
    ForkNumber  forknum;        /* fork being streamed */
    BlockNumber nblocks;        /* fork size; records with blocknum >= this
                                 * are skipped by the callback */
} AutoPrewarmReadStreamData;
      99              : 
     100              : 
/*
 * Background worker entry points.  They are exported (PGDLLEXPORT) so the
 * bgworker machinery can presumably look them up by name when the workers
 * registered by apw_start_leader_worker()/apw_start_database_worker() start.
 */
PGDLLEXPORT void autoprewarm_main(Datum main_arg);
PGDLLEXPORT void autoprewarm_database_main(Datum main_arg);

/* SQL-callable functions (version-1 calling convention). */
PG_FUNCTION_INFO_V1(autoprewarm_start_worker);
PG_FUNCTION_INFO_V1(autoprewarm_dump_now);

static void apw_load_buffers(void);
static int  apw_dump_now(bool is_bgworker, bool dump_unlogged);
static void apw_start_leader_worker(void);
/* Launches one per-database worker and returns once it has exited. */
static void apw_start_database_worker(void);
/* Creates/attaches shared state; a true result appears to mean it already
 * existed (see autoprewarm_main) -- confirm against the definition. */
static bool apw_init_shmem(void);
static void apw_detach_shmem(int code, Datum arg);
static int  apw_compare_blockinfo(const void *p, const void *q);

/* Pointer to shared-memory state. */
static AutoPrewarmSharedState *apw_state = NULL;

/* GUC variables. */
static bool autoprewarm = true; /* start worker? */
static int  autoprewarm_interval = 300; /* dump interval, in seconds */
     121              : 
     122              : /*
     123              :  * Module load callback.
     124              :  */
     125              : void
     126            6 : _PG_init(void)
     127              : {
     128            6 :     DefineCustomIntVariable("pg_prewarm.autoprewarm_interval",
     129              :                             "Sets the interval between dumps of shared buffers",
     130              :                             "If set to zero, time-based dumping is disabled.",
     131              :                             &autoprewarm_interval,
     132              :                             300,
     133              :                             0, INT_MAX / 1000,
     134              :                             PGC_SIGHUP,
     135              :                             GUC_UNIT_S,
     136              :                             NULL,
     137              :                             NULL,
     138              :                             NULL);
     139              : 
     140            6 :     if (!process_shared_preload_libraries_in_progress)
     141            4 :         return;
     142              : 
     143              :     /* can't define PGC_POSTMASTER variable after startup */
     144            2 :     DefineCustomBoolVariable("pg_prewarm.autoprewarm",
     145              :                              "Starts the autoprewarm worker.",
     146              :                              NULL,
     147              :                              &autoprewarm,
     148              :                              true,
     149              :                              PGC_POSTMASTER,
     150              :                              0,
     151              :                              NULL,
     152              :                              NULL,
     153              :                              NULL);
     154              : 
     155            2 :     MarkGUCPrefixReserved("pg_prewarm");
     156              : 
     157              :     /* Register autoprewarm worker, if enabled. */
     158            2 :     if (autoprewarm)
     159            2 :         apw_start_leader_worker();
     160              : }
     161              : 
     162              : /*
     163              :  * Main entry point for the leader autoprewarm process.  Per-database workers
     164              :  * have a separate entry point.
     165              :  */
     166              : void
     167            2 : autoprewarm_main(Datum main_arg)
     168              : {
     169            2 :     bool        first_time = true;
     170            2 :     bool        final_dump_allowed = true;
     171            2 :     TimestampTz last_dump_time = 0;
     172              : 
     173              :     /* Establish signal handlers; once that's done, unblock signals. */
     174            2 :     pqsignal(SIGTERM, SignalHandlerForShutdownRequest);
     175            2 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
     176            2 :     pqsignal(SIGUSR1, procsignal_sigusr1_handler);
     177            2 :     BackgroundWorkerUnblockSignals();
     178              : 
     179              :     /* Create (if necessary) and attach to our shared memory area. */
     180            2 :     if (apw_init_shmem())
     181            0 :         first_time = false;
     182              : 
     183              :     /*
     184              :      * Set on-detach hook so that our PID will be cleared on exit.
     185              :      *
     186              :      * NB: Autoprewarm's state is stored in a DSM segment, and DSM segments
     187              :      * are detached before calling the on_shmem_exit callbacks, so we must put
     188              :      * apw_detach_shmem in the before_shmem_exit callback list.
     189              :      */
     190            2 :     before_shmem_exit(apw_detach_shmem, 0);
     191              : 
     192              :     /*
     193              :      * Store our PID in the shared memory area --- unless there's already
     194              :      * another worker running, in which case just exit.
     195              :      */
     196            2 :     LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE);
     197            2 :     if (apw_state->bgworker_pid != InvalidPid)
     198              :     {
     199            0 :         LWLockRelease(&apw_state->lock);
     200            0 :         ereport(LOG,
     201              :                 (errmsg("autoprewarm worker is already running under PID %d",
     202              :                         (int) apw_state->bgworker_pid)));
     203            0 :         return;
     204              :     }
     205            2 :     apw_state->bgworker_pid = MyProcPid;
     206            2 :     LWLockRelease(&apw_state->lock);
     207              : 
     208              :     /*
     209              :      * Preload buffers from the dump file only if we just created the shared
     210              :      * memory region.  Otherwise, it's either already been done or shouldn't
     211              :      * be done - e.g. because the old dump file has been overwritten since the
     212              :      * server was started.
     213              :      *
     214              :      * There's not much point in performing a dump immediately after we finish
     215              :      * preloading; so, if we do end up preloading, consider the last dump time
     216              :      * to be equal to the current time.
     217              :      *
     218              :      * If apw_load_buffers() is terminated early by a shutdown request,
     219              :      * prevent dumping out our state below the loop, because we'd effectively
     220              :      * just truncate the saved state to however much we'd managed to preload.
     221              :      */
     222            2 :     if (first_time)
     223              :     {
     224            2 :         apw_load_buffers();
     225            2 :         final_dump_allowed = !ShutdownRequestPending;
     226            2 :         last_dump_time = GetCurrentTimestamp();
     227              :     }
     228              : 
     229              :     /* Periodically dump buffers until terminated. */
     230            5 :     while (!ShutdownRequestPending)
     231              :     {
     232              :         /* In case of a SIGHUP, just reload the configuration. */
     233            3 :         if (ConfigReloadPending)
     234              :         {
     235            0 :             ConfigReloadPending = false;
     236            0 :             ProcessConfigFile(PGC_SIGHUP);
     237              :         }
     238              : 
     239            3 :         if (autoprewarm_interval <= 0)
     240              :         {
     241              :             /* We're only dumping at shutdown, so just wait forever. */
     242            3 :             (void) WaitLatch(MyLatch,
     243              :                              WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
     244              :                              -1L,
     245              :                              PG_WAIT_EXTENSION);
     246              :         }
     247              :         else
     248              :         {
     249              :             TimestampTz next_dump_time;
     250              :             long        delay_in_ms;
     251              : 
     252              :             /* Compute the next dump time. */
     253            0 :             next_dump_time =
     254            0 :                 TimestampTzPlusMilliseconds(last_dump_time,
     255              :                                             autoprewarm_interval * 1000);
     256              :             delay_in_ms =
     257            0 :                 TimestampDifferenceMilliseconds(GetCurrentTimestamp(),
     258              :                                                 next_dump_time);
     259              : 
     260              :             /* Perform a dump if it's time. */
     261            0 :             if (delay_in_ms <= 0)
     262              :             {
     263            0 :                 last_dump_time = GetCurrentTimestamp();
     264            0 :                 apw_dump_now(true, false);
     265            0 :                 continue;
     266              :             }
     267              : 
     268              :             /* Sleep until the next dump time. */
     269            0 :             (void) WaitLatch(MyLatch,
     270              :                              WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     271              :                              delay_in_ms,
     272              :                              PG_WAIT_EXTENSION);
     273              :         }
     274              : 
     275              :         /* Reset the latch, loop. */
     276            3 :         ResetLatch(MyLatch);
     277              :     }
     278              : 
     279              :     /*
     280              :      * Dump one last time.  We assume this is probably the result of a system
     281              :      * shutdown, although it's possible that we've merely been terminated.
     282              :      */
     283            2 :     if (final_dump_allowed)
     284            2 :         apw_dump_now(true, true);
     285              : }
     286              : 
     287              : /*
     288              :  * Read the dump file and launch per-database workers one at a time to
     289              :  * prewarm the buffers found there.
     290              :  */
     291              : static void
     292            2 : apw_load_buffers(void)
     293              : {
     294            2 :     FILE       *file = NULL;
     295              :     int         num_elements,
     296              :                 i;
     297              :     BlockInfoRecord *blkinfo;
     298              :     dsm_segment *seg;
     299              : 
     300              :     /*
     301              :      * Skip the prewarm if the dump file is in use; otherwise, prevent any
     302              :      * other process from writing it while we're using it.
     303              :      */
     304            2 :     LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE);
     305            2 :     if (apw_state->pid_using_dumpfile == InvalidPid)
     306            2 :         apw_state->pid_using_dumpfile = MyProcPid;
     307              :     else
     308              :     {
     309            0 :         LWLockRelease(&apw_state->lock);
     310            0 :         ereport(LOG,
     311              :                 (errmsg("skipping prewarm because block dump file is being written by PID %d",
     312              :                         (int) apw_state->pid_using_dumpfile)));
     313            1 :         return;
     314              :     }
     315            2 :     LWLockRelease(&apw_state->lock);
     316              : 
     317              :     /*
     318              :      * Open the block dump file.  Exit quietly if it doesn't exist, but report
     319              :      * any other error.
     320              :      */
     321            2 :     file = AllocateFile(AUTOPREWARM_FILE, "r");
     322            2 :     if (!file)
     323              :     {
     324            1 :         if (errno == ENOENT)
     325              :         {
     326            1 :             LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE);
     327            1 :             apw_state->pid_using_dumpfile = InvalidPid;
     328            1 :             LWLockRelease(&apw_state->lock);
     329            1 :             return;             /* No file to load. */
     330              :         }
     331            0 :         ereport(ERROR,
     332              :                 (errcode_for_file_access(),
     333              :                  errmsg("could not read file \"%s\": %m",
     334              :                         AUTOPREWARM_FILE)));
     335              :     }
     336              : 
     337              :     /* First line of the file is a record count. */
     338            1 :     if (fscanf(file, "<<%d>>\n", &num_elements) != 1)
     339            0 :         ereport(ERROR,
     340              :                 (errcode_for_file_access(),
     341              :                  errmsg("could not read from file \"%s\": %m",
     342              :                         AUTOPREWARM_FILE)));
     343              : 
     344              :     /* Allocate a dynamic shared memory segment to store the record data. */
     345            1 :     seg = dsm_create(sizeof(BlockInfoRecord) * num_elements, 0);
     346            1 :     blkinfo = (BlockInfoRecord *) dsm_segment_address(seg);
     347              : 
     348              :     /* Read records, one per line. */
     349          236 :     for (i = 0; i < num_elements; i++)
     350              :     {
     351              :         unsigned    forknum;
     352              : 
     353          235 :         if (fscanf(file, "%u,%u,%u,%u,%u\n", &blkinfo[i].database,
     354          235 :                    &blkinfo[i].tablespace, &blkinfo[i].filenumber,
     355          235 :                    &forknum, &blkinfo[i].blocknum) != 5)
     356            0 :             ereport(ERROR,
     357              :                     (errmsg("autoprewarm block dump file is corrupted at line %d",
     358              :                             i + 1)));
     359          235 :         blkinfo[i].forknum = forknum;
     360              :     }
     361              : 
     362            1 :     FreeFile(file);
     363              : 
     364              :     /* Sort the blocks to be loaded. */
     365            1 :     qsort(blkinfo, num_elements, sizeof(BlockInfoRecord),
     366              :           apw_compare_blockinfo);
     367              : 
     368              :     /* Populate shared memory state. */
     369            1 :     apw_state->block_info_handle = dsm_segment_handle(seg);
     370            1 :     apw_state->prewarm_start_idx = apw_state->prewarm_stop_idx = 0;
     371            1 :     apw_state->prewarmed_blocks = 0;
     372              : 
     373              :     /* Don't prewarm more than we can fit. */
     374            1 :     if (num_elements > NBuffers)
     375              :     {
     376            0 :         num_elements = NBuffers;
     377            0 :         ereport(LOG,
     378              :                 (errmsg("autoprewarm capping prewarmed blocks to %d (shared_buffers size)",
     379              :                         NBuffers)));
     380              :     }
     381              : 
     382              :     /* Get the info position of the first block of the next database. */
     383            2 :     while (apw_state->prewarm_start_idx < num_elements)
     384              :     {
     385            1 :         int         j = apw_state->prewarm_start_idx;
     386            1 :         Oid         current_db = blkinfo[j].database;
     387              : 
     388              :         /*
     389              :          * Advance the prewarm_stop_idx to the first BlockInfoRecord that does
     390              :          * not belong to this database.
     391              :          */
     392            1 :         j++;
     393          235 :         while (j < num_elements)
     394              :         {
     395          234 :             if (current_db != blkinfo[j].database)
     396              :             {
     397              :                 /*
     398              :                  * Combine BlockInfoRecords for global objects with those of
     399              :                  * the database.
     400              :                  */
     401            1 :                 if (current_db != InvalidOid)
     402            0 :                     break;
     403            1 :                 current_db = blkinfo[j].database;
     404              :             }
     405              : 
     406          234 :             j++;
     407              :         }
     408              : 
     409              :         /*
     410              :          * If we reach this point with current_db == InvalidOid, then only
     411              :          * BlockInfoRecords belonging to global objects exist.  We can't
     412              :          * prewarm without a database connection, so just bail out.
     413              :          */
     414            1 :         if (current_db == InvalidOid)
     415            0 :             break;
     416              : 
     417              :         /* Configure stop point and database for next per-database worker. */
     418            1 :         apw_state->prewarm_stop_idx = j;
     419            1 :         apw_state->database = current_db;
     420              :         Assert(apw_state->prewarm_start_idx < apw_state->prewarm_stop_idx);
     421              : 
     422              :         /*
     423              :          * Likewise, don't launch if we've already been told to shut down.
     424              :          * (The launch would fail anyway, but we might as well skip it.)
     425              :          */
     426            1 :         if (ShutdownRequestPending)
     427            0 :             break;
     428              : 
     429              :         /*
     430              :          * Start a per-database worker to load blocks for this database; this
     431              :          * function will return once the per-database worker exits.
     432              :          */
     433            1 :         apw_start_database_worker();
     434              : 
     435              :         /* Prepare for next database. */
     436            1 :         apw_state->prewarm_start_idx = apw_state->prewarm_stop_idx;
     437              :     }
     438              : 
     439              :     /* Clean up. */
     440            1 :     dsm_detach(seg);
     441            1 :     LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE);
     442            1 :     apw_state->block_info_handle = DSM_HANDLE_INVALID;
     443            1 :     apw_state->pid_using_dumpfile = InvalidPid;
     444            1 :     LWLockRelease(&apw_state->lock);
     445              : 
     446              :     /* Report our success, if we were able to finish. */
     447            1 :     if (!ShutdownRequestPending)
     448            1 :         ereport(LOG,
     449              :                 (errmsg("autoprewarm successfully prewarmed %d of %d previously-loaded blocks",
     450              :                         apw_state->prewarmed_blocks, num_elements)));
     451              : }
     452              : 
     453              : /*
     454              :  * Return the next block number of a specific relation and fork to read
     455              :  * according to the array of BlockInfoRecord.
     456              :  */
     457              : static BlockNumber
     458          310 : apw_read_stream_next_block(ReadStream *stream,
     459              :                            void *callback_private_data,
     460              :                            void *per_buffer_data)
     461              : {
     462          310 :     AutoPrewarmReadStreamData *p = callback_private_data;
     463              : 
     464          310 :     CHECK_FOR_INTERRUPTS();
     465              : 
     466          310 :     while (p->pos < apw_state->prewarm_stop_idx)
     467              :     {
     468          309 :         BlockInfoRecord blk = p->block_info[p->pos];
     469              : 
     470          309 :         if (blk.tablespace != p->tablespace)
     471          309 :             return InvalidBlockNumber;
     472              : 
     473          308 :         if (blk.filenumber != p->filenumber)
     474           55 :             return InvalidBlockNumber;
     475              : 
     476          253 :         if (blk.forknum != p->forknum)
     477           18 :             return InvalidBlockNumber;
     478              : 
     479          235 :         p->pos++;
     480              : 
     481              :         /*
     482              :          * Check whether blocknum is valid and within fork file size.
     483              :          * Fast-forward through any invalid blocks. We want p->pos to reflect
     484              :          * the location of the next relation or fork before ending the stream.
     485              :          */
     486          235 :         if (blk.blocknum >= p->nblocks)
     487            0 :             continue;
     488              : 
     489          235 :         return blk.blocknum;
     490              :     }
     491              : 
     492            1 :     return InvalidBlockNumber;
     493              : }
     494              : 
     495              : /*
     496              :  * Prewarm all blocks for one database (and possibly also global objects, if
     497              :  * those got grouped with this database).
     498              :  */
     499              : void
     500            1 : autoprewarm_database_main(Datum main_arg)
     501              : {
     502              :     BlockInfoRecord *block_info;
     503              :     int         i;
     504              :     BlockInfoRecord blk;
     505              :     dsm_segment *seg;
     506              : 
     507              :     /* Establish signal handlers; once that's done, unblock signals. */
     508            1 :     pqsignal(SIGTERM, die);
     509            1 :     BackgroundWorkerUnblockSignals();
     510              : 
     511              :     /* Connect to correct database and get block information. */
     512            1 :     apw_init_shmem();
     513            1 :     seg = dsm_attach(apw_state->block_info_handle);
     514            1 :     if (seg == NULL)
     515            0 :         ereport(ERROR,
     516              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     517              :                  errmsg("could not map dynamic shared memory segment")));
     518            1 :     BackgroundWorkerInitializeConnectionByOid(apw_state->database, InvalidOid, 0);
     519            1 :     block_info = (BlockInfoRecord *) dsm_segment_address(seg);
     520              : 
     521            1 :     i = apw_state->prewarm_start_idx;
     522            1 :     blk = block_info[i];
     523              : 
     524              :     /*
     525              :      * Loop until we run out of blocks to prewarm or until we run out of
     526              :      * buffers.
     527              :      */
     528           58 :     while (i < apw_state->prewarm_stop_idx)
     529              :     {
     530           57 :         Oid         tablespace = blk.tablespace;
     531           57 :         RelFileNumber filenumber = blk.filenumber;
     532              :         Oid         reloid;
     533              :         Relation    rel;
     534              : 
     535              :         /*
     536              :          * All blocks between prewarm_start_idx and prewarm_stop_idx should
     537              :          * belong either to global objects or the same database.
     538              :          */
     539              :         Assert(blk.database == apw_state->database || blk.database == 0);
     540              : 
     541           57 :         StartTransactionCommand();
     542              : 
     543           57 :         reloid = RelidByRelfilenumber(blk.tablespace, blk.filenumber);
     544          114 :         if (!OidIsValid(reloid) ||
     545           57 :             (rel = try_relation_open(reloid, AccessShareLock)) == NULL)
     546              :         {
     547              :             /* We failed to open the relation, so there is nothing to close. */
     548            0 :             CommitTransactionCommand();
     549              : 
     550              :             /*
     551              :              * Fast-forward to the next relation. We want to skip all of the
     552              :              * other records referencing this relation since we know we can't
     553              :              * open it. That way, we avoid repeatedly trying and failing to
     554              :              * open the same relation.
     555              :              */
     556            0 :             for (; i < apw_state->prewarm_stop_idx; i++)
     557              :             {
     558            0 :                 blk = block_info[i];
     559            0 :                 if (blk.tablespace != tablespace ||
     560            0 :                     blk.filenumber != filenumber)
     561              :                     break;
     562              :             }
     563              : 
     564              :             /* Time to try and open our newfound relation */
     565            0 :             continue;
     566              :         }
     567              : 
     568              :         /*
     569              :          * We have a relation; now let's loop until we find a valid fork of
     570              :          * the relation or we run out of buffers. Once we've read from all
     571              :          * valid forks or run out of options, we'll close the relation and
     572              :          * move on.
     573              :          */
     574           57 :         while (i < apw_state->prewarm_stop_idx &&
     575          132 :                blk.tablespace == tablespace &&
     576          130 :                blk.filenumber == filenumber)
     577              :         {
     578           75 :             ForkNumber  forknum = blk.forknum;
     579              :             BlockNumber nblocks;
     580              :             struct AutoPrewarmReadStreamData p;
     581              :             ReadStream *stream;
     582              :             Buffer      buf;
     583              : 
     584              :             /*
     585              :              * smgrexists is not safe for illegal forknum, hence check whether
     586              :              * the passed forknum is valid before using it in smgrexists.
     587              :              */
     588           75 :             if (blk.forknum <= InvalidForkNumber ||
     589           75 :                 blk.forknum > MAX_FORKNUM ||
     590           75 :                 !smgrexists(RelationGetSmgr(rel), blk.forknum))
     591              :             {
     592              :                 /*
     593              :                  * Fast-forward to the next fork. We want to skip all of the
     594              :                  * other records referencing this fork since we already know
     595              :                  * it's not valid.
     596              :                  */
     597            0 :                 for (; i < apw_state->prewarm_stop_idx; i++)
     598              :                 {
     599            0 :                     blk = block_info[i];
     600            0 :                     if (blk.tablespace != tablespace ||
     601            0 :                         blk.filenumber != filenumber ||
     602            0 :                         blk.forknum != forknum)
     603              :                         break;
     604              :                 }
     605              : 
     606              :                 /* Time to check if this newfound fork is valid */
     607            0 :                 continue;
     608              :             }
     609              : 
     610           75 :             nblocks = RelationGetNumberOfBlocksInFork(rel, blk.forknum);
     611              : 
     612           75 :             p = (struct AutoPrewarmReadStreamData)
     613              :             {
     614              :                 .block_info = block_info,
     615              :                     .pos = i,
     616              :                     .tablespace = tablespace,
     617              :                     .filenumber = filenumber,
     618              :                     .forknum = forknum,
     619              :                     .nblocks = nblocks,
     620              :             };
     621              : 
     622           75 :             stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE |
     623              :                                                 READ_STREAM_DEFAULT |
     624              :                                                 READ_STREAM_USE_BATCHING,
     625              :                                                 NULL,
     626              :                                                 rel,
     627              :                                                 p.forknum,
     628              :                                                 apw_read_stream_next_block,
     629              :                                                 &p,
     630              :                                                 0);
     631              : 
     632              :             /*
     633              :              * Loop until we've prewarmed all the blocks from this fork. The
     634              :              * read stream callback will check that we still have free buffers
     635              :              * before requesting each block from the read stream API.
     636              :              */
     637          310 :             while ((buf = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
     638              :             {
     639          235 :                 apw_state->prewarmed_blocks++;
     640          235 :                 ReleaseBuffer(buf);
     641              :             }
     642              : 
     643           75 :             read_stream_end(stream);
     644              : 
     645              :             /* Advance i past all the blocks just prewarmed. */
     646           75 :             i = p.pos;
     647           75 :             blk = block_info[i];
     648              :         }
     649              : 
     650           57 :         relation_close(rel, AccessShareLock);
     651           57 :         CommitTransactionCommand();
     652              :     }
     653              : 
     654            1 :     dsm_detach(seg);
     655            1 : }
     656              : 
     657              : /*
     658              :  * Dump information on blocks in shared buffers.  We use a text format here
     659              :  * so that it's easy to understand and even change the file contents if
     660              :  * necessary.
     661              :  * Returns the number of blocks dumped.
     662              :  */
     663              : static int
     664            3 : apw_dump_now(bool is_bgworker, bool dump_unlogged)
     665              : {
     666              :     int         num_blocks;
     667              :     int         i;
     668              :     int         ret;
     669              :     BlockInfoRecord *block_info_array;
     670              :     BufferDesc *bufHdr;
     671              :     FILE       *file;
     672              :     char        transient_dump_file_path[MAXPGPATH];
     673              :     pid_t       pid;
     674              : 
     675            3 :     LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE);
     676            3 :     pid = apw_state->pid_using_dumpfile;
     677            3 :     if (apw_state->pid_using_dumpfile == InvalidPid)
     678            3 :         apw_state->pid_using_dumpfile = MyProcPid;
     679            3 :     LWLockRelease(&apw_state->lock);
     680              : 
     681            3 :     if (pid != InvalidPid)
     682              :     {
     683            0 :         if (!is_bgworker)
     684            0 :             ereport(ERROR,
     685              :                     (errmsg("could not perform block dump because dump file is being used by PID %d",
     686              :                             (int) apw_state->pid_using_dumpfile)));
     687              : 
     688            0 :         ereport(LOG,
     689              :                 (errmsg("skipping block dump because it is already being performed by PID %d",
     690              :                         (int) apw_state->pid_using_dumpfile)));
     691            0 :         return 0;
     692              :     }
     693              : 
     694              :     /*
     695              :      * With sufficiently large shared_buffers, allocation will exceed 1GB, so
     696              :      * allow for a huge allocation to prevent outright failure.
     697              :      *
     698              :      * (In the future, it might be a good idea to redesign this to use a more
     699              :      * memory-efficient data structure.)
     700              :      */
     701              :     block_info_array = (BlockInfoRecord *)
     702            3 :         palloc_extended((sizeof(BlockInfoRecord) * NBuffers), MCXT_ALLOC_HUGE);
     703              : 
     704        49155 :     for (num_blocks = 0, i = 0; i < NBuffers; i++)
     705              :     {
     706              :         uint64      buf_state;
     707              : 
     708        49152 :         CHECK_FOR_INTERRUPTS();
     709              : 
     710        49152 :         bufHdr = GetBufferDescriptor(i);
     711              : 
     712              :         /* Lock each buffer header before inspecting. */
     713        49152 :         buf_state = LockBufHdr(bufHdr);
     714              : 
     715              :         /*
     716              :          * Unlogged tables will be automatically truncated after a crash or
     717              :          * unclean shutdown. In such cases we need not prewarm them. Dump them
     718              :          * only if requested by caller.
     719              :          */
     720        49152 :         if (buf_state & BM_TAG_VALID &&
     721          705 :             ((buf_state & BM_PERMANENT) || dump_unlogged))
     722              :         {
     723          705 :             block_info_array[num_blocks].database = bufHdr->tag.dbOid;
     724          705 :             block_info_array[num_blocks].tablespace = bufHdr->tag.spcOid;
     725         1410 :             block_info_array[num_blocks].filenumber =
     726          705 :                 BufTagGetRelNumber(&bufHdr->tag);
     727         1410 :             block_info_array[num_blocks].forknum =
     728          705 :                 BufTagGetForkNum(&bufHdr->tag);
     729          705 :             block_info_array[num_blocks].blocknum = bufHdr->tag.blockNum;
     730          705 :             ++num_blocks;
     731              :         }
     732              : 
     733        49152 :         UnlockBufHdr(bufHdr);
     734              :     }
     735              : 
     736            3 :     snprintf(transient_dump_file_path, MAXPGPATH, "%s.tmp", AUTOPREWARM_FILE);
     737            3 :     file = AllocateFile(transient_dump_file_path, "w");
     738            3 :     if (!file)
     739            0 :         ereport(ERROR,
     740              :                 (errcode_for_file_access(),
     741              :                  errmsg("could not open file \"%s\": %m",
     742              :                         transient_dump_file_path)));
     743              : 
     744            3 :     ret = fprintf(file, "<<%d>>\n", num_blocks);
     745            3 :     if (ret < 0)
     746              :     {
     747            0 :         int         save_errno = errno;
     748              : 
     749            0 :         FreeFile(file);
     750            0 :         unlink(transient_dump_file_path);
     751            0 :         errno = save_errno;
     752            0 :         ereport(ERROR,
     753              :                 (errcode_for_file_access(),
     754              :                  errmsg("could not write to file \"%s\": %m",
     755              :                         transient_dump_file_path)));
     756              :     }
     757              : 
     758          708 :     for (i = 0; i < num_blocks; i++)
     759              :     {
     760          705 :         CHECK_FOR_INTERRUPTS();
     761              : 
     762          705 :         ret = fprintf(file, "%u,%u,%u,%u,%u\n",
     763          705 :                       block_info_array[i].database,
     764          705 :                       block_info_array[i].tablespace,
     765          705 :                       block_info_array[i].filenumber,
     766          705 :                       (uint32) block_info_array[i].forknum,
     767          705 :                       block_info_array[i].blocknum);
     768          705 :         if (ret < 0)
     769              :         {
     770            0 :             int         save_errno = errno;
     771              : 
     772            0 :             FreeFile(file);
     773            0 :             unlink(transient_dump_file_path);
     774            0 :             errno = save_errno;
     775            0 :             ereport(ERROR,
     776              :                     (errcode_for_file_access(),
     777              :                      errmsg("could not write to file \"%s\": %m",
     778              :                             transient_dump_file_path)));
     779              :         }
     780              :     }
     781              : 
     782            3 :     pfree(block_info_array);
     783              : 
     784              :     /*
     785              :      * Rename transient_dump_file_path to AUTOPREWARM_FILE to make things
     786              :      * permanent.
     787              :      */
     788            3 :     ret = FreeFile(file);
     789            3 :     if (ret != 0)
     790              :     {
     791            0 :         int         save_errno = errno;
     792              : 
     793            0 :         unlink(transient_dump_file_path);
     794            0 :         errno = save_errno;
     795            0 :         ereport(ERROR,
     796              :                 (errcode_for_file_access(),
     797              :                  errmsg("could not close file \"%s\": %m",
     798              :                         transient_dump_file_path)));
     799              :     }
     800              : 
     801            3 :     (void) durable_rename(transient_dump_file_path, AUTOPREWARM_FILE, ERROR);
     802            3 :     apw_state->pid_using_dumpfile = InvalidPid;
     803              : 
     804            3 :     ereport(DEBUG1,
     805              :             (errmsg_internal("wrote block details for %d blocks", num_blocks)));
     806            3 :     return num_blocks;
     807              : }
     808              : 
     809              : /*
     810              :  * SQL-callable function to launch autoprewarm.
     811              :  */
     812              : Datum
     813            0 : autoprewarm_start_worker(PG_FUNCTION_ARGS)
     814              : {
     815              :     pid_t       pid;
     816              : 
     817            0 :     if (!autoprewarm)
     818            0 :         ereport(ERROR,
     819              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     820              :                  errmsg("autoprewarm is disabled")));
     821              : 
     822            0 :     apw_init_shmem();
     823            0 :     LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE);
     824            0 :     pid = apw_state->bgworker_pid;
     825            0 :     LWLockRelease(&apw_state->lock);
     826              : 
     827            0 :     if (pid != InvalidPid)
     828            0 :         ereport(ERROR,
     829              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     830              :                  errmsg("autoprewarm worker is already running under PID %d",
     831              :                         (int) pid)));
     832              : 
     833            0 :     apw_start_leader_worker();
     834              : 
     835            0 :     PG_RETURN_VOID();
     836              : }
     837              : 
     838              : /*
     839              :  * SQL-callable function to perform an immediate block dump.
     840              :  *
     841              :  * Note: this is declared to return int8, as insurance against some
     842              :  * very distant day when we might make NBuffers wider than int.
     843              :  */
     844              : Datum
     845            1 : autoprewarm_dump_now(PG_FUNCTION_ARGS)
     846              : {
     847              :     int         num_blocks;
     848              : 
     849            1 :     apw_init_shmem();
     850              : 
     851            1 :     PG_ENSURE_ERROR_CLEANUP(apw_detach_shmem, 0);
     852              :     {
     853            1 :         num_blocks = apw_dump_now(false, true);
     854              :     }
     855            1 :     PG_END_ENSURE_ERROR_CLEANUP(apw_detach_shmem, 0);
     856              : 
     857            1 :     PG_RETURN_INT64((int64) num_blocks);
     858              : }
     859              : 
     860              : static void
     861            2 : apw_init_state(void *ptr, void *arg)
     862              : {
     863            2 :     AutoPrewarmSharedState *state = (AutoPrewarmSharedState *) ptr;
     864              : 
     865            2 :     LWLockInitialize(&state->lock, LWLockNewTrancheId("autoprewarm"));
     866            2 :     state->bgworker_pid = InvalidPid;
     867            2 :     state->pid_using_dumpfile = InvalidPid;
     868            2 : }
     869              : 
     870              : /*
     871              :  * Allocate and initialize autoprewarm related shared memory, if not already
     872              :  * done, and set up backend-local pointer to that state.  Returns true if an
     873              :  * existing shared memory segment was found.
     874              :  */
     875              : static bool
     876            4 : apw_init_shmem(void)
     877              : {
     878              :     bool        found;
     879              : 
     880            4 :     apw_state = GetNamedDSMSegment("autoprewarm",
     881              :                                    sizeof(AutoPrewarmSharedState),
     882              :                                    apw_init_state,
     883              :                                    &found, NULL);
     884              : 
     885            4 :     return found;
     886              : }
     887              : 
     888              : /*
     889              :  * Clear our PID from autoprewarm shared state.
     890              :  */
     891              : static void
     892            2 : apw_detach_shmem(int code, Datum arg)
     893              : {
     894            2 :     LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE);
     895            2 :     if (apw_state->pid_using_dumpfile == MyProcPid)
     896            0 :         apw_state->pid_using_dumpfile = InvalidPid;
     897            2 :     if (apw_state->bgworker_pid == MyProcPid)
     898            2 :         apw_state->bgworker_pid = InvalidPid;
     899            2 :     LWLockRelease(&apw_state->lock);
     900            2 : }
     901              : 
     902              : /*
     903              :  * Start autoprewarm leader worker process.
     904              :  */
     905              : static void
     906            2 : apw_start_leader_worker(void)
     907              : {
     908            2 :     BackgroundWorker worker = {0};
     909              :     BackgroundWorkerHandle *handle;
     910              :     BgwHandleStatus status;
     911              :     pid_t       pid;
     912              : 
     913            2 :     worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
     914            2 :     worker.bgw_start_time = BgWorkerStart_ConsistentState;
     915            2 :     strcpy(worker.bgw_library_name, "pg_prewarm");
     916            2 :     strcpy(worker.bgw_function_name, "autoprewarm_main");
     917            2 :     strcpy(worker.bgw_name, "autoprewarm leader");
     918            2 :     strcpy(worker.bgw_type, "autoprewarm leader");
     919              : 
     920            2 :     if (process_shared_preload_libraries_in_progress)
     921              :     {
     922            2 :         RegisterBackgroundWorker(&worker);
     923            2 :         return;
     924              :     }
     925              : 
     926              :     /* must set notify PID to wait for startup */
     927            0 :     worker.bgw_notify_pid = MyProcPid;
     928              : 
     929            0 :     if (!RegisterDynamicBackgroundWorker(&worker, &handle))
     930            0 :         ereport(ERROR,
     931              :                 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
     932              :                  errmsg("could not register background process"),
     933              :                  errhint("You may need to increase \"max_worker_processes\".")));
     934              : 
     935            0 :     status = WaitForBackgroundWorkerStartup(handle, &pid);
     936            0 :     if (status != BGWH_STARTED)
     937            0 :         ereport(ERROR,
     938              :                 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
     939              :                  errmsg("could not start background process"),
     940              :                  errhint("More details may be available in the server log.")));
     941              : }
     942              : 
     943              : /*
     944              :  * Start autoprewarm per-database worker process.
     945              :  */
     946              : static void
     947            1 : apw_start_database_worker(void)
     948              : {
     949            1 :     BackgroundWorker worker = {0};
     950              :     BackgroundWorkerHandle *handle;
     951              : 
     952            1 :     worker.bgw_flags =
     953              :         BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
     954            1 :     worker.bgw_start_time = BgWorkerStart_ConsistentState;
     955            1 :     worker.bgw_restart_time = BGW_NEVER_RESTART;
     956            1 :     strcpy(worker.bgw_library_name, "pg_prewarm");
     957            1 :     strcpy(worker.bgw_function_name, "autoprewarm_database_main");
     958            1 :     strcpy(worker.bgw_name, "autoprewarm worker");
     959            1 :     strcpy(worker.bgw_type, "autoprewarm worker");
     960              : 
     961              :     /* must set notify PID to wait for shutdown */
     962            1 :     worker.bgw_notify_pid = MyProcPid;
     963              : 
     964            1 :     if (!RegisterDynamicBackgroundWorker(&worker, &handle))
     965            0 :         ereport(ERROR,
     966              :                 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
     967              :                  errmsg("registering dynamic bgworker autoprewarm failed"),
     968              :                  errhint("Consider increasing the configuration parameter \"%s\".", "max_worker_processes")));
     969              : 
     970              :     /*
     971              :      * Ignore return value; if it fails, postmaster has died, but we have
     972              :      * checks for that elsewhere.
     973              :      */
     974            1 :     WaitForBackgroundWorkerShutdown(handle);
     975            1 : }
     976              : 
     977              : /* Compare member elements to check whether they are not equal. */
     978              : #define cmp_member_elem(fld)    \
     979              : do { \
     980              :     if (a->fld < b->fld)       \
     981              :         return -1;              \
     982              :     else if (a->fld > b->fld)  \
     983              :         return 1;               \
     984              : } while(0)
     985              : 
     986              : /*
     987              :  * apw_compare_blockinfo
     988              :  *
     989              :  * We depend on all records for a particular database being consecutive
     990              :  * in the dump file; each per-database worker will preload blocks until
     991              :  * it sees a block for some other database.  Sorting by tablespace,
     992              :  * filenumber, forknum, and blocknum isn't critical for correctness, but
     993              :  * helps us get a sequential I/O pattern.
     994              :  */
     995              : static int
     996         1963 : apw_compare_blockinfo(const void *p, const void *q)
     997              : {
     998         1963 :     const BlockInfoRecord *a = (const BlockInfoRecord *) p;
     999         1963 :     const BlockInfoRecord *b = (const BlockInfoRecord *) q;
    1000              : 
    1001         1963 :     cmp_member_elem(database);
    1002         1898 :     cmp_member_elem(tablespace);
    1003         1898 :     cmp_member_elem(filenumber);
    1004          694 :     cmp_member_elem(forknum);
    1005          566 :     cmp_member_elem(blocknum);
    1006              : 
    1007            0 :     return 0;
    1008              : }
        

Generated by: LCOV version 2.0-1