LCOV - code coverage report
Current view: top level - src/backend/backup - basebackup_throttle.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 83.3 % 42 35
Test Date: 2026-03-13 11:14:55 Functions: 80.0 % 5 4
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * basebackup_throttle.c
       4              :  *    Basebackup sink implementing throttling. Data is forwarded to the
       5              :  *    next base backup sink in the chain at a rate no greater than the
       6              :  *    configured maximum.
       7              :  *
       8              :  * Portions Copyright (c) 2010-2026, PostgreSQL Global Development Group
       9              :  *
      10              :  * IDENTIFICATION
      11              :  *    src/backend/backup/basebackup_throttle.c
      12              :  *
      13              :  *-------------------------------------------------------------------------
      14              :  */
      15              : #include "postgres.h"
      16              : 
      17              : #include "backup/basebackup_sink.h"
      18              : #include "miscadmin.h"
      19              : #include "pgstat.h"
      20              : #include "storage/latch.h"
      21              : #include "utils/timestamp.h"
      22              : #include "utils/wait_event.h"
      23              : 
      24              : typedef struct bbsink_throttle
      25              : {
      26              :     /*
                      :      * Common information for all types of sink.  This must stay the first
                      :      * member: the callbacks below cast the bbsink pointer they are given
                      :      * straight to bbsink_throttle.
                      :      */
      27              :     bbsink      base;
      28              : 
      29              :     /*
                      :      * The actual number of bytes, transfer of which may cause sleep.
                      :      * throttle() returns without sleeping until at least this many bytes
                      :      * have accumulated in throttling_counter.
                      :      */
      30              :     uint64      throttling_sample;
      31              : 
      32              :     /*
                      :      * Amount of data already transferred but not yet throttled.  Grows by
                      :      * the length passed to throttle() and is reduced modulo
                      :      * throttling_sample after each rate check.
                      :      */
      33              :     int64       throttling_counter;
      34              : 
      35              :     /*
                      :      * The minimum time required to transfer throttling_sample bytes, in
                      :      * microseconds (derived from USECS_PER_SEC).
                      :      */
      36              :     TimeOffset  elapsed_min_unit;
      37              : 
      38              :     /*
                      :      * Time of the last check of the transfer rate.  Set when the backup
                      :      * begins and again at the end of every rate check in throttle().
                      :      */
      39              :     TimestampTz throttled_last;
      40              : } bbsink_throttle;
      41              : 
      42              : static void bbsink_throttle_begin_backup(bbsink *sink);
      43              : static void bbsink_throttle_archive_contents(bbsink *sink, size_t len);
      44              : static void bbsink_throttle_manifest_contents(bbsink *sink, size_t len);
      45              : static void throttle(bbsink_throttle *sink, size_t increment);  /* shared rate limiter */
      46              : 
      47              : static const bbsink_ops bbsink_throttle_ops = {     /* callback table installed by bbsink_throttle_new() */
      48              :     .begin_backup = bbsink_throttle_begin_backup,   /* overridden: records the start time */
      49              :     .begin_archive = bbsink_forward_begin_archive,
      50              :     .archive_contents = bbsink_throttle_archive_contents,   /* overridden: throttles, then forwards */
      51              :     .end_archive = bbsink_forward_end_archive,
      52              :     .begin_manifest = bbsink_forward_begin_manifest,
      53              :     .manifest_contents = bbsink_throttle_manifest_contents, /* overridden: throttles, then forwards */
      54              :     .end_manifest = bbsink_forward_end_manifest,
      55              :     .end_backup = bbsink_forward_end_backup,
      56              :     .cleanup = bbsink_forward_cleanup   /* all remaining entries use the bbsink_forward_* callbacks */
      57              : };
      58              : 
      59              : /*
      60              :  * How frequently to throttle, as a fraction of the specified rate-second.
                      :  *
                      :  * bbsink_throttle_new() divides the per-second byte limit into this many
                      :  * equal samples and allots each sample 1/THROTTLING_FREQUENCY of a second,
                      :  * so the transfer rate is checked at most this many times per second of
                      :  * allowed throughput.
      61              :  */
      62              : #define THROTTLING_FREQUENCY    8
      63              : 
      64              : /*
      65              :  * Create a new basebackup sink that performs throttling and forwards data
      66              :  * to a successor sink.
                      :  *
                      :  * 'next' is the sink that receives the data; it must not be NULL.
                      :  *
                      :  * 'maxrate' is the transfer limit in units of 1024 bytes per second and
                      :  * must be greater than zero.
                      :  *
                      :  * The returned pointer addresses the 'base' member of a newly allocated
                      :  * bbsink_throttle.  The allocation is made with palloc0_object(), so the
                      :  * fields not assigned here (throttling_counter, throttled_last) are
                      :  * expected to start out as zero.
      67              :  */
      68              : bbsink *
      69            1 : bbsink_throttle_new(bbsink *next, uint32 maxrate)
      70              : {
      71              :     bbsink_throttle *sink;
      72              : 
      73              :     Assert(next != NULL);
      74              :     Assert(maxrate > 0);
      75              : 
      76            1 :     sink = palloc0_object(bbsink_throttle);
      77            1 :     *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_throttle_ops;    /* presumably bbs_ops is const-qualified, hence the cast -- confirm in basebackup_sink.h */
      78            1 :     sink->base.bbs_next = next;
      79              : 
      80            1 :     sink->throttling_sample =
      81            1 :         (int64) maxrate * (int64) 1024 / THROTTLING_FREQUENCY;  /* bytes per 1/THROTTLING_FREQUENCY second; widened before multiplying */
      82              : 
      83              :     /*
      84              :      * The minimum amount of time for throttling_sample bytes to be
      85              :      * transferred.
                      :      * This is a fixed 1/THROTTLING_FREQUENCY of a second, expressed in
                      :      * microseconds; only throttling_sample depends on maxrate.
      86              :      */
      87            1 :     sink->elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY;
      88              : 
      89            1 :     return &sink->base;
      90              : }
      91              : 
      92              : /*
      93              :  * There's no real work to do here, but we need to record the current time so
      94              :  * that it can be used for future calculations.
                      :  *
                      :  * The call is forwarded to the next sink first; the timestamp is taken
                      :  * afterwards, so time spent in the successor's begin_backup is not counted
                      :  * by the first rate check in throttle().
      95              :  */
      96              : static void
      97            1 : bbsink_throttle_begin_backup(bbsink *sink)
      98              : {
      99            1 :     bbsink_throttle *mysink = (bbsink_throttle *) sink;
     100              : 
     101            1 :     bbsink_forward_begin_backup(sink);
     102              : 
     103              :     /* The 'real data' starts now (header was ignored). */
     104            1 :     mysink->throttled_last = GetCurrentTimestamp();
     105            1 : }
     106              : 
     107              : /*
     108              :  * First throttle, and then pass archive contents to next sink.
                      :  *
                      :  * 'len' is the number of bytes of archive data being passed along; it is
                      :  * added to the throttling counter before the data is forwarded, so any
                      :  * sleep happens before the successor sink sees the data.
     109              :  */
     110              : static void
     111           21 : bbsink_throttle_archive_contents(bbsink *sink, size_t len)
     112              : {
     113           21 :     throttle((bbsink_throttle *) sink, len);
     114              : 
     115           21 :     bbsink_forward_archive_contents(sink, len);
     116           21 : }
     117              : 
     118              : /*
     119              :  * First throttle, and then pass manifest contents to next sink.
                      :  *
                      :  * 'len' is the number of bytes of manifest data being passed along.
                      :  * Manifest bytes are counted against the same throttling counter as
                      :  * archive bytes, and any sleep happens before the data is forwarded.
     120              :  */
     121              : static void
     122            0 : bbsink_throttle_manifest_contents(bbsink *sink, size_t len)
     123              : {
     124            0 :     throttle((bbsink_throttle *) sink, len);
     125              : 
     126            0 :     bbsink_forward_manifest_contents(sink, len);
     127            0 : }
     128              : 
     129              : /*
     130              :  * Increment the network transfer counter by the given number of bytes,
     131              :  * and sleep if necessary to comply with the requested network transfer
     132              :  * rate.
                      :  *
                      :  * 'sink' holds the throttling state; 'increment' is the number of bytes
                      :  * about to be forwarded.
                      :  *
                      :  * Nothing happens until at least throttling_sample bytes have accumulated.
                      :  * Once they have, the minimum time those bytes should have taken is
                      :  * compared with the time elapsed since throttled_last, and the difference
                      :  * (if positive) is slept off.  Afterwards the counter keeps only the
                      :  * remainder below one sample, and throttled_last is reset to the current
                      :  * time.
                      :  *
                      :  * Note that throttling_counter is int64 while throttling_sample is uint64,
                      :  * so the comparison, division and modulo below mix signed and unsigned
                      :  * operands; the Assert that the counter is non-negative keeps that safe.
     133              :  */
     134              : static void
     135           21 : throttle(bbsink_throttle *sink, size_t increment)
     136              : {
     137              :     TimeOffset  elapsed_min;
     138              : 
     139              :     Assert(sink->throttling_counter >= 0);
     140              : 
     141           21 :     sink->throttling_counter += increment;
     142           21 :     if (sink->throttling_counter < sink->throttling_sample)
     143           17 :         return;
     144              : 
     145              :     /* How much time should have elapsed at minimum?  One unit per whole sample. */
     146            4 :     elapsed_min = sink->elapsed_min_unit *
     147            4 :         (sink->throttling_counter / sink->throttling_sample);
     148              : 
     149              :     /*
     150              :      * Since the latch could be set repeatedly because of concurrent WAL
     151              :      * activity, sleep in a loop to ensure enough time has passed.
                      :      * Each iteration recomputes the remaining sleep time from the clock,
                      :      * so an early wakeup only shortens the next wait.
     152              :      */
     153              :     for (;;)
     154            0 :     {
     155              :         TimeOffset  elapsed,
     156              :                     sleep;
     157              :         int         wait_result;
     158              : 
     159              :         /* Time elapsed since the last measurement (and possible wake up). */
     160            4 :         elapsed = GetCurrentTimestamp() - sink->throttled_last;
     161              : 
     162              :         /* Sleep if the transfer is faster than it should be; otherwise we are done. */
     163            4 :         sleep = elapsed_min - elapsed;
     164            4 :         if (sleep <= 0)
     165            0 :             break;
     166              : 
     167            4 :         ResetLatch(MyLatch);
     168              : 
     169              :         /* We're eating a potentially set latch, so check for interrupts */
     170            4 :         CHECK_FOR_INTERRUPTS();
     171              : 
     172              :         /*
     173              :          * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be
     174              :          * the maximum time to sleep. Thus the cast to long is safe.
                      :          *
                      :          * 'sleep' is in microseconds; dividing by 1000 converts it to the
                      :          * milliseconds passed as the WaitLatch timeout.
                      :          *
                      :          * NOTE(review): TAR_SEND_SIZE is not defined in this file; confirm
                      :          * that the bound above still matches the chunk size callers pass.
     175              :          */
     176            4 :         wait_result = WaitLatch(MyLatch,
     177              :                                 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     178            4 :                                 (long) (sleep / 1000),
     179              :                                 WAIT_EVENT_BASE_BACKUP_THROTTLE);
     180              : 
     181            4 :         if (wait_result & WL_LATCH_SET)
     182            0 :             CHECK_FOR_INTERRUPTS();
     183              : 
     184              :         /* Done waiting?  A timeout means the full requested sleep has passed. */
     185            4 :         if (wait_result & WL_TIMEOUT)
     186            4 :             break;
     187              :     }
     188              : 
     189              :     /*
     190              :      * As we work with integers, only a whole multiple of throttling_sample
     191              :      * has been accounted for. The rest will be handled during the next call
                      :      * of this function.
     192              :      */
     193            4 :     sink->throttling_counter %= sink->throttling_sample;
     194              : 
     195              :     /*
     196              :      * Time interval for the remaining amount and possible next increments
     197              :      * starts now.
     198              :      */
     199            4 :     sink->throttled_last = GetCurrentTimestamp();
     200              : }
        

Generated by: LCOV version 2.0-1