LCOV - code coverage report
Current view: top level - src/backend/backup - basebackup_throttle.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 36 42 85.7 %
Date: 2025-01-18 04:15:08 Functions: 4 5 80.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * basebackup_throttle.c
       4             :  *    Basebackup sink implementing throttling. Data is forwarded to the
       5             :  *    next base backup sink in the chain at a rate no greater than the
       6             :  *    configured maximum.
       7             :  *
       8             :  * Portions Copyright (c) 2010-2025, PostgreSQL Global Development Group
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *    src/backend/backup/basebackup_throttle.c
      12             :  *
      13             :  *-------------------------------------------------------------------------
      14             :  */
      15             : #include "postgres.h"
      16             : 
      17             : #include "backup/basebackup_sink.h"
      18             : #include "miscadmin.h"
      19             : #include "pgstat.h"
      20             : #include "storage/latch.h"
      21             : #include "utils/timestamp.h"
      22             : 
      23             : typedef struct bbsink_throttle
      24             : {
      25             :     /* Common information for all types of sink. */
      26             :     bbsink      base;
      27             : 
      28             :     /* The actual number of bytes, transfer of which may cause sleep. */
      29             :     uint64      throttling_sample;
      30             : 
      31             :     /* Amount of data already transferred but not yet throttled.  */
      32             :     int64       throttling_counter;
      33             : 
      34             :     /* The minimum time required to transfer throttling_sample bytes. */
      35             :     TimeOffset  elapsed_min_unit;
      36             : 
      37             :     /* The last check of the transfer rate. */
      38             :     TimestampTz throttled_last;
      39             : } bbsink_throttle;
      40             : 
      41             : static void bbsink_throttle_begin_backup(bbsink *sink);
      42             : static void bbsink_throttle_archive_contents(bbsink *sink, size_t len);
      43             : static void bbsink_throttle_manifest_contents(bbsink *sink, size_t len);
      44             : static void throttle(bbsink_throttle *sink, size_t increment);
      45             : 
      46             : static const bbsink_ops bbsink_throttle_ops = {
      47             :     .begin_backup = bbsink_throttle_begin_backup,
      48             :     .begin_archive = bbsink_forward_begin_archive,
      49             :     .archive_contents = bbsink_throttle_archive_contents,
      50             :     .end_archive = bbsink_forward_end_archive,
      51             :     .begin_manifest = bbsink_forward_begin_manifest,
      52             :     .manifest_contents = bbsink_throttle_manifest_contents,
      53             :     .end_manifest = bbsink_forward_end_manifest,
      54             :     .end_backup = bbsink_forward_end_backup,
      55             :     .cleanup = bbsink_forward_cleanup
      56             : };
      57             : 
      58             : /*
      59             :  * How frequently to throttle, as a fraction of the specified rate-second.
      60             :  */
      61             : #define THROTTLING_FREQUENCY    8
      62             : 
      63             : /*
      64             :  * Create a new basebackup sink that performs throttling and forwards data
      65             :  * to a successor sink.
      66             :  */
      67             : bbsink *
      68           4 : bbsink_throttle_new(bbsink *next, uint32 maxrate)
      69             : {
      70             :     bbsink_throttle *sink;
      71             : 
      72             :     Assert(next != NULL);
      73             :     Assert(maxrate > 0);
      74             : 
      75           4 :     sink = palloc0(sizeof(bbsink_throttle));
      76           4 :     *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_throttle_ops;
      77           4 :     sink->base.bbs_next = next;
      78             : 
      79           4 :     sink->throttling_sample =
      80           4 :         (int64) maxrate * (int64) 1024 / THROTTLING_FREQUENCY;
      81             : 
      82             :     /*
      83             :      * The minimum amount of time for throttling_sample bytes to be
      84             :      * transferred.
      85             :      */
      86           4 :     sink->elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY;
      87             : 
      88           4 :     return &sink->base;
      89             : }
      90             : 
      91             : /*
      92             :  * There's no real work to do here, but we need to record the current time so
      93             :  * that it can be used for future calculations.
      94             :  */
      95             : static void
      96           4 : bbsink_throttle_begin_backup(bbsink *sink)
      97             : {
      98           4 :     bbsink_throttle *mysink = (bbsink_throttle *) sink;
      99             : 
     100           4 :     bbsink_forward_begin_backup(sink);
     101             : 
     102             :     /* The 'real data' starts now (header was ignored). */
     103           4 :     mysink->throttled_last = GetCurrentTimestamp();
     104           4 : }
     105             : 
     106             : /*
     107             :  * First throttle, and then pass archive contents to next sink.
     108             :  */
     109             : static void
     110          62 : bbsink_throttle_archive_contents(bbsink *sink, size_t len)
     111             : {
     112          62 :     throttle((bbsink_throttle *) sink, len);
     113             : 
     114          60 :     bbsink_forward_archive_contents(sink, len);
     115          60 : }
     116             : 
     117             : /*
     118             :  * First throttle, and then pass manifest contents to next sink.
     119             :  */
     120             : static void
     121           0 : bbsink_throttle_manifest_contents(bbsink *sink, size_t len)
     122             : {
     123           0 :     throttle((bbsink_throttle *) sink, len);
     124             : 
     125           0 :     bbsink_forward_manifest_contents(sink, len);
     126           0 : }
     127             : 
     128             : /*
     129             :  * Increment the network transfer counter by the given number of bytes,
     130             :  * and sleep if necessary to comply with the requested network transfer
     131             :  * rate.
     132             :  */
     133             : static void
     134          62 : throttle(bbsink_throttle *sink, size_t increment)
     135             : {
     136             :     TimeOffset  elapsed_min;
     137             : 
     138             :     Assert(sink->throttling_counter >= 0);
     139             : 
     140          62 :     sink->throttling_counter += increment;
     141          62 :     if (sink->throttling_counter < sink->throttling_sample)
     142          52 :         return;
     143             : 
     144             :     /* How much time should have elapsed at minimum? */
     145          10 :     elapsed_min = sink->elapsed_min_unit *
     146          10 :         (sink->throttling_counter / sink->throttling_sample);
     147             : 
     148             :     /*
     149             :      * Since the latch could be set repeatedly because of concurrently WAL
     150             :      * activity, sleep in a loop to ensure enough time has passed.
     151             :      */
     152             :     for (;;)
     153           0 :     {
     154             :         TimeOffset  elapsed,
     155             :                     sleep;
     156             :         int         wait_result;
     157             : 
     158             :         /* Time elapsed since the last measurement (and possible wake up). */
     159          10 :         elapsed = GetCurrentTimestamp() - sink->throttled_last;
     160             : 
     161             :         /* sleep if the transfer is faster than it should be */
     162          10 :         sleep = elapsed_min - elapsed;
     163          10 :         if (sleep <= 0)
     164           0 :             break;
     165             : 
     166          10 :         ResetLatch(MyLatch);
     167             : 
     168             :         /* We're eating a potentially set latch, so check for interrupts */
     169          10 :         CHECK_FOR_INTERRUPTS();
     170             : 
     171             :         /*
     172             :          * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be
     173             :          * the maximum time to sleep. Thus the cast to long is safe.
     174             :          */
     175          10 :         wait_result = WaitLatch(MyLatch,
     176             :                                 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     177          10 :                                 (long) (sleep / 1000),
     178             :                                 WAIT_EVENT_BASE_BACKUP_THROTTLE);
     179             : 
     180          10 :         if (wait_result & WL_LATCH_SET)
     181           2 :             CHECK_FOR_INTERRUPTS();
     182             : 
     183             :         /* Done waiting? */
     184           8 :         if (wait_result & WL_TIMEOUT)
     185           8 :             break;
     186             :     }
     187             : 
     188             :     /*
     189             :      * As we work with integers, only whole multiple of throttling_sample was
     190             :      * processed. The rest will be done during the next call of this function.
     191             :      */
     192           8 :     sink->throttling_counter %= sink->throttling_sample;
     193             : 
     194             :     /*
     195             :      * Time interval for the remaining amount and possible next increments
     196             :      * starts now.
     197             :      */
     198           8 :     sink->throttled_last = GetCurrentTimestamp();
     199             : }

Generated by: LCOV version 1.14