LCOV - code coverage report
Current view: top level - src/backend/backup - basebackup_throttle.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 83.3 % 42 35
Test Date: 2026-03-03 02:14:47 Functions: 80.0 % 5 4
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * basebackup_throttle.c
       4              :  *    Basebackup sink implementing throttling. Data is forwarded to the
       5              :  *    next base backup sink in the chain at a rate no greater than the
       6              :  *    configured maximum.
       7              :  *
       8              :  * Portions Copyright (c) 2010-2026, PostgreSQL Global Development Group
       9              :  *
      10              :  * IDENTIFICATION
      11              :  *    src/backend/backup/basebackup_throttle.c
      12              :  *
      13              :  *-------------------------------------------------------------------------
      14              :  */
      15              : #include "postgres.h"
      16              : 
      17              : #include "backup/basebackup_sink.h"
      18              : #include "miscadmin.h"
      19              : #include "pgstat.h"
      20              : #include "storage/latch.h"
      21              : #include "utils/timestamp.h"
      22              : 
      23              : typedef struct bbsink_throttle
      24              : {
      25              :     /* Common information for all types of sink. */
      26              :     bbsink      base;
      27              : 
      28              :     /* The actual number of bytes, transfer of which may cause sleep. */
      29              :     uint64      throttling_sample;
      30              : 
      31              :     /* Amount of data already transferred but not yet throttled.  */
      32              :     int64       throttling_counter;
      33              : 
      34              :     /* The minimum time required to transfer throttling_sample bytes. */
      35              :     TimeOffset  elapsed_min_unit;
      36              : 
      37              :     /* The last check of the transfer rate. */
      38              :     TimestampTz throttled_last;
      39              : } bbsink_throttle;
      40              : 
      41              : static void bbsink_throttle_begin_backup(bbsink *sink);
      42              : static void bbsink_throttle_archive_contents(bbsink *sink, size_t len);
      43              : static void bbsink_throttle_manifest_contents(bbsink *sink, size_t len);
      44              : static void throttle(bbsink_throttle *sink, size_t increment);
      45              : 
      46              : static const bbsink_ops bbsink_throttle_ops = {
      47              :     .begin_backup = bbsink_throttle_begin_backup,
      48              :     .begin_archive = bbsink_forward_begin_archive,
      49              :     .archive_contents = bbsink_throttle_archive_contents,
      50              :     .end_archive = bbsink_forward_end_archive,
      51              :     .begin_manifest = bbsink_forward_begin_manifest,
      52              :     .manifest_contents = bbsink_throttle_manifest_contents,
      53              :     .end_manifest = bbsink_forward_end_manifest,
      54              :     .end_backup = bbsink_forward_end_backup,
      55              :     .cleanup = bbsink_forward_cleanup
      56              : };
      57              : 
      58              : /*
      59              :  * How frequently to throttle, as a fraction of the specified rate-second.
      60              :  */
      61              : #define THROTTLING_FREQUENCY    8
      62              : 
      63              : /*
      64              :  * Create a new basebackup sink that performs throttling and forwards data
      65              :  * to a successor sink.
      66              :  */
      67              : bbsink *
      68            1 : bbsink_throttle_new(bbsink *next, uint32 maxrate)
      69              : {
      70              :     bbsink_throttle *sink;
      71              : 
      72              :     Assert(next != NULL);
      73              :     Assert(maxrate > 0);
      74              : 
      75            1 :     sink = palloc0_object(bbsink_throttle);
      76            1 :     *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_throttle_ops;
      77            1 :     sink->base.bbs_next = next;
      78              : 
      79            1 :     sink->throttling_sample =
      80            1 :         (int64) maxrate * (int64) 1024 / THROTTLING_FREQUENCY;
      81              : 
      82              :     /*
      83              :      * The minimum amount of time for throttling_sample bytes to be
      84              :      * transferred.
      85              :      */
      86            1 :     sink->elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY;
      87              : 
      88            1 :     return &sink->base;
      89              : }
      90              : 
      91              : /*
      92              :  * There's no real work to do here, but we need to record the current time so
      93              :  * that it can be used for future calculations.
      94              :  */
      95              : static void
      96            1 : bbsink_throttle_begin_backup(bbsink *sink)
      97              : {
      98            1 :     bbsink_throttle *mysink = (bbsink_throttle *) sink;
      99              : 
     100            1 :     bbsink_forward_begin_backup(sink);
     101              : 
     102              :     /* The 'real data' starts now (header was ignored). */
     103            1 :     mysink->throttled_last = GetCurrentTimestamp();
     104            1 : }
     105              : 
     106              : /*
     107              :  * First throttle, and then pass archive contents to next sink.
     108              :  */
     109              : static void
     110           21 : bbsink_throttle_archive_contents(bbsink *sink, size_t len)
     111              : {
     112           21 :     throttle((bbsink_throttle *) sink, len);
     113              : 
     114           21 :     bbsink_forward_archive_contents(sink, len);
     115           21 : }
     116              : 
     117              : /*
     118              :  * First throttle, and then pass manifest contents to next sink.
     119              :  */
     120              : static void
     121            0 : bbsink_throttle_manifest_contents(bbsink *sink, size_t len)
     122              : {
     123            0 :     throttle((bbsink_throttle *) sink, len);
     124              : 
     125            0 :     bbsink_forward_manifest_contents(sink, len);
     126            0 : }
     127              : 
     128              : /*
     129              :  * Increment the network transfer counter by the given number of bytes,
     130              :  * and sleep if necessary to comply with the requested network transfer
     131              :  * rate.
     132              :  */
     133              : static void
     134           21 : throttle(bbsink_throttle *sink, size_t increment)
     135              : {
     136              :     TimeOffset  elapsed_min;
     137              : 
     138              :     Assert(sink->throttling_counter >= 0);
     139              : 
     140           21 :     sink->throttling_counter += increment;
     141           21 :     if (sink->throttling_counter < sink->throttling_sample)
     142           17 :         return;
     143              : 
     144              :     /* How much time should have elapsed at minimum? */
     145            4 :     elapsed_min = sink->elapsed_min_unit *
     146            4 :         (sink->throttling_counter / sink->throttling_sample);
     147              : 
     148              :     /*
     149              :      * Since the latch could be set repeatedly because of concurrently WAL
     150              :      * activity, sleep in a loop to ensure enough time has passed.
     151              :      */
     152              :     for (;;)
     153            0 :     {
     154              :         TimeOffset  elapsed,
     155              :                     sleep;
     156              :         int         wait_result;
     157              : 
     158              :         /* Time elapsed since the last measurement (and possible wake up). */
     159            4 :         elapsed = GetCurrentTimestamp() - sink->throttled_last;
     160              : 
     161              :         /* sleep if the transfer is faster than it should be */
     162            4 :         sleep = elapsed_min - elapsed;
     163            4 :         if (sleep <= 0)
     164            0 :             break;
     165              : 
     166            4 :         ResetLatch(MyLatch);
     167              : 
     168              :         /* We're eating a potentially set latch, so check for interrupts */
     169            4 :         CHECK_FOR_INTERRUPTS();
     170              : 
     171              :         /*
     172              :          * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be
     173              :          * the maximum time to sleep. Thus the cast to long is safe.
     174              :          */
     175            4 :         wait_result = WaitLatch(MyLatch,
     176              :                                 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     177            4 :                                 (long) (sleep / 1000),
     178              :                                 WAIT_EVENT_BASE_BACKUP_THROTTLE);
     179              : 
     180            4 :         if (wait_result & WL_LATCH_SET)
     181            0 :             CHECK_FOR_INTERRUPTS();
     182              : 
     183              :         /* Done waiting? */
     184            4 :         if (wait_result & WL_TIMEOUT)
     185            4 :             break;
     186              :     }
     187              : 
     188              :     /*
     189              :      * As we work with integers, only whole multiple of throttling_sample was
     190              :      * processed. The rest will be done during the next call of this function.
     191              :      */
     192            4 :     sink->throttling_counter %= sink->throttling_sample;
     193              : 
     194              :     /*
     195              :      * Time interval for the remaining amount and possible next increments
     196              :      * starts now.
     197              :      */
     198            4 :     sink->throttled_last = GetCurrentTimestamp();
     199              : }
        

Generated by: LCOV version 2.0-1