LCOV - code coverage report
Current view: top level - contrib/pg_prewarm - autoprewarm.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 235 312 75.3 %
Date: 2025-09-10 22:18:18 Functions: 15 16 93.8 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * autoprewarm.c
       4             :  *      Periodically dump information about the blocks present in
       5             :  *      shared_buffers, and reload them on server restart.
       6             :  *
       7             :  *      Due to locking considerations, we can't actually begin prewarming
       8             :  *      until the server reaches a consistent state.  We need the catalogs
       9             :  *      to be consistent so that we can figure out which relation to lock,
      10             :  *      and we need to lock the relations so that we don't try to prewarm
      11             :  *      pages from a relation that is in the process of being dropped.
      12             :  *
      13             :  *      While prewarming, autoprewarm will use two workers.  There's a
      14             :  *      leader worker that reads and sorts the list of blocks to be
      15             :  *      prewarmed and then launches a per-database worker for each
      16             :  *      relevant database in turn.  The former keeps running after the
      17             :  *      initial prewarm is complete to update the dump file periodically.
      18             :  *
      19             :  *  Copyright (c) 2016-2025, PostgreSQL Global Development Group
      20             :  *
      21             :  *  IDENTIFICATION
      22             :  *      contrib/pg_prewarm/autoprewarm.c
      23             :  *
      24             :  *-------------------------------------------------------------------------
      25             :  */
      26             : 
      27             : #include "postgres.h"
      28             : 
      29             : #include <unistd.h>
      30             : 
      31             : #include "access/relation.h"
      32             : #include "access/xact.h"
      33             : #include "pgstat.h"
      34             : #include "postmaster/bgworker.h"
      35             : #include "postmaster/interrupt.h"
      36             : #include "storage/buf_internals.h"
      37             : #include "storage/dsm.h"
      38             : #include "storage/dsm_registry.h"
      39             : #include "storage/fd.h"
      40             : #include "storage/ipc.h"
      41             : #include "storage/latch.h"
      42             : #include "storage/lwlock.h"
      43             : #include "storage/procsignal.h"
      44             : #include "storage/read_stream.h"
      45             : #include "storage/smgr.h"
      46             : #include "tcop/tcopprot.h"
      47             : #include "utils/guc.h"
      48             : #include "utils/rel.h"
      49             : #include "utils/relfilenumbermap.h"
      50             : #include "utils/timestamp.h"
      51             : 
      52             : #define AUTOPREWARM_FILE "autoprewarm.blocks"
      53             : 
      54             : /*
                     :  * Metadata for each block we dump.
                     :  *
                     :  * One record is read back per line of the dump file, in the field order
                     :  * database, tablespace, filenumber, forknum, blocknum.
                     :  */
      55             : typedef struct BlockInfoRecord
      56             : {
      57             :     Oid         database;       /* InvalidOid for global objects */
      58             :     Oid         tablespace;     /* with filenumber, identifies the relation */
      59             :     RelFileNumber filenumber;
      60             :     ForkNumber  forknum;        /* range-checked before use when prewarming */
      61             :     BlockNumber blocknum;       /* block within that fork */
      62             : } BlockInfoRecord;
      63             : 
      64             : /*
                     :  * Shared state information for autoprewarm bgworker.
                     :  *
                     :  * bgworker_pid and pid_using_dumpfile are read and written under "lock" by
                     :  * the code in this file; InvalidPid means "nobody".
                     :  */
      65             : typedef struct AutoPrewarmSharedState
      66             : {
      67             :     LWLock      lock;           /* mutual exclusion */
      68             :     pid_t       bgworker_pid;   /* for main bgworker */
      69             :     pid_t       pid_using_dumpfile; /* for autoprewarm or block dump */
      70             : 
      71             :     /*
                     :      * Following items are for communication with per-database worker.
                     :      *
                     :      * block_info_handle names the DSM segment holding the sorted
                     :      * BlockInfoRecord array; database is the one the next worker connects
                     :      * to; [prewarm_start_idx, prewarm_stop_idx) is the slice of the array
                     :      * that worker should process.  prewarmed_blocks is zeroed by the leader
                     :      * before launching workers and reported in its final log message
                     :      * (presumably incremented by the per-database worker -- confirm).
                     :      */
      72             :     dsm_handle  block_info_handle;
      73             :     Oid         database;
      74             :     int         prewarm_start_idx;
      75             :     int         prewarm_stop_idx;
      76             :     int         prewarmed_blocks;
      77             : } AutoPrewarmSharedState;
      78             : 
      79             : /*
      80             :  * Private data passed through the read stream API for our use in the
      81             :  * callback.
                     :  *
                     :  * tablespace, filenumber and forknum describe the relation fork the stream
                     :  * is currently reading; the callback ends the stream as soon as the record
                     :  * at pos differs in any of them.
      82             :  */
      83             : typedef struct AutoPrewarmReadStreamData
      84             : {
      85             :     /* The array of records containing the blocks we should prewarm. */
      86             :     BlockInfoRecord *block_info;
      87             : 
      88             :     /*
      89             :      * pos is the read stream callback's index into block_info. Because the
      90             :      * read stream may read ahead, pos is likely to be ahead of the index in
      91             :      * the main loop in autoprewarm_database_main().
      92             :      */
      93             :     int         pos;
      94             :     Oid         tablespace;
      95             :     RelFileNumber filenumber;
      96             :     ForkNumber  forknum;
      97             :     BlockNumber nblocks;        /* records with blocknum >= nblocks are skipped */
      98             : } AutoPrewarmReadStreamData;
      99             : 
     100             : 
     101             : PGDLLEXPORT void autoprewarm_main(Datum main_arg);   /* leader worker entry point */
     102             : PGDLLEXPORT void autoprewarm_database_main(Datum main_arg);  /* per-database worker entry point */
     103             : 
     104           6 : PG_FUNCTION_INFO_V1(autoprewarm_start_worker);
     105           8 : PG_FUNCTION_INFO_V1(autoprewarm_dump_now);
     106             : 
     107             : static void apw_load_buffers(void);
     108             : static int  apw_dump_now(bool is_bgworker, bool dump_unlogged);
     109             : static void apw_start_leader_worker(void);
     110             : static void apw_start_database_worker(void);
     111             : static bool apw_init_shmem(void);
     112             : static void apw_detach_shmem(int code, Datum arg);
     113             : static int  apw_compare_blockinfo(const void *p, const void *q);
     114             : 
     115             : /* Pointer to shared-memory state; NULL until apw_init_shmem() has run. */
     116             : static AutoPrewarmSharedState *apw_state = NULL;
     117             : 
     118             : /* GUC variables (the initializers match the boot values given in _PG_init). */
     119             : static bool autoprewarm = true; /* pg_prewarm.autoprewarm: start worker? */
     120             : static int  autoprewarm_interval = 300; /* pg_prewarm.autoprewarm_interval: dump interval, in seconds */
     121             : 
     122             : /*
     123             :  * Module load callback.
                     :  *
                     :  * pg_prewarm.autoprewarm_interval is always defined.  Everything else --
                     :  * the pg_prewarm.autoprewarm GUC, reserving the "pg_prewarm" prefix and
                     :  * registering the leader worker -- is only done while
                     :  * process_shared_preload_libraries_in_progress is set.
     124             :  */
     125             : void
     126          12 : _PG_init(void)
     127             : {
     128          12 :     DefineCustomIntVariable("pg_prewarm.autoprewarm_interval",
     129             :                             "Sets the interval between dumps of shared buffers",
     130             :                             "If set to zero, time-based dumping is disabled.",
     131             :                             &autoprewarm_interval,
     132             :                             300,
     133             :                             0, INT_MAX / 1000,
     134             :                             PGC_SIGHUP,
     135             :                             GUC_UNIT_S,
     136             :                             NULL,
     137             :                             NULL,
     138             :                             NULL);
     139             : 
     140          12 :     if (!process_shared_preload_libraries_in_progress)
     141           8 :         return;
     142             : 
     143             :     /* can't define PGC_POSTMASTER variable after startup */
     144           4 :     DefineCustomBoolVariable("pg_prewarm.autoprewarm",
     145             :                              "Starts the autoprewarm worker.",
     146             :                              NULL,
     147             :                              &autoprewarm,
     148             :                              true,
     149             :                              PGC_POSTMASTER,
     150             :                              0,
     151             :                              NULL,
     152             :                              NULL,
     153             :                              NULL);
     154             : 
     155           4 :     MarkGUCPrefixReserved("pg_prewarm");
     156             : 
     157             :     /* Register autoprewarm worker, if enabled. */
     158           4 :     if (autoprewarm)
     159           4 :         apw_start_leader_worker();
     160             : }
     161             : 
     162             : /*
     163             :  * Main entry point for the leader autoprewarm process.  Per-database workers
     164             :  * have a separate entry point.
                     :  *
                     :  * Sequence: install signal handlers, attach to the shared state, claim the
                     :  * bgworker_pid slot (returning if another worker already holds it),
                     :  * optionally prewarm from the dump file, then loop -- dumping every
                     :  * autoprewarm_interval seconds, or just sleeping when the interval is zero
                     :  * -- until ShutdownRequestPending is set.  A final dump is written on the
                     :  * way out unless the initial prewarm was cut short by the shutdown request.
                     :  *
                     :  * main_arg is not used.
     165             :  */
     166             : void
     167           4 : autoprewarm_main(Datum main_arg)
     168             : {
     169           4 :     bool        first_time = true;
     170           4 :     bool        final_dump_allowed = true;
     171           4 :     TimestampTz last_dump_time = 0;
     172             : 
     173             :     /* Establish signal handlers; once that's done, unblock signals. */
     174           4 :     pqsignal(SIGTERM, SignalHandlerForShutdownRequest);
     175           4 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
     176           4 :     pqsignal(SIGUSR1, procsignal_sigusr1_handler);
     177           4 :     BackgroundWorkerUnblockSignals();
     178             : 
     179             :     /*
                     :      * Create (if necessary) and attach to our shared memory area.  A true
                     :      * result disables the initial prewarm below; presumably it means the
                     :      * area already existed -- confirm against apw_init_shmem().
                     :      */
     180           4 :     if (apw_init_shmem())
     181           0 :         first_time = false;
     182             : 
     183             :     /*
     184             :      * Set on-detach hook so that our PID will be cleared on exit.
     185             :      *
     186             :      * NB: Autoprewarm's state is stored in a DSM segment, and DSM segments
     187             :      * are detached before calling the on_shmem_exit callbacks, so we must put
     188             :      * apw_detach_shmem in the before_shmem_exit callback list.
     189             :      */
     190           4 :     before_shmem_exit(apw_detach_shmem, 0);
     191             : 
     192             :     /*
     193             :      * Store our PID in the shared memory area --- unless there's already
     194             :      * another worker running, in which case just exit.
                     :      *
                     :      * NOTE(review): the PID shown in the log message is re-read from shared
                     :      * memory after the lock has been released.
     195             :      */
     196           4 :     LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE);
     197           4 :     if (apw_state->bgworker_pid != InvalidPid)
     198             :     {
     199           0 :         LWLockRelease(&apw_state->lock);
     200           0 :         ereport(LOG,
     201             :                 (errmsg("autoprewarm worker is already running under PID %d",
     202             :                         (int) apw_state->bgworker_pid)));
     203           0 :         return;
     204             :     }
     205           4 :     apw_state->bgworker_pid = MyProcPid;
     206           4 :     LWLockRelease(&apw_state->lock);
     207             : 
     208             :     /*
     209             :      * Preload buffers from the dump file only if we just created the shared
     210             :      * memory region.  Otherwise, it's either already been done or shouldn't
     211             :      * be done - e.g. because the old dump file has been overwritten since the
     212             :      * server was started.
     213             :      *
     214             :      * There's not much point in performing a dump immediately after we finish
     215             :      * preloading; so, if we do end up preloading, consider the last dump time
     216             :      * to be equal to the current time.
     217             :      *
     218             :      * If apw_load_buffers() is terminated early by a shutdown request,
     219             :      * prevent dumping out our state below the loop, because we'd effectively
     220             :      * just truncate the saved state to however much we'd managed to preload.
     221             :      */
     222           4 :     if (first_time)
     223             :     {
     224           4 :         apw_load_buffers();
     225           4 :         final_dump_allowed = !ShutdownRequestPending;
     226           4 :         last_dump_time = GetCurrentTimestamp();
     227             :     }
     228             : 
     229             :     /* Periodically dump buffers until terminated. */
     230          10 :     while (!ShutdownRequestPending)
     231             :     {
     232             :         /* In case of a SIGHUP, just reload the configuration. */
     233           6 :         if (ConfigReloadPending)
     234             :         {
     235           0 :             ConfigReloadPending = false;
     236           0 :             ProcessConfigFile(PGC_SIGHUP);
     237             :         }
     238             : 
     239           6 :         if (autoprewarm_interval <= 0)
     240             :         {
     241             :             /* We're only dumping at shutdown, so just wait forever. */
     242           6 :             (void) WaitLatch(MyLatch,
     243             :                              WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
     244             :                              -1L,
     245             :                              PG_WAIT_EXTENSION);
     246             :         }
     247             :         else
     248             :         {
     249             :             TimestampTz next_dump_time;
     250             :             long        delay_in_ms;
     251             : 
     252             :             /*
                     :              * Compute the next dump time.  The GUC's upper bound of
                     :              * INT_MAX / 1000 keeps the conversion to milliseconds within
                     :              * int range.
                     :              */
     253           0 :             next_dump_time =
     254           0 :                 TimestampTzPlusMilliseconds(last_dump_time,
     255             :                                             autoprewarm_interval * 1000);
     256             :             delay_in_ms =
     257           0 :                 TimestampDifferenceMilliseconds(GetCurrentTimestamp(),
     258             :                                                 next_dump_time);
     259             : 
     260             :             /*
                     :              * Perform a dump if it's time (is_bgworker = true,
                     :              * dump_unlogged = false), then go straight back to the top of
                     :              * the loop to recompute the delay.
                     :              */
     261           0 :             if (delay_in_ms <= 0)
     262             :             {
     263           0 :                 last_dump_time = GetCurrentTimestamp();
     264           0 :                 apw_dump_now(true, false);
     265           0 :                 continue;
     266             :             }
     267             : 
     268             :             /* Sleep until the next dump time. */
     269           0 :             (void) WaitLatch(MyLatch,
     270             :                              WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     271             :                              delay_in_ms,
     272             :                              PG_WAIT_EXTENSION);
     273             :         }
     274             : 
     275             :         /* Reset the latch, loop. */
     276           6 :         ResetLatch(MyLatch);
     277             :     }
     278             : 
     279             :     /*
     280             :      * Dump one last time.  We assume this is probably the result of a system
     281             :      * shutdown, although it's possible that we've merely been terminated.
                     :      * Unlike the periodic dumps, this one passes dump_unlogged = true.
     282             :      */
     283           4 :     if (final_dump_allowed)
     284           4 :         apw_dump_now(true, true);
     285             : }
     286             : 
     287             : /*
     288             :  * Read the dump file and launch per-database workers one at a time to
     289             :  * prewarm the buffers found there.
                     :  *
                     :  * The file starts with a "<<N>>" record-count line followed by N lines of
                     :  * "database,tablespace,filenumber,forknum,blocknum".  The records are loaded
                     :  * into a DSM segment, sorted with apw_compare_blockinfo, and handed to the
                     :  * workers through apw_state (block_info_handle, database and the
                     :  * [prewarm_start_idx, prewarm_stop_idx) range).
                     :  *
                     :  * Returns without prewarming if another process is using the dump file or
                     :  * if the file does not exist; other open/parse failures raise ERROR.
                     :  * pid_using_dumpfile is set to our PID for the duration and cleared again
                     :  * on the non-error return paths.
     290             :  */
     291             : static void
     292           4 : apw_load_buffers(void)
     293             : {
     294           4 :     FILE       *file = NULL;
     295             :     int         num_elements,
     296             :                 i;
     297             :     BlockInfoRecord *blkinfo;
     298             :     dsm_segment *seg;
     299             : 
     300             :     /*
     301             :      * Skip the prewarm if the dump file is in use; otherwise, prevent any
     302             :      * other process from writing it while we're using it.
                     :      *
                     :      * NOTE(review): the PID shown in the log message is re-read from shared
                     :      * memory after the lock has been released.
     303             :      */
     304           4 :     LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE);
     305           4 :     if (apw_state->pid_using_dumpfile == InvalidPid)
     306           4 :         apw_state->pid_using_dumpfile = MyProcPid;
     307             :     else
     308             :     {
     309           0 :         LWLockRelease(&apw_state->lock);
     310           0 :         ereport(LOG,
     311             :                 (errmsg("skipping prewarm because block dump file is being written by PID %d",
     312             :                         (int) apw_state->pid_using_dumpfile)));
     313           2 :         return;
     314             :     }
     315           4 :     LWLockRelease(&apw_state->lock);
     316             : 
     317             :     /*
     318             :      * Open the block dump file.  Exit quietly if it doesn't exist, but report
     319             :      * any other error.
     320             :      */
     321           4 :     file = AllocateFile(AUTOPREWARM_FILE, "r");
     322           4 :     if (!file)
     323             :     {
     324           2 :         if (errno == ENOENT)
     325             :         {
     326           2 :             LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE);
     327           2 :             apw_state->pid_using_dumpfile = InvalidPid;
     328           2 :             LWLockRelease(&apw_state->lock);
     329           2 :             return;             /* No file to load. */
     330             :         }
     331           0 :         ereport(ERROR,
     332             :                 (errcode_for_file_access(),
     333             :                  errmsg("could not read file \"%s\": %m",
     334             :                         AUTOPREWARM_FILE)));
     335             :     }
     336             : 
     337             :     /*
                     :      * First line of the file is a record count.
                     :      *
                     :      * NOTE(review): num_elements is not range-checked here; a zero or
                     :      * negative count goes straight into the DSM size computation below.
                     :      */
     338           2 :     if (fscanf(file, "<<%d>>\n", &num_elements) != 1)
     339           0 :         ereport(ERROR,
     340             :                 (errcode_for_file_access(),
     341             :                  errmsg("could not read from file \"%s\": %m",
     342             :                         AUTOPREWARM_FILE)));
     343             : 
     344             :     /* Allocate a dynamic shared memory segment to store the record data. */
     345           2 :     seg = dsm_create(sizeof(BlockInfoRecord) * num_elements, 0);
     346           2 :     blkinfo = (BlockInfoRecord *) dsm_segment_address(seg);
     347             : 
     348             :     /*
                     :      * Read records, one per line.  forknum is scanned into a plain unsigned
                     :      * local and then assigned to the ForkNumber field, rather than scanning
                     :      * into the field directly.
                     :      */
     349         430 :     for (i = 0; i < num_elements; i++)
     350             :     {
     351             :         unsigned    forknum;
     352             : 
     353         428 :         if (fscanf(file, "%u,%u,%u,%u,%u\n", &blkinfo[i].database,
     354         428 :                    &blkinfo[i].tablespace, &blkinfo[i].filenumber,
     355         428 :                    &forknum, &blkinfo[i].blocknum) != 5)
     356           0 :             ereport(ERROR,
     357             :                     (errmsg("autoprewarm block dump file is corrupted at line %d",
     358             :                             i + 1)));
     359         428 :         blkinfo[i].forknum = forknum;
     360             :     }
     361             : 
     362           2 :     FreeFile(file);
     363             : 
     364             :     /* Sort the blocks to be loaded. */
     365           2 :     qsort(blkinfo, num_elements, sizeof(BlockInfoRecord),
     366             :           apw_compare_blockinfo);
     367             : 
     368             :     /* Populate shared memory state. */
     369           2 :     apw_state->block_info_handle = dsm_segment_handle(seg);
     370           2 :     apw_state->prewarm_start_idx = apw_state->prewarm_stop_idx = 0;
     371           2 :     apw_state->prewarmed_blocks = 0;
     372             : 
     373             :     /*
                     :      * Don't prewarm more than we can fit.  The DSM segment still holds
                     :      * every record that was read; only the range handed out to workers
                     :      * (and the total reported at the end) is capped.
                     :      */
     374           2 :     if (num_elements > NBuffers)
     375             :     {
     376           0 :         num_elements = NBuffers;
     377           0 :         ereport(LOG,
     378             :                 (errmsg("autoprewarm capping prewarmed blocks to %d (shared_buffers size)",
     379             :                         NBuffers)));
     380             :     }
     381             : 
     382             :     /* Get the info position of the first block of the next database. */
     383           4 :     while (apw_state->prewarm_start_idx < num_elements)
     384             :     {
     385           2 :         int         j = apw_state->prewarm_start_idx;
     386           2 :         Oid         current_db = blkinfo[j].database;
     387             : 
     388             :         /*
     389             :          * Advance the prewarm_stop_idx to the first BlockInfoRecord that does
     390             :          * not belong to this database.
     391             :          */
     392           2 :         j++;
     393         428 :         while (j < num_elements)
     394             :         {
     395         426 :             if (current_db != blkinfo[j].database)
     396             :             {
     397             :                 /*
     398             :                  * Combine BlockInfoRecords for global objects with those of
     399             :                  * the database.
     400             :                  */
     401           2 :                 if (current_db != InvalidOid)
     402           0 :                     break;
     403           2 :                 current_db = blkinfo[j].database;
     404             :             }
     405             : 
     406         426 :             j++;
     407             :         }
     408             : 
     409             :         /*
     410             :          * If we reach this point with current_db == InvalidOid, then only
     411             :          * BlockInfoRecords belonging to global objects exist.  We can't
     412             :          * prewarm without a database connection, so just bail out.
     413             :          */
     414           2 :         if (current_db == InvalidOid)
     415           0 :             break;
     416             : 
     417             :         /* Configure stop point and database for next per-database worker. */
     418           2 :         apw_state->prewarm_stop_idx = j;
     419           2 :         apw_state->database = current_db;
     420             :         Assert(apw_state->prewarm_start_idx < apw_state->prewarm_stop_idx);
     421             : 
     422             :         /*
     423             :          * Don't launch a worker if we've already been told to shut down.
     424             :          * (The launch would fail anyway, but we might as well skip it.)
     425             :          */
     426           2 :         if (ShutdownRequestPending)
     427           0 :             break;
     428             : 
     429             :         /*
     430             :          * Start a per-database worker to load blocks for this database; this
     431             :          * function will return once the per-database worker exits.
     432             :          */
     433           2 :         apw_start_database_worker();
     434             : 
     435             :         /* Prepare for next database. */
     436           2 :         apw_state->prewarm_start_idx = apw_state->prewarm_stop_idx;
     437             :     }
     438             : 
     439             :     /* Clean up: drop the DSM mapping and release the dump file. */
     440           2 :     dsm_detach(seg);
     441           2 :     LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE);
     442           2 :     apw_state->block_info_handle = DSM_HANDLE_INVALID;
     443           2 :     apw_state->pid_using_dumpfile = InvalidPid;
     444           2 :     LWLockRelease(&apw_state->lock);
     445             : 
     446             :     /*
                     :      * Report our success, if we were able to finish.  num_elements is the
                     :      * possibly-capped count at this point.
                     :      */
     447           2 :     if (!ShutdownRequestPending)
     448           2 :         ereport(LOG,
     449             :                 (errmsg("autoprewarm successfully prewarmed %d of %d previously-loaded blocks",
     450             :                         apw_state->prewarmed_blocks, num_elements)));
     451             : }
     452             : 
     453             : /*
     454             :  * Return the next block number of a specific relation and fork to read
     455             :  * according to the array of BlockInfoRecord.
                     :  *
                     :  * callback_private_data is an AutoPrewarmReadStreamData; stream and
                     :  * per_buffer_data are not used.  Starting at p->pos, records whose blocknum
                     :  * is >= p->nblocks are consumed and skipped.  InvalidBlockNumber is returned
                     :  * when the record at p->pos belongs to a different tablespace, filenumber or
                     :  * fork (p->pos is left pointing at that record), or when p->pos reaches
                     :  * apw_state->prewarm_stop_idx.
     456             :  */
     457             : static BlockNumber
     458         554 : apw_read_stream_next_block(ReadStream *stream,
     459             :                            void *callback_private_data,
     460             :                            void *per_buffer_data)
     461             : {
     462         554 :     AutoPrewarmReadStreamData *p = callback_private_data;
     463             : 
     464         554 :     CHECK_FOR_INTERRUPTS();
     465             : 
     466         554 :     while (p->pos < apw_state->prewarm_stop_idx)
     467             :     {
     468         552 :         BlockInfoRecord blk = p->block_info[p->pos];
     469             : 
     470         552 :         if (blk.tablespace != p->tablespace)
     471         552 :             return InvalidBlockNumber;
     472             : 
     473         550 :         if (blk.filenumber != p->filenumber)
     474          94 :             return InvalidBlockNumber;
     475             : 
     476         456 :         if (blk.forknum != p->forknum)
     477          28 :             return InvalidBlockNumber;
     478             : 
     479         428 :         p->pos++;
     480             : 
     481             :         /*
     482             :          * Check whether blocknum is valid and within fork file size.
     483             :          * Fast-forward through any invalid blocks. We want p->pos to reflect
     484             :          * the location of the next relation or fork before ending the stream.
     485             :          */
     486         428 :         if (blk.blocknum >= p->nblocks)
     487           0 :             continue;
     488             : 
     489         428 :         return blk.blocknum;
     490             :     }
     491             : 
     492           2 :     return InvalidBlockNumber;
     493             : }
     494             : 
     495             : /*
     496             :  * Prewarm all blocks for one database (and possibly also global objects, if
     497             :  * those got grouped with this database).
     498             :  */
     499             : void
     500           2 : autoprewarm_database_main(Datum main_arg)
     501             : {
     502             :     BlockInfoRecord *block_info;
     503             :     int         i;
     504             :     BlockInfoRecord blk;
     505             :     dsm_segment *seg;
     506             : 
     507             :     /* Establish signal handlers; once that's done, unblock signals. */
     508           2 :     pqsignal(SIGTERM, die);
     509           2 :     BackgroundWorkerUnblockSignals();
     510             : 
     511             :     /* Connect to correct database and get block information. */
     512           2 :     apw_init_shmem();
     513           2 :     seg = dsm_attach(apw_state->block_info_handle);
     514           2 :     if (seg == NULL)
     515           0 :         ereport(ERROR,
     516             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     517             :                  errmsg("could not map dynamic shared memory segment")));
     518           2 :     BackgroundWorkerInitializeConnectionByOid(apw_state->database, InvalidOid, 0);
     519           2 :     block_info = (BlockInfoRecord *) dsm_segment_address(seg);
     520             : 
     521           2 :     i = apw_state->prewarm_start_idx;
     522           2 :     blk = block_info[i];
     523             : 
     524             :     /*
     525             :      * Loop until we run out of blocks to prewarm or until we run out of
     526             :      * buffers.
     527             :      */
     528         100 :     while (i < apw_state->prewarm_stop_idx)
     529             :     {
     530          98 :         Oid         tablespace = blk.tablespace;
     531          98 :         RelFileNumber filenumber = blk.filenumber;
     532             :         Oid         reloid;
     533             :         Relation    rel;
     534             : 
     535             :         /*
     536             :          * All blocks between prewarm_start_idx and prewarm_stop_idx should
     537             :          * belong either to global objects or the same database.
     538             :          */
     539             :         Assert(blk.database == apw_state->database || blk.database == 0);
     540             : 
     541          98 :         StartTransactionCommand();
     542             : 
     543          98 :         reloid = RelidByRelfilenumber(blk.tablespace, blk.filenumber);
     544         196 :         if (!OidIsValid(reloid) ||
     545          98 :             (rel = try_relation_open(reloid, AccessShareLock)) == NULL)
     546             :         {
     547             :             /* We failed to open the relation, so there is nothing to close. */
     548           0 :             CommitTransactionCommand();
     549             : 
     550             :             /*
     551             :              * Fast-forward to the next relation. We want to skip all of the
     552             :              * other records referencing this relation since we know we can't
     553             :              * open it. That way, we avoid repeatedly trying and failing to
     554             :              * open the same relation.
     555             :              */
     556           0 :             for (; i < apw_state->prewarm_stop_idx; i++)
     557             :             {
     558           0 :                 blk = block_info[i];
     559           0 :                 if (blk.tablespace != tablespace ||
     560           0 :                     blk.filenumber != filenumber)
     561             :                     break;
     562             :             }
     563             : 
     564             :             /* Time to try and open our newfound relation */
     565           0 :             continue;
     566             :         }
     567             : 
     568             :         /*
     569             :          * We have a relation; now let's loop until we find a valid fork of
     570             :          * the relation or we run out of buffers. Once we've read from all
     571             :          * valid forks or run out of options, we'll close the relation and
     572             :          * move on.
     573             :          */
     574          98 :         while (i < apw_state->prewarm_stop_idx &&
     575         224 :                blk.tablespace == tablespace &&
     576         220 :                blk.filenumber == filenumber)
     577             :         {
     578         126 :             ForkNumber  forknum = blk.forknum;
     579             :             BlockNumber nblocks;
     580             :             struct AutoPrewarmReadStreamData p;
     581             :             ReadStream *stream;
     582             :             Buffer      buf;
     583             : 
     584             :             /*
     585             :              * smgrexists is not safe for illegal forknum, hence check whether
     586             :              * the passed forknum is valid before using it in smgrexists.
     587             :              */
     588         126 :             if (blk.forknum <= InvalidForkNumber ||
     589         126 :                 blk.forknum > MAX_FORKNUM ||
     590         126 :                 !smgrexists(RelationGetSmgr(rel), blk.forknum))
     591             :             {
     592             :                 /*
     593             :                  * Fast-forward to the next fork. We want to skip all of the
     594             :                  * other records referencing this fork since we already know
     595             :                  * it's not valid.
     596             :                  */
     597           0 :                 for (; i < apw_state->prewarm_stop_idx; i++)
     598             :                 {
     599           0 :                     blk = block_info[i];
     600           0 :                     if (blk.tablespace != tablespace ||
     601           0 :                         blk.filenumber != filenumber ||
     602           0 :                         blk.forknum != forknum)
     603             :                         break;
     604             :                 }
     605             : 
     606             :                 /* Time to check if this newfound fork is valid */
     607           0 :                 continue;
     608             :             }
     609             : 
     610         126 :             nblocks = RelationGetNumberOfBlocksInFork(rel, blk.forknum);
     611             : 
     612         126 :             p = (struct AutoPrewarmReadStreamData)
     613             :             {
     614             :                 .block_info = block_info,
     615             :                     .pos = i,
     616             :                     .tablespace = tablespace,
     617             :                     .filenumber = filenumber,
     618             :                     .forknum = forknum,
     619             :                     .nblocks = nblocks,
     620             :             };
     621             : 
     622         126 :             stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE |
     623             :                                                 READ_STREAM_DEFAULT |
     624             :                                                 READ_STREAM_USE_BATCHING,
     625             :                                                 NULL,
     626             :                                                 rel,
     627             :                                                 p.forknum,
     628             :                                                 apw_read_stream_next_block,
     629             :                                                 &p,
     630             :                                                 0);
     631             : 
     632             :             /*
     633             :              * Loop until we've prewarmed all the blocks from this fork. The
     634             :              * read stream callback will check that we still have free buffers
     635             :              * before requesting each block from the read stream API.
     636             :              */
     637         554 :             while ((buf = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
     638             :             {
     639         428 :                 apw_state->prewarmed_blocks++;
     640         428 :                 ReleaseBuffer(buf);
     641             :             }
     642             : 
     643         126 :             read_stream_end(stream);
     644             : 
     645             :             /* Advance i past all the blocks just prewarmed. */
     646         126 :             i = p.pos;
     647         126 :             blk = block_info[i];
     648             :         }
     649             : 
     650          98 :         relation_close(rel, AccessShareLock);
     651          98 :         CommitTransactionCommand();
     652             :     }
     653             : 
     654           2 :     dsm_detach(seg);
     655           2 : }
     656             : 
     657             : /*
     658             :  * Dump information on blocks in shared buffers.  We use a text format here
     659             :  * so that it's easy to understand and even change the file contents if
     660             :  * necessary.
     661             :  * Returns the number of blocks dumped.
                        *
                        * is_bgworker selects what happens when another process already owns
                        * the dump file: a background worker logs a message and returns 0, any
                        * other caller gets an ERROR.  dump_unlogged says whether buffers that
                        * do not have BM_PERMANENT set are recorded as well.
                        *
                        * The output starts with a "<<N>>" line holding the record count,
                        * followed by one "database,tablespace,filenumber,forknum,blocknum"
                        * line per block.  It is written to AUTOPREWARM_FILE with a ".tmp"
                        * suffix and then renamed into place with durable_rename().
                        *
                        * On success pid_using_dumpfile is reset here.  The ERROR exits below
                        * do not reset it; autoprewarm_dump_now() registers apw_detach_shmem()
                        * as error cleanup, which clears the field when it holds our PID.
     662             :  */
     663             : static int
     664           6 : apw_dump_now(bool is_bgworker, bool dump_unlogged)
     665             : {
     666             :     int         num_blocks;
     667             :     int         i;
     668             :     int         ret;
     669             :     BlockInfoRecord *block_info_array;
     670             :     BufferDesc *bufHdr;
     671             :     FILE       *file;
     672             :     char        transient_dump_file_path[MAXPGPATH];
     673             :     pid_t       pid;
     674             : 
                           /*
                            * Claim the dump file for this process unless some PID already
                            * owns it.  "pid" keeps the owner seen before our claim, so it is
                            * InvalidPid exactly when the claim succeeded.
                            */
     675           6 :     LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE);
     676           6 :     pid = apw_state->pid_using_dumpfile;
     677           6 :     if (apw_state->pid_using_dumpfile == InvalidPid)
     678           6 :         apw_state->pid_using_dumpfile = MyProcPid;
     679           6 :     LWLockRelease(&apw_state->lock);
     680             : 
     681           6 :     if (pid != InvalidPid)
     682             :     {
                               /*
                                * NOTE(review): both messages re-read
                                * apw_state->pid_using_dumpfile after the lock was released
                                * instead of using the local "pid" copy; confirm that
                                * reporting a possibly newer value is acceptable.
                                */
     683           0 :         if (!is_bgworker)
     684           0 :             ereport(ERROR,
     685             :                     (errmsg("could not perform block dump because dump file is being used by PID %d",
     686             :                             (int) apw_state->pid_using_dumpfile)));
     687             : 
     688           0 :         ereport(LOG,
     689             :                 (errmsg("skipping block dump because it is already being performed by PID %d",
     690             :                         (int) apw_state->pid_using_dumpfile)));
     691           0 :         return 0;
     692             :     }
     693             : 
     694             :     /*
     695             :      * With sufficiently large shared_buffers, allocation will exceed 1GB, so
     696             :      * allow for a huge allocation to prevent outright failure.
     697             :      *
     698             :      * (In the future, it might be a good idea to redesign this to use a more
     699             :      * memory-efficient data structure.)
                            *
                            * The array has room for one record per buffer (NBuffers entries),
                            * which is the most the loop below can ever fill in.
     700             :      */
     701             :     block_info_array = (BlockInfoRecord *)
     702           6 :         palloc_extended((sizeof(BlockInfoRecord) * NBuffers), MCXT_ALLOC_HUGE);
     703             : 
                           /*
                            * Visit every buffer header and copy the tag of each buffer that
                            * qualifies; num_blocks counts the records filled in so far.
                            */
     704       98310 :     for (num_blocks = 0, i = 0; i < NBuffers; i++)
     705             :     {
     706             :         uint32      buf_state;
     707             : 
     708       98304 :         CHECK_FOR_INTERRUPTS();
     709             : 
     710       98304 :         bufHdr = GetBufferDescriptor(i);
     711             : 
     712             :         /* Lock each buffer header before inspecting. */
     713       98304 :         buf_state = LockBufHdr(bufHdr);
     714             : 
     715             :         /*
     716             :          * Unlogged tables will be automatically truncated after a crash or
     717             :          * unclean shutdown. In such cases we need not prewarm them. Dump them
     718             :          * only if requested by caller.
     719             :          */
     720       98304 :         if (buf_state & BM_TAG_VALID &&
     721        1284 :             ((buf_state & BM_PERMANENT) || dump_unlogged))
     722             :         {
     723        1284 :             block_info_array[num_blocks].database = bufHdr->tag.dbOid;
     724        1284 :             block_info_array[num_blocks].tablespace = bufHdr->tag.spcOid;
     725        2568 :             block_info_array[num_blocks].filenumber =
     726        1284 :                 BufTagGetRelNumber(&bufHdr->tag);
     727        2568 :             block_info_array[num_blocks].forknum =
     728        1284 :                 BufTagGetForkNum(&bufHdr->tag);
     729        1284 :             block_info_array[num_blocks].blocknum = bufHdr->tag.blockNum;
     730        1284 :             ++num_blocks;
     731             :         }
     732             : 
     733       98304 :         UnlockBufHdr(bufHdr, buf_state);
     734             :     }
     735             : 
                           /*
                            * Write the records to a temporary file first.  The write and
                            * close failure paths below unlink that file before raising the
                            * error, saving errno across the cleanup so that %m still reports
                            * the original failure.
                            */
     736           6 :     snprintf(transient_dump_file_path, MAXPGPATH, "%s.tmp", AUTOPREWARM_FILE);
     737           6 :     file = AllocateFile(transient_dump_file_path, "w");
     738           6 :     if (!file)
     739           0 :         ereport(ERROR,
     740             :                 (errcode_for_file_access(),
     741             :                  errmsg("could not open file \"%s\": %m",
     742             :                         transient_dump_file_path)));
     743             : 
                           /* Header line: the number of records that follow. */
     744           6 :     ret = fprintf(file, "<<%d>>\n", num_blocks);
     745           6 :     if (ret < 0)
     746             :     {
     747           0 :         int         save_errno = errno;
     748             : 
     749           0 :         FreeFile(file);
     750           0 :         unlink(transient_dump_file_path);
     751           0 :         errno = save_errno;
     752           0 :         ereport(ERROR,
     753             :                 (errcode_for_file_access(),
     754             :                  errmsg("could not write to file \"%s\": %m",
     755             :                         transient_dump_file_path)));
     756             :     }
     757             : 
                           /* One comma-separated line per collected block. */
     758        1290 :     for (i = 0; i < num_blocks; i++)
     759             :     {
     760        1284 :         CHECK_FOR_INTERRUPTS();
     761             : 
     762        1284 :         ret = fprintf(file, "%u,%u,%u,%u,%u\n",
     763        1284 :                       block_info_array[i].database,
     764        1284 :                       block_info_array[i].tablespace,
     765        1284 :                       block_info_array[i].filenumber,
     766        1284 :                       (uint32) block_info_array[i].forknum,
     767        1284 :                       block_info_array[i].blocknum);
     768        1284 :         if (ret < 0)
     769             :         {
     770           0 :             int         save_errno = errno;
     771             : 
     772           0 :             FreeFile(file);
     773           0 :             unlink(transient_dump_file_path);
     774           0 :             errno = save_errno;
     775           0 :             ereport(ERROR,
     776             :                     (errcode_for_file_access(),
     777             :                      errmsg("could not write to file \"%s\": %m",
     778             :                             transient_dump_file_path)));
     779             :         }
     780             :     }
     781             : 
     782           6 :     pfree(block_info_array);
     783             : 
     784             :     /*
     785             :      * Rename transient_dump_file_path to AUTOPREWARM_FILE to make things
     786             :      * permanent.
                            *
                            * The file is closed first, and a close failure is treated like a
                            * write failure.
     787             :      */
     788           6 :     ret = FreeFile(file);
     789           6 :     if (ret != 0)
     790             :     {
     791           0 :         int         save_errno = errno;
     792             : 
     793           0 :         unlink(transient_dump_file_path);
     794           0 :         errno = save_errno;
     795           0 :         ereport(ERROR,
     796             :                 (errcode_for_file_access(),
     797             :                  errmsg("could not close file \"%s\": %m",
     798             :                         transient_dump_file_path)));
     799             :     }
     800             : 
     801           6 :     (void) durable_rename(transient_dump_file_path, AUTOPREWARM_FILE, ERROR);
                           /*
                            * NOTE(review): the ownership marker is cleared here without
                            * taking apw_state->lock, unlike the claim at the top of this
                            * function and the reset in apw_detach_shmem(); confirm that an
                            * unlocked store is intended.
                            */
     802           6 :     apw_state->pid_using_dumpfile = InvalidPid;
     803             : 
     804           6 :     ereport(DEBUG1,
     805             :             (errmsg_internal("wrote block details for %d blocks", num_blocks)));
     806           6 :     return num_blocks;
     807             : }
     808             : 
     809             : /*
     810             :  * SQL-callable function to launch autoprewarm.
                        *
                        * Raises an ERROR when the autoprewarm flag is off, or when the shared
                        * state already records a leader PID in bgworker_pid.  Otherwise the
                        * leader is started through apw_start_leader_worker() and the function
                        * returns void.
                        *
                        * NOTE(review): bgworker_pid is only sampled under the lock, and the
                        * lock is released before the worker is started, so this check alone
                        * does not stop two concurrent callers -- confirm that the worker
                        * guards against that itself.
     811             :  */
     812             : Datum
     813           0 : autoprewarm_start_worker(PG_FUNCTION_ARGS)
     814             : {
     815             :     pid_t       pid;
     816             : 
     817           0 :     if (!autoprewarm)
     818           0 :         ereport(ERROR,
     819             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     820             :                  errmsg("autoprewarm is disabled")));
     821             : 
                           /* Attach to the shared state, then read the recorded leader PID. */
     822           0 :     apw_init_shmem();
     823           0 :     LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE);
     824           0 :     pid = apw_state->bgworker_pid;
     825           0 :     LWLockRelease(&apw_state->lock);
     826             : 
     827           0 :     if (pid != InvalidPid)
     828           0 :         ereport(ERROR,
     829             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     830             :                  errmsg("autoprewarm worker is already running under PID %d",
     831             :                         (int) pid)));
     832             : 
     833           0 :     apw_start_leader_worker();
     834             : 
     835           0 :     PG_RETURN_VOID();
     836             : }
     837             : 
     838             : /*
     839             :  * SQL-callable function to perform an immediate block dump.
     840             :  *
     841             :  * Note: this is declared to return int8, as insurance against some
     842             :  * very distant day when we might make NBuffers wider than int.
                        *
                        * The dump is requested as apw_dump_now(false, true): not as a
                        * background worker, so a dump file that is already in use raises an
                        * ERROR, and with unlogged buffers included.  The result is the number
                        * of blocks written.
     843             :  */
     844             : Datum
     845           2 : autoprewarm_dump_now(PG_FUNCTION_ARGS)
     846             : {
     847             :     int         num_blocks;
     848             : 
     849           2 :     apw_init_shmem();
     850             : 
                           /*
                            * apw_detach_shmem() is registered as error cleanup around the
                            * dump; it clears our PID from the shared state, which
                            * apw_dump_now() does not do on its own ERROR exits.
                            */
     851           2 :     PG_ENSURE_ERROR_CLEANUP(apw_detach_shmem, 0);
     852             :     {
     853           2 :         num_blocks = apw_dump_now(false, true);
     854             :     }
     855           2 :     PG_END_ENSURE_ERROR_CLEANUP(apw_detach_shmem, 0);
     856             : 
     857           2 :     PG_RETURN_INT64((int64) num_blocks);
     858             : }
     859             : 
     860             : static void
     861           4 : apw_init_state(void *ptr)
     862             : {
                           /*
                            * Initialization callback handed to GetNamedDSMSegment() by
                            * apw_init_shmem().  "ptr" is treated as the AutoPrewarmSharedState
                            * to set up: its lock gets a new tranche named "autoprewarm", and
                            * both PID fields start out as InvalidPid (no leader running,
                            * nobody using the dump file).  Presumably this runs only when the
                            * segment is first created -- confirm against GetNamedDSMSegment().
                            */
     863           4 :     AutoPrewarmSharedState *state = (AutoPrewarmSharedState *) ptr;
     864             : 
     865           4 :     LWLockInitialize(&state->lock, LWLockNewTrancheId("autoprewarm"));
     866           4 :     state->bgworker_pid = InvalidPid;
     867           4 :     state->pid_using_dumpfile = InvalidPid;
     868           4 : }
     869             : 
     870             : /*
     871             :  * Allocate and initialize autoprewarm related shared memory, if not already
     872             :  * done, and set up backend-local pointer to that state.  Returns true if an
     873             :  * existing shared memory segment was found.
                        *
                        * The state lives in the named DSM segment "autoprewarm", sized for one
                        * AutoPrewarmSharedState, with apw_init_state() supplied as its
                        * initializer.  The backend-local pointer is apw_state, and the return
                        * value is the "found" flag filled in by GetNamedDSMSegment().
     874             :  */
     875             : static bool
     876           8 : apw_init_shmem(void)
     877             : {
     878             :     bool        found;
     879             : 
     880           8 :     apw_state = GetNamedDSMSegment("autoprewarm",
     881             :                                    sizeof(AutoPrewarmSharedState),
     882             :                                    apw_init_state,
     883             :                                    &found);
     884             : 
     885           8 :     return found;
     886             : }
     887             : 
     888             : /*
     889             :  * Clear our PID from autoprewarm shared state.
                        *
                        * Under apw_state->lock, pid_using_dumpfile and bgworker_pid are each
                        * reset to InvalidPid if they currently hold MyProcPid; entries that
                        * belong to other processes are left alone.  Nothing is unmapped here,
                        * despite the name.
                        *
                        * The (code, arg) signature is what PG_ENSURE_ERROR_CLEANUP expects in
                        * autoprewarm_dump_now(); neither argument is used.
     890             :  */
     891             : static void
     892           4 : apw_detach_shmem(int code, Datum arg)
     893             : {
     894           4 :     LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE);
     895           4 :     if (apw_state->pid_using_dumpfile == MyProcPid)
     896           0 :         apw_state->pid_using_dumpfile = InvalidPid;
     897           4 :     if (apw_state->bgworker_pid == MyProcPid)
     898           4 :         apw_state->bgworker_pid = InvalidPid;
     899           4 :     LWLockRelease(&apw_state->lock);
     900           4 : }
     901             : 
     902             : /*
     903             :  * Start autoprewarm leader worker process.
                        *
                        * While shared_preload_libraries is being processed, the worker is only
                        * registered and the function returns at once.  Otherwise it is
                        * registered as a dynamic background worker and we wait for it to start,
                        * raising an ERROR if registration fails or the reported status is
                        * anything other than BGWH_STARTED.
                        *
                        * The worker runs autoprewarm_main from the pg_prewarm library, with
                        * shared memory access and a start time of BgWorkerStart_ConsistentState.
                        * bgw_restart_time is not assigned, so it keeps the 0 from the
                        * zero-initializer -- NOTE(review): confirm that this is the intended
                        * restart interval.
     904             :  */
     905             : static void
     906           4 : apw_start_leader_worker(void)
     907             : {
     908           4 :     BackgroundWorker worker = {0};
     909             :     BackgroundWorkerHandle *handle;
     910             :     BgwHandleStatus status;
     911             :     pid_t       pid;
     912             : 
     913           4 :     worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
     914           4 :     worker.bgw_start_time = BgWorkerStart_ConsistentState;
     915           4 :     strcpy(worker.bgw_library_name, "pg_prewarm");
     916           4 :     strcpy(worker.bgw_function_name, "autoprewarm_main");
     917           4 :     strcpy(worker.bgw_name, "autoprewarm leader");
     918           4 :     strcpy(worker.bgw_type, "autoprewarm leader");
     919             : 
     920           4 :     if (process_shared_preload_libraries_in_progress)
     921             :     {
     922           4 :         RegisterBackgroundWorker(&worker);
     923           4 :         return;
     924             :     }
     925             : 
     926             :     /* must set notify PID to wait for startup */
     927           0 :     worker.bgw_notify_pid = MyProcPid;
     928             : 
     929           0 :     if (!RegisterDynamicBackgroundWorker(&worker, &handle))
     930           0 :         ereport(ERROR,
     931             :                 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
     932             :                  errmsg("could not register background process"),
     933             :                  errhint("You may need to increase \"max_worker_processes\".")));
     934             : 
                           /* The PID stored in "pid" is not used after this call. */
     935           0 :     status = WaitForBackgroundWorkerStartup(handle, &pid);
     936           0 :     if (status != BGWH_STARTED)
     937           0 :         ereport(ERROR,
     938             :                 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
     939             :                  errmsg("could not start background process"),
     940             :                  errhint("More details may be available in the server log.")));
     941             : }
     942             : 
     943             : /*
     944             :  * Start autoprewarm per-database worker process.
                        *
                        * The worker is registered dynamically, runs autoprewarm_database_main
                        * from the pg_prewarm library with shared memory access and a database
                        * connection, and is never restarted.  This function does not return
                        * until WaitForBackgroundWorkerShutdown() does, so the caller is blocked
                        * for the worker's whole lifetime.  An ERROR is raised if the worker
                        * cannot be registered.
     945             :  */
     946             : static void
     947           2 : apw_start_database_worker(void)
     948             : {
     949           2 :     BackgroundWorker worker = {0};
     950             :     BackgroundWorkerHandle *handle;
     951             : 
     952           2 :     worker.bgw_flags =
     953             :         BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
     954           2 :     worker.bgw_start_time = BgWorkerStart_ConsistentState;
     955           2 :     worker.bgw_restart_time = BGW_NEVER_RESTART;
     956           2 :     strcpy(worker.bgw_library_name, "pg_prewarm");
     957           2 :     strcpy(worker.bgw_function_name, "autoprewarm_database_main");
     958           2 :     strcpy(worker.bgw_name, "autoprewarm worker");
     959           2 :     strcpy(worker.bgw_type, "autoprewarm worker");
     960             : 
     961             :     /* must set notify PID to wait for shutdown */
     962           2 :     worker.bgw_notify_pid = MyProcPid;
     963             : 
     964           2 :     if (!RegisterDynamicBackgroundWorker(&worker, &handle))
     965           0 :         ereport(ERROR,
     966             :                 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
     967             :                  errmsg("registering dynamic bgworker autoprewarm failed"),
     968             :                  errhint("Consider increasing the configuration parameter \"%s\".", "max_worker_processes")));
     969             : 
     970             :     /*
     971             :      * Ignore return value; if it fails, postmaster has died, but we have
     972             :      * checks for that elsewhere.
     973             :      */
     974           2 :     WaitForBackgroundWorkerShutdown(handle);
     975           2 : }
     976             : 
     977             : /*
                        * Compare one member of the structs pointed to by "a" and "b", both of
                        * which must be in scope where the macro is used.  If the members
                        * differ, return -1 or 1 from the enclosing function; if they are
                        * equal, fall through so that the next member can be compared.
                        */
     978             : #define cmp_member_elem(fld)    \
     979             : do { \
     980             :     if (a->fld < b->fld)       \
     981             :         return -1;              \
     982             :     else if (a->fld > b->fld)  \
     983             :         return 1;               \
     984             : } while(0)
     985             : 
     986             : /*
     987             :  * apw_compare_blockinfo
     988             :  *
     989             :  * We depend on all records for a particular database being consecutive
     990             :  * in the dump file; each per-database worker will preload blocks until
     991             :  * it sees a block for some other database.  Sorting by tablespace,
     992             :  * filenumber, forknum, and blocknum isn't critical for correctness, but
     993             :  * helps us get a sequential I/O pattern.
                        *
                        * Both arguments are treated as pointers to BlockInfoRecord.  The
                        * members are compared in the order database, tablespace, filenumber,
                        * forknum, blocknum; the first one that differs decides the result
                        * (-1 or 1), and 0 is returned only when all five are equal.
     994             :  */
     995             : static int
     996        3480 : apw_compare_blockinfo(const void *p, const void *q)
     997             : {
     998        3480 :     const BlockInfoRecord *a = (const BlockInfoRecord *) p;
     999        3480 :     const BlockInfoRecord *b = (const BlockInfoRecord *) q;
    1000             : 
                           /* Each cmp_member_elem() returns from here as soon as a member differs. */
    1001        3480 :     cmp_member_elem(database);
    1002        3360 :     cmp_member_elem(tablespace);
    1003        3360 :     cmp_member_elem(filenumber);
    1004        1452 :     cmp_member_elem(forknum);
    1005        1284 :     cmp_member_elem(blocknum);
    1006             : 
    1007           0 :     return 0;
    1008             : }

Generated by: LCOV version 1.16