LCOV - code coverage report
Current view: top level - contrib/pg_prewarm - autoprewarm.c (source / functions)
Test: PostgreSQL 18devel
Date: 2025-04-24 12:15:10

             Hit   Total   Coverage
Lines:       239     316     75.6 %
Functions:    15      16     93.8 %

Legend: Lines: hit | not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * autoprewarm.c
       4             :  *      Periodically dump information about the blocks present in
       5             :  *      shared_buffers, and reload them on server restart.
       6             :  *
       7             :  *      Due to locking considerations, we can't actually begin prewarming
       8             :  *      until the server reaches a consistent state.  We need the catalogs
       9             :  *      to be consistent so that we can figure out which relation to lock,
      10             :  *      and we need to lock the relations so that we don't try to prewarm
      11             :  *      pages from a relation that is in the process of being dropped.
      12             :  *
      13             :  *      While prewarming, autoprewarm will use two workers.  There's a
      14             :  *      leader worker that reads and sorts the list of blocks to be
      15             :  *      prewarmed and then launches a per-database worker for each
      16             :  *      relevant database in turn.  The former keeps running after the
      17             :  *      initial prewarm is complete to update the dump file periodically.
      18             :  *
      19             :  *  Copyright (c) 2016-2025, PostgreSQL Global Development Group
      20             :  *
      21             :  *  IDENTIFICATION
      22             :  *      contrib/pg_prewarm/autoprewarm.c
      23             :  *
      24             :  *-------------------------------------------------------------------------
      25             :  */
      26             : 
      27             : #include "postgres.h"
      28             : 
      29             : #include <unistd.h>
      30             : 
      31             : #include "access/relation.h"
      32             : #include "access/xact.h"
      33             : #include "pgstat.h"
      34             : #include "postmaster/bgworker.h"
      35             : #include "postmaster/interrupt.h"
      36             : #include "storage/buf_internals.h"
      37             : #include "storage/dsm.h"
      38             : #include "storage/dsm_registry.h"
      39             : #include "storage/fd.h"
      40             : #include "storage/ipc.h"
      41             : #include "storage/latch.h"
      42             : #include "storage/lwlock.h"
      43             : #include "storage/procsignal.h"
      44             : #include "storage/read_stream.h"
      45             : #include "storage/smgr.h"
      46             : #include "tcop/tcopprot.h"
      47             : #include "utils/guc.h"
      48             : #include "utils/rel.h"
      49             : #include "utils/relfilenumbermap.h"
      50             : #include "utils/timestamp.h"
      51             : 
      52             : #define AUTOPREWARM_FILE "autoprewarm.blocks"
      53             : 
      54             : /* Metadata for each block we dump. */
      55             : typedef struct BlockInfoRecord
      56             : {
      57             :     Oid         database;
      58             :     Oid         tablespace;
      59             :     RelFileNumber filenumber;
      60             :     ForkNumber  forknum;
      61             :     BlockNumber blocknum;
      62             : } BlockInfoRecord;
      63             : 
      64             : /* Shared state information for autoprewarm bgworker. */
      65             : typedef struct AutoPrewarmSharedState
      66             : {
      67             :     LWLock      lock;           /* mutual exclusion */
      68             :     pid_t       bgworker_pid;   /* for main bgworker */
      69             :     pid_t       pid_using_dumpfile; /* for autoprewarm or block dump */
      70             : 
      71             :     /* Following items are for communication with per-database worker */
      72             :     dsm_handle  block_info_handle;
      73             :     Oid         database;
      74             :     int         prewarm_start_idx;
      75             :     int         prewarm_stop_idx;
      76             :     int         prewarmed_blocks;
      77             : } AutoPrewarmSharedState;
      78             : 
      79             : /*
      80             :  * Private data passed through the read stream API for our use in the
      81             :  * callback.
      82             :  */
      83             : typedef struct AutoPrewarmReadStreamData
      84             : {
      85             :     /* The array of records containing the blocks we should prewarm. */
      86             :     BlockInfoRecord *block_info;
      87             : 
      88             :     /*
      89             :      * pos is the read stream callback's index into block_info. Because the
      90             :      * read stream may read ahead, pos is likely to be ahead of the index in
      91             :      * the main loop in autoprewarm_database_main().
      92             :      */
      93             :     int         pos;
      94             :     Oid         tablespace;
      95             :     RelFileNumber filenumber;
      96             :     ForkNumber  forknum;
      97             :     BlockNumber nblocks;
      98             : } AutoPrewarmReadStreamData;
      99             : 
     100             : 
     101             : PGDLLEXPORT void autoprewarm_main(Datum main_arg);
     102             : PGDLLEXPORT void autoprewarm_database_main(Datum main_arg);
     103             : 
     104           4 : PG_FUNCTION_INFO_V1(autoprewarm_start_worker);
     105           6 : PG_FUNCTION_INFO_V1(autoprewarm_dump_now);
     106             : 
     107             : static void apw_load_buffers(void);
     108             : static int  apw_dump_now(bool is_bgworker, bool dump_unlogged);
     109             : static void apw_start_leader_worker(void);
     110             : static void apw_start_database_worker(void);
     111             : static bool apw_init_shmem(void);
     112             : static void apw_detach_shmem(int code, Datum arg);
     113             : static int  apw_compare_blockinfo(const void *p, const void *q);
     114             : 
     115             : /* Pointer to shared-memory state. */
     116             : static AutoPrewarmSharedState *apw_state = NULL;
     117             : 
     118             : /* GUC variables. */
     119             : static bool autoprewarm = true; /* start worker? */
     120             : static int  autoprewarm_interval = 300; /* dump interval */
     121             : 
     122             : /*
     123             :  * Module load callback.
     124             :  */
     125             : void
     126          10 : _PG_init(void)
     127             : {
     128          10 :     DefineCustomIntVariable("pg_prewarm.autoprewarm_interval",
     129             :                             "Sets the interval between dumps of shared buffers",
     130             :                             "If set to zero, time-based dumping is disabled.",
     131             :                             &autoprewarm_interval,
     132             :                             300,
     133             :                             0, INT_MAX / 1000,
     134             :                             PGC_SIGHUP,
     135             :                             GUC_UNIT_S,
     136             :                             NULL,
     137             :                             NULL,
     138             :                             NULL);
     139             : 
     140          10 :     if (!process_shared_preload_libraries_in_progress)
     141           6 :         return;
     142             : 
     143             :     /* can't define PGC_POSTMASTER variable after startup */
     144           4 :     DefineCustomBoolVariable("pg_prewarm.autoprewarm",
     145             :                              "Starts the autoprewarm worker.",
     146             :                              NULL,
     147             :                              &autoprewarm,
     148             :                              true,
     149             :                              PGC_POSTMASTER,
     150             :                              0,
     151             :                              NULL,
     152             :                              NULL,
     153             :                              NULL);
     154             : 
     155           4 :     MarkGUCPrefixReserved("pg_prewarm");
     156             : 
     157             :     /* Register autoprewarm worker, if enabled. */
     158           4 :     if (autoprewarm)
     159           4 :         apw_start_leader_worker();
     160             : }
     161             : 
     162             : /*
     163             :  * Main entry point for the leader autoprewarm process.  Per-database workers
     164             :  * have a separate entry point.
     165             :  */
     166             : void
     167           4 : autoprewarm_main(Datum main_arg)
     168             : {
     169           4 :     bool        first_time = true;
     170           4 :     bool        final_dump_allowed = true;
     171           4 :     TimestampTz last_dump_time = 0;
     172             : 
     173             :     /* Establish signal handlers; once that's done, unblock signals. */
     174           4 :     pqsignal(SIGTERM, SignalHandlerForShutdownRequest);
     175           4 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
     176           4 :     pqsignal(SIGUSR1, procsignal_sigusr1_handler);
     177           4 :     BackgroundWorkerUnblockSignals();
     178             : 
     179             :     /* Create (if necessary) and attach to our shared memory area. */
     180           4 :     if (apw_init_shmem())
     181           0 :         first_time = false;
     182             : 
     183             :     /*
     184             :      * Set on-detach hook so that our PID will be cleared on exit.
     185             :      *
     186             :      * NB: Autoprewarm's state is stored in a DSM segment, and DSM segments
     187             :      * are detached before calling the on_shmem_exit callbacks, so we must put
     188             :      * apw_detach_shmem in the before_shmem_exit callback list.
     189             :      */
     190           4 :     before_shmem_exit(apw_detach_shmem, 0);
     191             : 
     192             :     /*
     193             :      * Store our PID in the shared memory area --- unless there's already
     194             :      * another worker running, in which case just exit.
     195             :      */
     196           4 :     LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE);
     197           4 :     if (apw_state->bgworker_pid != InvalidPid)
     198             :     {
     199           0 :         LWLockRelease(&apw_state->lock);
     200           0 :         ereport(LOG,
     201             :                 (errmsg("autoprewarm worker is already running under PID %d",
     202             :                         (int) apw_state->bgworker_pid)));
     203           0 :         return;
     204             :     }
     205           4 :     apw_state->bgworker_pid = MyProcPid;
     206           4 :     LWLockRelease(&apw_state->lock);
     207             : 
     208             :     /*
     209             :      * Preload buffers from the dump file only if we just created the shared
     210             :      * memory region.  Otherwise, it's either already been done or shouldn't
     211             :      * be done - e.g. because the old dump file has been overwritten since the
     212             :      * server was started.
     213             :      *
     214             :      * There's not much point in performing a dump immediately after we finish
     215             :      * preloading; so, if we do end up preloading, consider the last dump time
     216             :      * to be equal to the current time.
     217             :      *
     218             :      * If apw_load_buffers() is terminated early by a shutdown request,
     219             :      * prevent dumping out our state below the loop, because we'd effectively
     220             :      * just truncate the saved state to however much we'd managed to preload.
     221             :      */
     222           4 :     if (first_time)
     223             :     {
     224           4 :         apw_load_buffers();
     225           4 :         final_dump_allowed = !ShutdownRequestPending;
     226           4 :         last_dump_time = GetCurrentTimestamp();
     227             :     }
     228             : 
     229             :     /* Periodically dump buffers until terminated. */
     230          10 :     while (!ShutdownRequestPending)
     231             :     {
     232             :         /* In case of a SIGHUP, just reload the configuration. */
     233           6 :         if (ConfigReloadPending)
     234             :         {
     235           0 :             ConfigReloadPending = false;
     236           0 :             ProcessConfigFile(PGC_SIGHUP);
     237             :         }
     238             : 
     239           6 :         if (autoprewarm_interval <= 0)
     240             :         {
     241             :             /* We're only dumping at shutdown, so just wait forever. */
     242           6 :             (void) WaitLatch(MyLatch,
     243             :                              WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
     244             :                              -1L,
     245             :                              PG_WAIT_EXTENSION);
     246             :         }
     247             :         else
     248             :         {
     249             :             TimestampTz next_dump_time;
     250             :             long        delay_in_ms;
     251             : 
     252             :             /* Compute the next dump time. */
     253           0 :             next_dump_time =
     254           0 :                 TimestampTzPlusMilliseconds(last_dump_time,
     255             :                                             autoprewarm_interval * 1000);
     256             :             delay_in_ms =
     257           0 :                 TimestampDifferenceMilliseconds(GetCurrentTimestamp(),
     258             :                                                 next_dump_time);
     259             : 
     260             :             /* Perform a dump if it's time. */
     261           0 :             if (delay_in_ms <= 0)
     262             :             {
     263           0 :                 last_dump_time = GetCurrentTimestamp();
     264           0 :                 apw_dump_now(true, false);
     265           0 :                 continue;
     266             :             }
     267             : 
     268             :             /* Sleep until the next dump time. */
     269           0 :             (void) WaitLatch(MyLatch,
     270             :                              WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     271             :                              delay_in_ms,
     272             :                              PG_WAIT_EXTENSION);
     273             :         }
     274             : 
     275             :         /* Reset the latch, loop. */
     276           6 :         ResetLatch(MyLatch);
     277             :     }
     278             : 
     279             :     /*
     280             :      * Dump one last time.  We assume this is probably the result of a system
     281             :      * shutdown, although it's possible that we've merely been terminated.
     282             :      */
     283           4 :     if (final_dump_allowed)
     284           4 :         apw_dump_now(true, true);
     285             : }
     286             : 
     287             : /*
     288             :  * Read the dump file and launch per-database workers one at a time to
     289             :  * prewarm the buffers found there.
     290             :  */
     291             : static void
     292           4 : apw_load_buffers(void)
     293             : {
     294           4 :     FILE       *file = NULL;
     295             :     int         num_elements,
     296             :                 i;
     297             :     BlockInfoRecord *blkinfo;
     298             :     dsm_segment *seg;
     299             : 
     300             :     /*
     301             :      * Skip the prewarm if the dump file is in use; otherwise, prevent any
     302             :      * other process from writing it while we're using it.
     303             :      */
     304           4 :     LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE);
     305           4 :     if (apw_state->pid_using_dumpfile == InvalidPid)
     306           4 :         apw_state->pid_using_dumpfile = MyProcPid;
     307             :     else
     308             :     {
     309           0 :         LWLockRelease(&apw_state->lock);
     310           0 :         ereport(LOG,
     311             :                 (errmsg("skipping prewarm because block dump file is being written by PID %d",
     312             :                         (int) apw_state->pid_using_dumpfile)));
     313           2 :         return;
     314             :     }
     315           4 :     LWLockRelease(&apw_state->lock);
     316             : 
     317             :     /*
     318             :      * Open the block dump file.  Exit quietly if it doesn't exist, but report
     319             :      * any other error.
     320             :      */
     321           4 :     file = AllocateFile(AUTOPREWARM_FILE, "r");
     322           4 :     if (!file)
     323             :     {
     324           2 :         if (errno == ENOENT)
     325             :         {
     326           2 :             LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE);
     327           2 :             apw_state->pid_using_dumpfile = InvalidPid;
     328           2 :             LWLockRelease(&apw_state->lock);
     329           2 :             return;             /* No file to load. */
     330             :         }
     331           0 :         ereport(ERROR,
     332             :                 (errcode_for_file_access(),
     333             :                  errmsg("could not read file \"%s\": %m",
     334             :                         AUTOPREWARM_FILE)));
     335             :     }
     336             : 
     337             :     /* First line of the file is a record count. */
     338           2 :     if (fscanf(file, "<<%d>>\n", &num_elements) != 1)
     339           0 :         ereport(ERROR,
     340             :                 (errcode_for_file_access(),
     341             :                  errmsg("could not read from file \"%s\": %m",
     342             :                         AUTOPREWARM_FILE)));
     343             : 
     344             :     /* Allocate a dynamic shared memory segment to store the record data. */
     345           2 :     seg = dsm_create(sizeof(BlockInfoRecord) * num_elements, 0);
     346           2 :     blkinfo = (BlockInfoRecord *) dsm_segment_address(seg);
     347             : 
     348             :     /* Read records, one per line. */
     349         430 :     for (i = 0; i < num_elements; i++)
     350             :     {
     351             :         unsigned    forknum;
     352             : 
     353         428 :         if (fscanf(file, "%u,%u,%u,%u,%u\n", &blkinfo[i].database,
     354         428 :                    &blkinfo[i].tablespace, &blkinfo[i].filenumber,
     355         428 :                    &forknum, &blkinfo[i].blocknum) != 5)
     356           0 :             ereport(ERROR,
     357             :                     (errmsg("autoprewarm block dump file is corrupted at line %d",
     358             :                             i + 1)));
     359         428 :         blkinfo[i].forknum = forknum;
     360             :     }
     361             : 
     362           2 :     FreeFile(file);
     363             : 
     364             :     /* Sort the blocks to be loaded. */
     365           2 :     qsort(blkinfo, num_elements, sizeof(BlockInfoRecord),
     366             :           apw_compare_blockinfo);
     367             : 
     368             :     /* Populate shared memory state. */
     369           2 :     apw_state->block_info_handle = dsm_segment_handle(seg);
     370           2 :     apw_state->prewarm_start_idx = apw_state->prewarm_stop_idx = 0;
     371           2 :     apw_state->prewarmed_blocks = 0;
     372             : 
     373             :     /* Walk the sorted records, prewarming one database's range at a time. */
     374           4 :     while (apw_state->prewarm_start_idx < num_elements)
     375             :     {
     376           2 :         int         j = apw_state->prewarm_start_idx;
     377           2 :         Oid         current_db = blkinfo[j].database;
     378             : 
     379             :         /*
     380             :          * Advance the prewarm_stop_idx to the first BlockInfoRecord that does
     381             :          * not belong to this database.
     382             :          */
     383           2 :         j++;
     384         428 :         while (j < num_elements)
     385             :         {
     386         426 :             if (current_db != blkinfo[j].database)
     387             :             {
     388             :                 /*
     389             :                  * Combine BlockInfoRecords for global objects with those of
     390             :                  * the database.
     391             :                  */
     392           2 :                 if (current_db != InvalidOid)
     393           0 :                     break;
     394           2 :                 current_db = blkinfo[j].database;
     395             :             }
     396             : 
     397         426 :             j++;
     398             :         }
     399             : 
     400             :         /*
     401             :          * If we reach this point with current_db == InvalidOid, then only
     402             :          * BlockInfoRecords belonging to global objects exist.  We can't
     403             :          * prewarm without a database connection, so just bail out.
     404             :          */
     405           2 :         if (current_db == InvalidOid)
     406           0 :             break;
     407             : 
     408             :         /* Configure stop point and database for next per-database worker. */
     409           2 :         apw_state->prewarm_stop_idx = j;
     410           2 :         apw_state->database = current_db;
     411             :         Assert(apw_state->prewarm_start_idx < apw_state->prewarm_stop_idx);
     412             : 
     413             :         /* If we've run out of free buffers, don't launch another worker. */
     414           2 :         if (!have_free_buffer())
     415           0 :             break;
     416             : 
     417             :         /*
     418             :          * Likewise, don't launch if we've already been told to shut down.
     419             :          * (The launch would fail anyway, but we might as well skip it.)
     420             :          */
     421           2 :         if (ShutdownRequestPending)
     422           0 :             break;
     423             : 
     424             :         /*
     425             :          * Start a per-database worker to load blocks for this database; this
     426             :          * function will return once the per-database worker exits.
     427             :          */
     428           2 :         apw_start_database_worker();
     429             : 
     430             :         /* Prepare for next database. */
     431           2 :         apw_state->prewarm_start_idx = apw_state->prewarm_stop_idx;
     432             :     }
     433             : 
     434             :     /* Clean up. */
     435           2 :     dsm_detach(seg);
     436           2 :     LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE);
     437           2 :     apw_state->block_info_handle = DSM_HANDLE_INVALID;
     438           2 :     apw_state->pid_using_dumpfile = InvalidPid;
     439           2 :     LWLockRelease(&apw_state->lock);
     440             : 
     441             :     /* Report our success, if we were able to finish. */
     442           2 :     if (!ShutdownRequestPending)
     443           2 :         ereport(LOG,
     444             :                 (errmsg("autoprewarm successfully prewarmed %d of %d previously-loaded blocks",
     445             :                         apw_state->prewarmed_blocks, num_elements)));
     446             : }
     447             : 
     448             : /*
     449             :  * Read stream callback: using the array of BlockInfoRecord, return the next
     450             :  * block to read for one relation fork, or InvalidBlockNumber when done.
     451             :  */
     452             : static BlockNumber
     453         554 : apw_read_stream_next_block(ReadStream *stream,
     454             :                            void *callback_private_data,
     455             :                            void *per_buffer_data)
     456             : {
     457         554 :     AutoPrewarmReadStreamData *p = callback_private_data;
     458             : 
     459         554 :     CHECK_FOR_INTERRUPTS();
     460             : 
     461         554 :     while (p->pos < apw_state->prewarm_stop_idx)
     462             :     {
     463         552 :         BlockInfoRecord blk = p->block_info[p->pos];
     464             : 
     465         552 :         if (!have_free_buffer())
     466             :         {
     467           0 :             p->pos = apw_state->prewarm_stop_idx;
     468         552 :             return InvalidBlockNumber;
     469             :         }
     470             : 
     471         552 :         if (blk.tablespace != p->tablespace)
     472           2 :             return InvalidBlockNumber;
     473             : 
     474         550 :         if (blk.filenumber != p->filenumber)
     475          94 :             return InvalidBlockNumber;
     476             : 
     477         456 :         if (blk.forknum != p->forknum)
     478          28 :             return InvalidBlockNumber;
     479             : 
     480         428 :         p->pos++;
     481             : 
     482             :         /*
     483             :          * Check whether blocknum is valid and within fork file size.
     484             :          * Fast-forward through any invalid blocks. We want p->pos to reflect
     485             :          * the location of the next relation or fork before ending the stream.
     486             :          */
     487         428 :         if (blk.blocknum >= p->nblocks)
     488           0 :             continue;
     489             : 
     490         428 :         return blk.blocknum;
     491             :     }
     492             : 
     493           2 :     return InvalidBlockNumber;
     494             : }
     495             : 
     496             : /*
     497             :  * Prewarm all blocks for one database (and possibly also global objects, if
     498             :  * those got grouped with this database).
     499             :  */
     500             : void
     501           2 : autoprewarm_database_main(Datum main_arg)
     502             : {
     503             :     BlockInfoRecord *block_info;
     504             :     int         i;
     505             :     BlockInfoRecord blk;
     506             :     dsm_segment *seg;
     507             : 
     508             :     /* Establish signal handlers; once that's done, unblock signals. */
     509           2 :     pqsignal(SIGTERM, die);
     510           2 :     BackgroundWorkerUnblockSignals();
     511             : 
     512             :     /* Connect to correct database and get block information. */
     513           2 :     apw_init_shmem();
     514           2 :     seg = dsm_attach(apw_state->block_info_handle);
     515           2 :     if (seg == NULL)
     516           0 :         ereport(ERROR,
     517             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     518             :                  errmsg("could not map dynamic shared memory segment")));
     519           2 :     BackgroundWorkerInitializeConnectionByOid(apw_state->database, InvalidOid, 0);
     520           2 :     block_info = (BlockInfoRecord *) dsm_segment_address(seg);
     521             : 
     522           2 :     i = apw_state->prewarm_start_idx;
     523           2 :     blk = block_info[i];
     524             : 
     525             :     /*
     526             :      * Loop until we run out of blocks to prewarm or until we run out of free
     527             :      * buffers.
     528             :      */
     529         100 :     while (i < apw_state->prewarm_stop_idx && have_free_buffer())
     530             :     {
     531          98 :         Oid         tablespace = blk.tablespace;
     532          98 :         RelFileNumber filenumber = blk.filenumber;
     533             :         Oid         reloid;
     534             :         Relation    rel;
     535             : 
     536             :         /*
     537             :          * All blocks between prewarm_start_idx and prewarm_stop_idx should
     538             :          * belong either to global objects or the same database.
     539             :          */
     540             :         Assert(blk.database == apw_state->database || blk.database == 0);
     541             : 
     542          98 :         StartTransactionCommand();
     543             : 
     544          98 :         reloid = RelidByRelfilenumber(blk.tablespace, blk.filenumber);
     545         196 :         if (!OidIsValid(reloid) ||
     546          98 :             (rel = try_relation_open(reloid, AccessShareLock)) == NULL)
     547             :         {
     548             :             /* We failed to open the relation, so there is nothing to close. */
     549           0 :             CommitTransactionCommand();
     550             : 
     551             :             /*
     552             :              * Fast-forward to the next relation. We want to skip all of the
     553             :              * other records referencing this relation since we know we can't
     554             :              * open it. That way, we avoid repeatedly trying and failing to
     555             :              * open the same relation.
     556             :              */
     557           0 :             for (; i < apw_state->prewarm_stop_idx; i++)
     558             :             {
     559           0 :                 blk = block_info[i];
     560           0 :                 if (blk.tablespace != tablespace ||
     561           0 :                     blk.filenumber != filenumber)
     562             :                     break;
     563             :             }
     564             : 
     565             :             /* Time to try and open our newfound relation */
     566           0 :             continue;
     567             :         }
     568             : 
     569             :         /*
     570             :          * We have a relation; now let's loop until we find a valid fork of
     571             :          * the relation or we run out of free buffers. Once we've read from
     572             :          * all valid forks or run out of options, we'll close the relation and
     573             :          * move on.
     574             :          */
     575         224 :         while (i < apw_state->prewarm_stop_idx &&
     576         222 :                blk.tablespace == tablespace &&
     577         346 :                blk.filenumber == filenumber &&
     578         126 :                have_free_buffer())
     579             :         {
     580         126 :             ForkNumber  forknum = blk.forknum;
     581             :             BlockNumber nblocks;
     582             :             struct AutoPrewarmReadStreamData p;
     583             :             ReadStream *stream;
     584             :             Buffer      buf;
     585             : 
     586             :             /*
     587             :              * smgrexists is not safe for illegal forknum, hence check whether
     588             :              * the passed forknum is valid before using it in smgrexists.
     589             :              */
     590         126 :             if (blk.forknum <= InvalidForkNumber ||
     591         126 :                 blk.forknum > MAX_FORKNUM ||
     592         126 :                 !smgrexists(RelationGetSmgr(rel), blk.forknum))
     593             :             {
     594             :                 /*
     595             :                  * Fast-forward to the next fork. We want to skip all of the
     596             :                  * other records referencing this fork since we already know
     597             :                  * it's not valid.
     598             :                  */
     599           0 :                 for (; i < apw_state->prewarm_stop_idx; i++)
     600             :                 {
     601           0 :                     blk = block_info[i];
     602           0 :                     if (blk.tablespace != tablespace ||
     603           0 :                         blk.filenumber != filenumber ||
     604           0 :                         blk.forknum != forknum)
     605             :                         break;
     606             :                 }
     607             : 
     608             :                 /* Time to check if this newfound fork is valid */
     609           0 :                 continue;
     610             :             }
     611             : 
     612         126 :             nblocks = RelationGetNumberOfBlocksInFork(rel, blk.forknum);
     613             : 
     614         126 :             p = (struct AutoPrewarmReadStreamData)
     615             :             {
     616             :                 .block_info = block_info,
     617             :                     .pos = i,
     618             :                     .tablespace = tablespace,
     619             :                     .filenumber = filenumber,
     620             :                     .forknum = forknum,
     621             :                     .nblocks = nblocks,
     622             :             };
     623             : 
     624         126 :             stream = read_stream_begin_relation(READ_STREAM_DEFAULT |
     625             :                                                 READ_STREAM_USE_BATCHING,
     626             :                                                 NULL,
     627             :                                                 rel,
     628             :                                                 p.forknum,
     629             :                                                 apw_read_stream_next_block,
     630             :                                                 &p,
     631             :                                                 0);
     632             : 
     633             :             /*
     634             :              * Loop until we've prewarmed all the blocks from this fork. The
     635             :              * read stream callback will check that we still have free buffers
     636             :              * before requesting each block from the read stream API.
     637             :              */
     638         554 :             while ((buf = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
     639             :             {
     640         428 :                 apw_state->prewarmed_blocks++;
     641         428 :                 ReleaseBuffer(buf);
     642             :             }
     643             : 
     644         126 :             read_stream_end(stream);
     645             : 
     646             :             /* Advance i past all the blocks just prewarmed. */
     647         126 :             i = p.pos;
     648         126 :             blk = block_info[i];
     649             :         }
     650             : 
     651          98 :         relation_close(rel, AccessShareLock);
     652          98 :         CommitTransactionCommand();
     653             :     }
     654             : 
     655           2 :     dsm_detach(seg);
     656           2 : }
     657             : 
     658             : /*
     659             :  * Dump information on blocks in shared buffers.  We use a text format here
     660             :  * so that it's easy to understand and even change the file contents if
     661             :  * necessary.
                        *
                        * is_bgworker decides what happens when another process already owns the
                        * dump file: with false we raise an ERROR, with true we LOG and return 0.
                        * dump_unlogged says whether buffers without BM_PERMANENT set are dumped
                        * too.
                        *
                        * The records go to "<AUTOPREWARM_FILE>.tmp" first; only after that file
                        * has been written and closed is it renamed over AUTOPREWARM_FILE.
                        *
     662             :  * Returns the number of blocks dumped.
     663             :  */
     664             : static int
     665           6 : apw_dump_now(bool is_bgworker, bool dump_unlogged)
     666             : {
     667             :     int         num_blocks;
     668             :     int         i;
     669             :     int         ret;
     670             :     BlockInfoRecord *block_info_array;
     671             :     BufferDesc *bufHdr;
     672             :     FILE       *file;
     673             :     char        transient_dump_file_path[MAXPGPATH];
     674             :     pid_t       pid;
     675             : 
                           /*
                            * Claim the dump file.  "pid" keeps the previous owner so that we can
                            * tell below whether the claim succeeded.
                            */
     676           6 :     LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE);
     677           6 :     pid = apw_state->pid_using_dumpfile;
     678           6 :     if (apw_state->pid_using_dumpfile == InvalidPid)
     679           6 :         apw_state->pid_using_dumpfile = MyProcPid;
     680           6 :     LWLockRelease(&apw_state->lock);
     681             : 
     682           6 :     if (pid != InvalidPid)
     683             :     {
                               /*
                                * NOTE(review): both messages re-read apw_state->pid_using_dumpfile
                                * after the lock was released instead of using the local "pid", so
                                * the reported PID could differ from the one tested -- confirm.
                                */
     684           0 :         if (!is_bgworker)
     685           0 :             ereport(ERROR,
     686             :                     (errmsg("could not perform block dump because dump file is being used by PID %d",
     687             :                             (int) apw_state->pid_using_dumpfile)));
     688             : 
     689           0 :         ereport(LOG,
     690             :                 (errmsg("skipping block dump because it is already being performed by PID %d",
     691             :                         (int) apw_state->pid_using_dumpfile)));
     692           0 :         return 0;
     693             :     }
     694             : 
                           /* Size the array for the case where every buffer gets dumped. */
     695             :     block_info_array =
     696           6 :         (BlockInfoRecord *) palloc(sizeof(BlockInfoRecord) * NBuffers);
     697             : 
     698       98310 :     for (num_blocks = 0, i = 0; i < NBuffers; i++)
     699             :     {
     700             :         uint32      buf_state;
     701             : 
     702       98304 :         CHECK_FOR_INTERRUPTS();
     703             : 
     704       98304 :         bufHdr = GetBufferDescriptor(i);
     705             : 
     706             :         /* Lock each buffer header before inspecting. */
     707       98304 :         buf_state = LockBufHdr(bufHdr);
     708             : 
     709             :         /*
     710             :          * Unlogged tables will be automatically truncated after a crash or
     711             :          * unclean shutdown. In such cases we need not prewarm them. Dump them
     712             :          * only if requested by caller.
     713             :          */
     714       98304 :         if (buf_state & BM_TAG_VALID &&
     715        1284 :             ((buf_state & BM_PERMANENT) || dump_unlogged))
     716             :         {
     717        1284 :             block_info_array[num_blocks].database = bufHdr->tag.dbOid;
     718        1284 :             block_info_array[num_blocks].tablespace = bufHdr->tag.spcOid;
     719        2568 :             block_info_array[num_blocks].filenumber =
     720        1284 :                 BufTagGetRelNumber(&bufHdr->tag);
     721        2568 :             block_info_array[num_blocks].forknum =
     722        1284 :                 BufTagGetForkNum(&bufHdr->tag);
     723        1284 :             block_info_array[num_blocks].blocknum = bufHdr->tag.blockNum;
     724        1284 :             ++num_blocks;
     725             :         }
     726             : 
     727       98304 :         UnlockBufHdr(bufHdr, buf_state);
     728             :     }
     729             : 
                           /* Write to a ".tmp" sibling first; it is renamed into place at the end. */
     730           6 :     snprintf(transient_dump_file_path, MAXPGPATH, "%s.tmp", AUTOPREWARM_FILE);
     731           6 :     file = AllocateFile(transient_dump_file_path, "w");
     732           6 :     if (!file)
     733           0 :         ereport(ERROR,
     734             :                 (errcode_for_file_access(),
     735             :                  errmsg("could not open file \"%s\": %m",
     736             :                         transient_dump_file_path)));
     737             : 
                           /* The first line of the file holds the number of records that follow. */
     738           6 :     ret = fprintf(file, "<<%d>>\n", num_blocks);
     739           6 :     if (ret < 0)
     740             :     {
                               /*
                                * Keep errno across the cleanup calls so that %m in the message
                                * below still describes the fprintf failure.
                                */
     741           0 :         int         save_errno = errno;
     742             : 
     743           0 :         FreeFile(file);
     744           0 :         unlink(transient_dump_file_path);
     745           0 :         errno = save_errno;
     746           0 :         ereport(ERROR,
     747             :                 (errcode_for_file_access(),
     748             :                  errmsg("could not write to file \"%s\": %m",
     749             :                         transient_dump_file_path)));
     750             :     }
     751             : 
     752        1290 :     for (i = 0; i < num_blocks; i++)
     753             :     {
     754        1284 :         CHECK_FOR_INTERRUPTS();
     755             : 
                               /* One line per block: database,tablespace,filenumber,forknum,blocknum */
     756        1284 :         ret = fprintf(file, "%u,%u,%u,%u,%u\n",
     757        1284 :                       block_info_array[i].database,
     758        1284 :                       block_info_array[i].tablespace,
     759        1284 :                       block_info_array[i].filenumber,
     760        1284 :                       (uint32) block_info_array[i].forknum,
     761        1284 :                       block_info_array[i].blocknum);
     762        1284 :         if (ret < 0)
     763             :         {
     764           0 :             int         save_errno = errno;
     765             : 
     766           0 :             FreeFile(file);
     767           0 :             unlink(transient_dump_file_path);
     768           0 :             errno = save_errno;
     769           0 :             ereport(ERROR,
     770             :                     (errcode_for_file_access(),
     771             :                      errmsg("could not write to file \"%s\": %m",
     772             :                             transient_dump_file_path)));
     773             :         }
     774             :     }
     775             : 
     776           6 :     pfree(block_info_array);
     777             : 
     778             :     /*
     779             :      * Rename transient_dump_file_path to AUTOPREWARM_FILE to make things
     780             :      * permanent.
     781             :      */
     782           6 :     ret = FreeFile(file);
     783           6 :     if (ret != 0)
     784             :     {
     785           0 :         int         save_errno = errno;
     786             : 
     787           0 :         unlink(transient_dump_file_path);
     788           0 :         errno = save_errno;
     789           0 :         ereport(ERROR,
     790             :                 (errcode_for_file_access(),
     791             :                  errmsg("could not close file \"%s\": %m",
     792             :                         transient_dump_file_path)));
     793             :     }
     794             : 
     795           6 :     (void) durable_rename(transient_dump_file_path, AUTOPREWARM_FILE, ERROR);
                           /*
                            * Release our claim on the dump file.
                            *
                            * NOTE(review): the field is cleared here without taking
                            * apw_state->lock, unlike the claim at the top of the function --
                            * confirm that is intended.  The ERROR exits above never reach this
                            * line; apw_detach_shmem() is what resets the field in that case.
                            */
     796           6 :     apw_state->pid_using_dumpfile = InvalidPid;
     797             : 
     798           6 :     ereport(DEBUG1,
     799             :             (errmsg_internal("wrote block details for %d blocks", num_blocks)));
     800           6 :     return num_blocks;
     801             : }
     802             : 
     803             : /*
     804             :  * SQL-callable function to launch autoprewarm.
                        *
                        * Raises an ERROR if the "autoprewarm" flag is off, or if
                        * apw_state->bgworker_pid already names a worker.  Otherwise the leader
                        * is started through apw_start_leader_worker().
     805             :  */
     806             : Datum
     807           0 : autoprewarm_start_worker(PG_FUNCTION_ARGS)
     808             : {
     809             :     pid_t       pid;
     810             : 
     811           0 :     if (!autoprewarm)
     812           0 :         ereport(ERROR,
     813             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     814             :                  errmsg("autoprewarm is disabled")));
     815             : 
                           /* Set up apw_state, then read the leader's PID under the lock. */
     816           0 :     apw_init_shmem();
     817           0 :     LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE);
     818           0 :     pid = apw_state->bgworker_pid;
     819           0 :     LWLockRelease(&apw_state->lock);
     820             : 
     821           0 :     if (pid != InvalidPid)
     822           0 :         ereport(ERROR,
     823             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     824             :                  errmsg("autoprewarm worker is already running under PID %d",
     825             :                         (int) pid)));
     826             : 
     827           0 :     apw_start_leader_worker();
     828             : 
     829           0 :     PG_RETURN_VOID();
     830             : }
     831             : 
     832             : /*
     833             :  * SQL-callable function to perform an immediate block dump.
     834             :  *
                        * The dump is done by apw_dump_now(false, true): blocks of unlogged
                        * relations are included, and a dump file that is in use by another
                        * process is reported as an ERROR rather than skipped.  The result is
                        * the number of blocks dumped.
                        *
     835             :  * Note: this is declared to return int8, as insurance against some
     836             :  * very distant day when we might make NBuffers wider than int.
     837             :  */
     838             : Datum
     839           2 : autoprewarm_dump_now(PG_FUNCTION_ARGS)
     840             : {
     841             :     int         num_blocks;
     842             : 
     843           2 :     apw_init_shmem();
     844             : 
                           /*
                            * Have apw_detach_shmem() run if the dump errors out, so that our PID
                            * is not left behind in apw_state->pid_using_dumpfile.
                            */
     845           2 :     PG_ENSURE_ERROR_CLEANUP(apw_detach_shmem, 0);
     846             :     {
     847           2 :         num_blocks = apw_dump_now(false, true);
     848             :     }
     849           2 :     PG_END_ENSURE_ERROR_CLEANUP(apw_detach_shmem, 0);
     850             : 
     851           2 :     PG_RETURN_INT64((int64) num_blocks);
     852             : }
     853             : 
                       /*
                        * Initialize an AutoPrewarmSharedState: set up its lock with a newly
                        * allocated tranche ID and mark both PID fields as unset (InvalidPid).
                        *
                        * apw_init_shmem() hands this function to GetNamedDSMSegment() as the
                        * initialization callback; "ptr" is the memory to initialize.
                        */
     854             : static void
     855           4 : apw_init_state(void *ptr)
     856             : {
     857           4 :     AutoPrewarmSharedState *state = (AutoPrewarmSharedState *) ptr;
     858             : 
     859           4 :     LWLockInitialize(&state->lock, LWLockNewTrancheId());
     860           4 :     state->bgworker_pid = InvalidPid;
     861           4 :     state->pid_using_dumpfile = InvalidPid;
     862           4 : }
     863             : 
     864             : /*
     865             :  * Allocate and initialize autoprewarm related shared memory, if not already
     866             :  * done, and set up backend-local pointer to that state.  Returns true if an
     867             :  * existing shared memory segment was found.
                        *
                        * The segment is looked up under the name "autoprewarm", with
                        * apw_init_state() as its initialization callback.  The result is stored
                        * in the file-level pointer apw_state.
     868             :  */
     869             : static bool
     870           8 : apw_init_shmem(void)
     871             : {
     872             :     bool        found;
     873             : 
     874           8 :     apw_state = GetNamedDSMSegment("autoprewarm",
     875             :                                    sizeof(AutoPrewarmSharedState),
     876             :                                    apw_init_state,
     877             :                                    &found);
                           /* Done on every call, also when the segment already existed. */
     878           8 :     LWLockRegisterTranche(apw_state->lock.tranche, "autoprewarm");
     879             : 
     880           8 :     return found;
     881             : }
     882             : 
     883             : /*
     884             :  * Clear our PID from autoprewarm shared state.
                        *
                        * Under apw_state->lock, pid_using_dumpfile and bgworker_pid are each
                        * reset to InvalidPid, but only if they currently hold MyProcPid; entries
                        * belonging to other processes are left alone.
                        *
                        * Neither "code" nor "arg" is used.  The (int, Datum) signature is the
                        * one autoprewarm_dump_now() passes to PG_ENSURE_ERROR_CLEANUP().
     885             :  */
     886             : static void
     887           4 : apw_detach_shmem(int code, Datum arg)
     888             : {
     889           4 :     LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE);
     890           4 :     if (apw_state->pid_using_dumpfile == MyProcPid)
     891           0 :         apw_state->pid_using_dumpfile = InvalidPid;
     892           4 :     if (apw_state->bgworker_pid == MyProcPid)
     893           4 :         apw_state->bgworker_pid = InvalidPid;
     894           4 :     LWLockRelease(&apw_state->lock);
     895           4 : }
     896             : 
     897             : /*
     898             :  * Start autoprewarm leader worker process.
                        *
                        * The worker runs autoprewarm_main from the pg_prewarm library.  While
                        * shared_preload_libraries is being processed it is merely registered;
                        * otherwise it is registered dynamically and we wait for it to start,
                        * raising an ERROR if registration fails or the reported status is
                        * anything other than BGWH_STARTED.
     899             :  */
     900             : static void
     901           4 : apw_start_leader_worker(void)
     902             : {
     903           4 :     BackgroundWorker worker = {0};
     904             :     BackgroundWorkerHandle *handle;
     905             :     BgwHandleStatus status;
     906             :     pid_t       pid;
     907             : 
     908           4 :     worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
     909           4 :     worker.bgw_start_time = BgWorkerStart_ConsistentState;
     910           4 :     strcpy(worker.bgw_library_name, "pg_prewarm");
     911           4 :     strcpy(worker.bgw_function_name, "autoprewarm_main");
     912           4 :     strcpy(worker.bgw_name, "autoprewarm leader");
     913           4 :     strcpy(worker.bgw_type, "autoprewarm leader");
     914             : 
                           /* Preload-time path: register the worker and return without waiting. */
     915           4 :     if (process_shared_preload_libraries_in_progress)
     916             :     {
     917           4 :         RegisterBackgroundWorker(&worker);
     918           4 :         return;
     919             :     }
     920             : 
     921             :     /* must set notify PID to wait for startup */
     922           0 :     worker.bgw_notify_pid = MyProcPid;
     923             : 
     924           0 :     if (!RegisterDynamicBackgroundWorker(&worker, &handle))
     925           0 :         ereport(ERROR,
     926             :                 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
     927             :                  errmsg("could not register background process"),
     928             :                  errhint("You may need to increase \"max_worker_processes\".")));
     929             : 
                           /* "pid" only receives the output argument; it is not used afterwards. */
     930           0 :     status = WaitForBackgroundWorkerStartup(handle, &pid);
     931           0 :     if (status != BGWH_STARTED)
     932           0 :         ereport(ERROR,
     933             :                 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
     934             :                  errmsg("could not start background process"),
     935             :                  errhint("More details may be available in the server log.")));
     936             : }
     937             : 
     938             : /*
     939             :  * Start autoprewarm per-database worker process.
                        *
                        * Registers a dynamic background worker that runs
                        * autoprewarm_database_main from the pg_prewarm library, with database
                        * connection access and BGW_NEVER_RESTART, and then waits for that worker
                        * to shut down before returning.  Raises an ERROR if the worker cannot be
                        * registered.
     940             :  */
     941             : static void
     942           2 : apw_start_database_worker(void)
     943             : {
     944           2 :     BackgroundWorker worker = {0};
     945             :     BackgroundWorkerHandle *handle;
     946             : 
     947           2 :     worker.bgw_flags =
     948             :         BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
     949           2 :     worker.bgw_start_time = BgWorkerStart_ConsistentState;
     950           2 :     worker.bgw_restart_time = BGW_NEVER_RESTART;
     951           2 :     strcpy(worker.bgw_library_name, "pg_prewarm");
     952           2 :     strcpy(worker.bgw_function_name, "autoprewarm_database_main");
     953           2 :     strcpy(worker.bgw_name, "autoprewarm worker");
     954           2 :     strcpy(worker.bgw_type, "autoprewarm worker");
     955             : 
     956             :     /* must set notify PID to wait for shutdown */
     957           2 :     worker.bgw_notify_pid = MyProcPid;
     958             : 
     959           2 :     if (!RegisterDynamicBackgroundWorker(&worker, &handle))
     960           0 :         ereport(ERROR,
     961             :                 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
     962             :                  errmsg("registering dynamic bgworker autoprewarm failed"),
     963             :                  errhint("Consider increasing the configuration parameter \"%s\".", "max_worker_processes")));
     964             : 
     965             :     /*
     966             :      * Ignore return value; if it fails, postmaster has died, but we have
     967             :      * checks for that elsewhere.
     968             :      */
     969           2 :     WaitForBackgroundWorkerShutdown(handle);
     970           2 : }
     971             : 
     972             : /*
                        * Compare one member of two records and return early if they differ.
                        *
                        * Expects pointers named "a" and "b" to be in scope.  Makes the enclosing
                        * function return -1 if a->fld is smaller than b->fld and 1 if it is
                        * larger; when the two are equal, control simply continues after the
                        * macro.
                        */
     973             : #define cmp_member_elem(fld)    \
     974             : do { \
     975             :     if (a->fld < b->fld)       \
     976             :         return -1;              \
     977             :     else if (a->fld > b->fld)  \
     978             :         return 1;               \
     979             : } while(0)
     980             : 
     981             : /*
     982             :  * apw_compare_blockinfo
     983             :  *
     984             :  * We depend on all records for a particular database being consecutive
     985             :  * in the dump file; each per-database worker will preload blocks until
     986             :  * it sees a block for some other database.  Sorting by tablespace,
     987             :  * filenumber, forknum, and blocknum isn't critical for correctness, but
     988             :  * helps us get a sequential I/O pattern.
                        *
                        * p and q point to BlockInfoRecord values.  Returns -1 or 1 according to
                        * the first field, in the order listed below, that differs between them,
                        * and 0 if all five fields are equal.
     989             :  */
     990             : static int
     991        3516 : apw_compare_blockinfo(const void *p, const void *q)
     992             : {
     993        3516 :     const BlockInfoRecord *a = (const BlockInfoRecord *) p;
     994        3516 :     const BlockInfoRecord *b = (const BlockInfoRecord *) q;
     995             : 
                           /* Each line returns from the function unless the fields are equal. */
     996        3516 :     cmp_member_elem(database);
     997        3390 :     cmp_member_elem(tablespace);
     998        3390 :     cmp_member_elem(filenumber);
     999        1400 :     cmp_member_elem(forknum);
    1000        1192 :     cmp_member_elem(blocknum);
    1001             : 
    1002           0 :     return 0;
    1003             : }

Generated by: LCOV version 1.14