LCOV - code coverage report
Current view: top level - src/test/modules/test_aio - test_aio.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 294 315 93.3 %
Date: 2025-04-24 12:15:10 Functions: 45 45 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * test_aio.c
       4             :  *      Helpers to write tests for AIO
       5             :  *
       6             :  * This module provides interface functions for C functionality to SQL, to
       7             :  * make it possible to test AIO related behavior in a targeted way from SQL.
       8             :  * It'd not generally be safe to export these functions to SQL, but for a test
       9             :  * that's fine.
      10             :  *
      11             :  * Copyright (c) 2020-2025, PostgreSQL Global Development Group
      12             :  *
      13             :  * IDENTIFICATION
      14             :  *    src/test/modules/test_aio/test_aio.c
      15             :  *
      16             :  *-------------------------------------------------------------------------
      17             :  */
      18             : 
      19             : #include "postgres.h"
      20             : 
      21             : #include "access/relation.h"
      22             : #include "fmgr.h"
      23             : #include "storage/aio.h"
      24             : #include "storage/aio_internal.h"
      25             : #include "storage/buf_internals.h"
      26             : #include "storage/bufmgr.h"
      27             : #include "storage/checksum.h"
      28             : #include "storage/ipc.h"
      29             : #include "storage/lwlock.h"
      30             : #include "utils/builtins.h"
      31             : #include "utils/injection_point.h"
      32             : #include "utils/rel.h"
      33             : 
      34             : 
      35           4 : PG_MODULE_MAGIC;
      36             : 
      37             : 
      38             : typedef struct InjIoErrorState
      39             : {
      40             :     bool        enabled_short_read;
      41             :     bool        enabled_reopen;
      42             : 
      43             :     bool        short_read_result_set;
      44             :     int         short_read_result;
      45             : }           InjIoErrorState;
      46             : 
      47             : static InjIoErrorState * inj_io_error_state;
      48             : 
      49             : /* Shared memory init callbacks */
      50             : static shmem_request_hook_type prev_shmem_request_hook = NULL;
      51             : static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
      52             : 
      53             : 
      54             : static PgAioHandle *last_handle;
      55             : 
      56             : 
      57             : 
      58             : static void
      59           4 : test_aio_shmem_request(void)
      60             : {
      61           4 :     if (prev_shmem_request_hook)
      62           0 :         prev_shmem_request_hook();
      63             : 
      64           4 :     RequestAddinShmemSpace(sizeof(InjIoErrorState));
      65           4 : }
      66             : 
      67             : static void
      68           4 : test_aio_shmem_startup(void)
      69             : {
      70             :     bool        found;
      71             : 
      72           4 :     if (prev_shmem_startup_hook)
      73           0 :         prev_shmem_startup_hook();
      74             : 
      75             :     /* Create or attach to the shared memory state */
      76           4 :     LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
      77             : 
      78           4 :     inj_io_error_state = ShmemInitStruct("injection_points",
      79             :                                          sizeof(InjIoErrorState),
      80             :                                          &found);
      81             : 
      82           4 :     if (!found)
      83             :     {
      84             :         /* First time through, initialize */
      85           4 :         inj_io_error_state->enabled_short_read = false;
      86           4 :         inj_io_error_state->enabled_reopen = false;
      87             : 
      88             : #ifdef USE_INJECTION_POINTS
      89           4 :         InjectionPointAttach("aio-process-completion-before-shared",
      90             :                              "test_aio",
      91             :                              "inj_io_short_read",
      92             :                              NULL,
      93             :                              0);
      94           4 :         InjectionPointLoad("aio-process-completion-before-shared");
      95             : 
      96           4 :         InjectionPointAttach("aio-worker-after-reopen",
      97             :                              "test_aio",
      98             :                              "inj_io_reopen",
      99             :                              NULL,
     100             :                              0);
     101           4 :         InjectionPointLoad("aio-worker-after-reopen");
     102             : 
     103             : #endif
     104             :     }
     105             :     else
     106             :     {
     107             :         /*
     108             :          * Pre-load the injection points now, so we can call them in a
     109             :          * critical section.
     110             :          */
     111             : #ifdef USE_INJECTION_POINTS
     112           0 :         InjectionPointLoad("aio-process-completion-before-shared");
     113           0 :         InjectionPointLoad("aio-worker-after-reopen");
     114           0 :         elog(LOG, "injection point loaded");
     115             : #endif
     116             :     }
     117             : 
     118           4 :     LWLockRelease(AddinShmemInitLock);
     119           4 : }
     120             : 
     121             : void
     122           4 : _PG_init(void)
     123             : {
     124           4 :     if (!process_shared_preload_libraries_in_progress)
     125           0 :         return;
     126             : 
     127           4 :     prev_shmem_request_hook = shmem_request_hook;
     128           4 :     shmem_request_hook = test_aio_shmem_request;
     129           4 :     prev_shmem_startup_hook = shmem_startup_hook;
     130           4 :     shmem_startup_hook = test_aio_shmem_startup;
     131             : }
     132             : 
     133             : 
     134          12 : PG_FUNCTION_INFO_V1(errno_from_string);
     135             : Datum
     136           8 : errno_from_string(PG_FUNCTION_ARGS)
     137             : {
     138           8 :     const char *sym = text_to_cstring(PG_GETARG_TEXT_PP(0));
     139             : 
     140           8 :     if (strcmp(sym, "EIO") == 0)
     141           4 :         PG_RETURN_INT32(EIO);
     142           4 :     else if (strcmp(sym, "EAGAIN") == 0)
     143           0 :         PG_RETURN_INT32(EAGAIN);
     144           4 :     else if (strcmp(sym, "EINTR") == 0)
     145           0 :         PG_RETURN_INT32(EINTR);
     146           4 :     else if (strcmp(sym, "ENOSPC") == 0)
     147           0 :         PG_RETURN_INT32(ENOSPC);
     148           4 :     else if (strcmp(sym, "EROFS") == 0)
     149           4 :         PG_RETURN_INT32(EROFS);
     150             : 
     151           0 :     ereport(ERROR,
     152             :             errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     153             :             errmsg_internal("%s is not a supported errno value", sym));
     154             :     PG_RETURN_INT32(0);
     155             : }
     156             : 
     157          16 : PG_FUNCTION_INFO_V1(grow_rel);
     158             : Datum
     159          12 : grow_rel(PG_FUNCTION_ARGS)
     160             : {
     161          12 :     Oid         relid = PG_GETARG_OID(0);
     162          12 :     uint32      nblocks = PG_GETARG_UINT32(1);
     163             :     Relation    rel;
     164             : #define MAX_BUFFERS_TO_EXTEND_BY 64
     165             :     Buffer      victim_buffers[MAX_BUFFERS_TO_EXTEND_BY];
     166             : 
     167          12 :     rel = relation_open(relid, AccessExclusiveLock);
     168             : 
     169          24 :     while (nblocks > 0)
     170             :     {
     171             :         uint32      extend_by_pages;
     172             : 
     173          12 :         extend_by_pages = Min(nblocks, MAX_BUFFERS_TO_EXTEND_BY);
     174             : 
     175          12 :         ExtendBufferedRelBy(BMR_REL(rel),
     176             :                             MAIN_FORKNUM,
     177             :                             NULL,
     178             :                             0,
     179             :                             extend_by_pages,
     180             :                             victim_buffers,
     181             :                             &extend_by_pages);
     182             : 
     183          12 :         nblocks -= extend_by_pages;
     184             : 
     185         156 :         for (uint32 i = 0; i < extend_by_pages; i++)
     186             :         {
     187         144 :             ReleaseBuffer(victim_buffers[i]);
     188             :         }
     189             :     }
     190             : 
     191          12 :     relation_close(rel, NoLock);
     192             : 
     193          12 :     PG_RETURN_VOID();
     194             : }
     195             : 
     196          36 : PG_FUNCTION_INFO_V1(modify_rel_block);
     197             : Datum
     198         140 : modify_rel_block(PG_FUNCTION_ARGS)
     199             : {
     200         140 :     Oid         relid = PG_GETARG_OID(0);
     201         140 :     BlockNumber blkno = PG_GETARG_UINT32(1);
     202         140 :     bool        zero = PG_GETARG_BOOL(2);
     203         140 :     bool        corrupt_header = PG_GETARG_BOOL(3);
     204         140 :     bool        corrupt_checksum = PG_GETARG_BOOL(4);
     205         140 :     Page        page = palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE, 0);
     206             :     bool        flushed;
     207             :     Relation    rel;
     208             :     Buffer      buf;
     209             :     PageHeader  ph;
     210             : 
     211         140 :     rel = relation_open(relid, AccessExclusiveLock);
     212             : 
     213         140 :     buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno,
     214             :                              RBM_ZERO_ON_ERROR, NULL);
     215             : 
     216         140 :     LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
     217             : 
     218             :     /*
     219             :      * copy the page to local memory, seems nicer than to directly modify in
     220             :      * the buffer pool.
     221             :      */
     222         140 :     memcpy(page, BufferGetPage(buf), BLCKSZ);
     223             : 
     224         140 :     LockBuffer(buf, BUFFER_LOCK_UNLOCK);
     225             : 
     226         140 :     ReleaseBuffer(buf);
     227             : 
     228             :     /*
     229             :      * Don't want to have a buffer in-memory that's marked valid where the
     230             :      * on-disk contents are invalid. Particularly not if the in-memory buffer
     231             :      * could be dirty...
     232             :      *
     233             :      * While we hold an AEL on the relation nobody else should be able to read
     234             :      * the buffer in.
     235             :      *
     236             :      * NB: This is probably racy, better don't copy this to non-test code.
     237             :      */
     238         140 :     if (BufferIsLocal(buf))
     239          36 :         InvalidateLocalBuffer(GetLocalBufferDescriptor(-buf - 1), true);
     240             :     else
     241         104 :         EvictUnpinnedBuffer(buf, &flushed);
     242             : 
     243             :     /*
     244             :      * Now modify the page as asked for by the caller.
     245             :      */
     246         140 :     if (zero)
     247          32 :         memset(page, 0, BufferGetPageSize(buf));
     248             : 
     249         140 :     if (PageIsEmpty(page) && (corrupt_header || corrupt_checksum))
     250          32 :         PageInit(page, BufferGetPageSize(buf), 0);
     251             : 
     252         140 :     ph = (PageHeader) page;
     253             : 
     254         140 :     if (corrupt_header)
     255          64 :         ph->pd_special = BLCKSZ + 1;
     256             : 
     257         140 :     if (corrupt_checksum)
     258             :     {
     259          52 :         bool        successfully_corrupted = 0;
     260             : 
     261             :         /*
     262             :          * Any single modification of the checksum could just end up being
     263             :          * valid again, due to e.g. corrupt_header changing the data in a way
     264             :          * that'd result in the "corrupted" checksum, or the checksum already
     265             :          * being invalid. Retry in that, unlikely, case.
     266             :          */
     267          52 :         for (int i = 0; i < 100; i++)
     268             :         {
     269             :             uint16      verify_checksum;
     270             :             uint16      old_checksum;
     271             : 
     272          52 :             old_checksum = ph->pd_checksum;
     273          52 :             ph->pd_checksum = old_checksum + 1;
     274             : 
     275          52 :             elog(LOG, "corrupting checksum of blk %u from %u to %u",
     276             :                  blkno, old_checksum, ph->pd_checksum);
     277             : 
     278          52 :             verify_checksum = pg_checksum_page(page, blkno);
     279          52 :             if (verify_checksum != ph->pd_checksum)
     280             :             {
     281          52 :                 successfully_corrupted = true;
     282          52 :                 break;
     283             :             }
     284             :         }
     285             : 
     286          52 :         if (!successfully_corrupted)
     287           0 :             elog(ERROR, "could not corrupt checksum, what's going on?");
     288             :     }
     289             :     else
     290             :     {
     291          88 :         PageSetChecksumInplace(page, blkno);
     292             :     }
     293             : 
     294         140 :     smgrwrite(RelationGetSmgr(rel),
     295             :               MAIN_FORKNUM, blkno, page, true);
     296             : 
     297         140 :     relation_close(rel, NoLock);
     298             : 
     299         140 :     PG_RETURN_VOID();
     300             : }
     301             : 
     302             : /*
     303             :  * Ensures a buffer for rel & blkno is in shared buffers, without actually
     304             :  * caring about the buffer contents. Used to set up test scenarios.
     305             :  */
     306             : static Buffer
     307         260 : create_toy_buffer(Relation rel, BlockNumber blkno)
     308             : {
     309             :     Buffer      buf;
     310             :     BufferDesc *buf_hdr;
     311             :     uint32      buf_state;
     312         260 :     bool        was_pinned = false;
     313             : 
     314             :     /* place buffer in shared buffers without erroring out */
     315         260 :     buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_ZERO_AND_LOCK, NULL);
     316         260 :     LockBuffer(buf, BUFFER_LOCK_UNLOCK);
     317             : 
     318         260 :     if (RelationUsesLocalBuffers(rel))
     319             :     {
     320          88 :         buf_hdr = GetLocalBufferDescriptor(-buf - 1);
     321          88 :         buf_state = pg_atomic_read_u32(&buf_hdr->state);
     322             :     }
     323             :     else
     324             :     {
     325         172 :         buf_hdr = GetBufferDescriptor(buf - 1);
     326         172 :         buf_state = LockBufHdr(buf_hdr);
     327             :     }
     328             : 
     329             :     /*
     330             :      * We should be the only backend accessing this buffer. This is just a
     331             :      * small bit of belt-and-suspenders defense, none of this code should ever
     332             :      * run in a cluster with real data.
     333             :      */
     334         260 :     if (BUF_STATE_GET_REFCOUNT(buf_state) > 1)
     335           0 :         was_pinned = true;
     336             :     else
     337         260 :         buf_state &= ~(BM_VALID | BM_DIRTY);
     338             : 
     339         260 :     if (RelationUsesLocalBuffers(rel))
     340          88 :         pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
     341             :     else
     342         172 :         UnlockBufHdr(buf_hdr, buf_state);
     343             : 
     344         260 :     if (was_pinned)
     345           0 :         elog(ERROR, "toy buffer %d was already pinned",
     346             :              buf);
     347             : 
     348         260 :     return buf;
     349             : }
     350             : 
     351             : /*
     352             :  * A "low level" read. This does similar things to what
     353             :  * StartReadBuffers()/WaitReadBuffers() do, but provides more control (and
     354             :  * less sanity).
     355             :  */
     356          40 : PG_FUNCTION_INFO_V1(read_rel_block_ll);
     357             : Datum
     358         144 : read_rel_block_ll(PG_FUNCTION_ARGS)
     359             : {
     360         144 :     Oid         relid = PG_GETARG_OID(0);
     361         144 :     BlockNumber blkno = PG_GETARG_UINT32(1);
     362         144 :     int         nblocks = PG_GETARG_INT32(2);
     363         144 :     bool        wait_complete = PG_GETARG_BOOL(3);
     364         144 :     bool        batchmode_enter = PG_GETARG_BOOL(4);
     365         144 :     bool        call_smgrreleaseall = PG_GETARG_BOOL(5);
     366         144 :     bool        batchmode_exit = PG_GETARG_BOOL(6);
     367         144 :     bool        zero_on_error = PG_GETARG_BOOL(7);
     368             :     Relation    rel;
     369             :     Buffer      bufs[PG_IOV_MAX];
     370             :     BufferDesc *buf_hdrs[PG_IOV_MAX];
     371             :     Page        pages[PG_IOV_MAX];
     372         144 :     uint8       srb_flags = 0;
     373             :     PgAioReturn ior;
     374             :     PgAioHandle *ioh;
     375             :     PgAioWaitRef iow;
     376             :     SMgrRelation smgr;
     377             : 
     378         144 :     if (nblocks <= 0 || nblocks > PG_IOV_MAX)
     379           0 :         elog(ERROR, "nblocks is out of range");
     380             : 
     381         144 :     rel = relation_open(relid, AccessExclusiveLock);
     382             : 
     383         392 :     for (int i = 0; i < nblocks; i++)
     384             :     {
     385         248 :         bufs[i] = create_toy_buffer(rel, blkno + i);
     386         248 :         pages[i] = BufferGetBlock(bufs[i]);
     387         248 :         buf_hdrs[i] = BufferIsLocal(bufs[i]) ?
     388         248 :             GetLocalBufferDescriptor(-bufs[i] - 1) :
     389         164 :             GetBufferDescriptor(bufs[i] - 1);
     390             :     }
     391             : 
     392         144 :     smgr = RelationGetSmgr(rel);
     393             : 
     394         144 :     pgstat_prepare_report_checksum_failure(smgr->smgr_rlocator.locator.dbOid);
     395             : 
     396         144 :     ioh = pgaio_io_acquire(CurrentResourceOwner, &ior);
     397         144 :     pgaio_io_get_wref(ioh, &iow);
     398             : 
     399         144 :     if (RelationUsesLocalBuffers(rel))
     400             :     {
     401         128 :         for (int i = 0; i < nblocks; i++)
     402          84 :             StartLocalBufferIO(buf_hdrs[i], true, false);
     403          44 :         pgaio_io_set_flag(ioh, PGAIO_HF_REFERENCES_LOCAL);
     404             :     }
     405             :     else
     406             :     {
     407         264 :         for (int i = 0; i < nblocks; i++)
     408         164 :             StartBufferIO(buf_hdrs[i], true, false);
     409             :     }
     410             : 
     411         144 :     pgaio_io_set_handle_data_32(ioh, (uint32 *) bufs, nblocks);
     412             : 
     413         144 :     if (zero_on_error | zero_damaged_pages)
     414          44 :         srb_flags |= READ_BUFFERS_ZERO_ON_ERROR;
     415         144 :     if (ignore_checksum_failure)
     416          20 :         srb_flags |= READ_BUFFERS_IGNORE_CHECKSUM_FAILURES;
     417             : 
     418         144 :     pgaio_io_register_callbacks(ioh,
     419         144 :                                 RelationUsesLocalBuffers(rel) ?
     420             :                                 PGAIO_HCB_LOCAL_BUFFER_READV :
     421             :                                 PGAIO_HCB_SHARED_BUFFER_READV,
     422             :                                 srb_flags);
     423             : 
     424         144 :     if (batchmode_enter)
     425           8 :         pgaio_enter_batchmode();
     426             : 
     427         144 :     smgrstartreadv(ioh, smgr, MAIN_FORKNUM, blkno,
     428             :                    (void *) pages, nblocks);
     429             : 
     430         144 :     if (call_smgrreleaseall)
     431           8 :         smgrreleaseall();
     432             : 
     433         144 :     if (batchmode_exit)
     434           8 :         pgaio_exit_batchmode();
     435             : 
     436         392 :     for (int i = 0; i < nblocks; i++)
     437         248 :         ReleaseBuffer(bufs[i]);
     438             : 
     439         144 :     if (wait_complete)
     440             :     {
     441         100 :         pgaio_wref_wait(&iow);
     442             : 
     443         100 :         if (ior.result.status != PGAIO_RS_OK)
     444          88 :             pgaio_result_report(ior.result,
     445             :                                 &ior.target_data,
     446          88 :                                 ior.result.status == PGAIO_RS_ERROR ?
     447             :                                 ERROR : WARNING);
     448             :     }
     449             : 
     450         100 :     relation_close(rel, NoLock);
     451             : 
     452         100 :     PG_RETURN_VOID();
     453             : }
     454             : 
     455          22 : PG_FUNCTION_INFO_V1(invalidate_rel_block);
     456             : Datum
     457         278 : invalidate_rel_block(PG_FUNCTION_ARGS)
     458             : {
     459         278 :     Oid         relid = PG_GETARG_OID(0);
     460         278 :     BlockNumber blkno = PG_GETARG_UINT32(1);
     461             :     Relation    rel;
     462             :     PrefetchBufferResult pr;
     463             :     Buffer      buf;
     464             : 
     465         278 :     rel = relation_open(relid, AccessExclusiveLock);
     466             : 
     467             :     /*
     468             :      * This is a gross hack, but there's no other API exposed that allows to
     469             :      * get a buffer ID without actually reading the block in.
     470             :      */
     471         278 :     pr = PrefetchBuffer(rel, MAIN_FORKNUM, blkno);
     472         278 :     buf = pr.recent_buffer;
     473             : 
     474         278 :     if (BufferIsValid(buf))
     475             :     {
     476             :         /* if the buffer contents aren't valid, this'll return false */
     477         250 :         if (ReadRecentBuffer(rel->rd_locator, MAIN_FORKNUM, blkno, buf))
     478             :         {
     479         238 :             BufferDesc *buf_hdr = BufferIsLocal(buf) ?
     480          64 :                 GetLocalBufferDescriptor(-buf - 1)
     481         238 :                 : GetBufferDescriptor(buf - 1);
     482             :             bool        flushed;
     483             : 
     484         238 :             LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
     485             : 
     486         238 :             if (pg_atomic_read_u32(&buf_hdr->state) & BM_DIRTY)
     487             :             {
     488         144 :                 if (BufferIsLocal(buf))
     489          44 :                     FlushLocalBuffer(buf_hdr, NULL);
     490             :                 else
     491         100 :                     FlushOneBuffer(buf);
     492             :             }
     493         238 :             LockBuffer(buf, BUFFER_LOCK_UNLOCK);
     494         238 :             ReleaseBuffer(buf);
     495             : 
     496         238 :             if (BufferIsLocal(buf))
     497          64 :                 InvalidateLocalBuffer(GetLocalBufferDescriptor(-buf - 1), true);
     498         174 :             else if (!EvictUnpinnedBuffer(buf, &flushed))
     499           0 :                 elog(ERROR, "couldn't evict");
     500             :         }
     501             :     }
     502             : 
     503         278 :     relation_close(rel, AccessExclusiveLock);
     504             : 
     505         278 :     PG_RETURN_VOID();
     506             : }
     507             : 
     508          12 : PG_FUNCTION_INFO_V1(buffer_create_toy);
     509             : Datum
     510          12 : buffer_create_toy(PG_FUNCTION_ARGS)
     511             : {
     512          12 :     Oid         relid = PG_GETARG_OID(0);
     513          12 :     BlockNumber blkno = PG_GETARG_UINT32(1);
     514             :     Relation    rel;
     515             :     Buffer      buf;
     516             : 
     517          12 :     rel = relation_open(relid, AccessExclusiveLock);
     518             : 
     519          12 :     buf = create_toy_buffer(rel, blkno);
     520          12 :     ReleaseBuffer(buf);
     521             : 
     522          12 :     relation_close(rel, NoLock);
     523             : 
     524          12 :     PG_RETURN_INT32(buf);
     525             : }
     526             : 
     527          16 : PG_FUNCTION_INFO_V1(buffer_call_start_io);
     528             : Datum
     529          40 : buffer_call_start_io(PG_FUNCTION_ARGS)
     530             : {
     531          40 :     Buffer      buf = PG_GETARG_INT32(0);
     532          40 :     bool        for_input = PG_GETARG_BOOL(1);
     533          40 :     bool        nowait = PG_GETARG_BOOL(2);
     534             :     bool        can_start;
     535             : 
     536          40 :     if (BufferIsLocal(buf))
     537          16 :         can_start = StartLocalBufferIO(GetLocalBufferDescriptor(-buf - 1),
     538             :                                        for_input, nowait);
     539             :     else
     540          24 :         can_start = StartBufferIO(GetBufferDescriptor(buf - 1),
     541             :                                   for_input, nowait);
     542             : 
     543             :     /*
     544             :      * For tests we don't want the resowner release preventing us from
     545             :      * orchestrating odd scenarios.
     546             :      */
     547          40 :     if (can_start && !BufferIsLocal(buf))
     548          12 :         ResourceOwnerForgetBufferIO(CurrentResourceOwner,
     549             :                                     buf);
     550             : 
     551          40 :     ereport(LOG,
     552             :             errmsg("buffer %d after StartBufferIO: %s",
     553             :                    buf, DebugPrintBufferRefcount(buf)),
     554             :             errhidestmt(true), errhidecontext(true));
     555             : 
     556          40 :     PG_RETURN_BOOL(can_start);
     557             : }
     558             : 
     559          16 : PG_FUNCTION_INFO_V1(buffer_call_terminate_io);
     560             : Datum
     561          20 : buffer_call_terminate_io(PG_FUNCTION_ARGS)
     562             : {
     563          20 :     Buffer      buf = PG_GETARG_INT32(0);
     564          20 :     bool        for_input = PG_GETARG_BOOL(1);
     565          20 :     bool        succeed = PG_GETARG_BOOL(2);
     566          20 :     bool        io_error = PG_GETARG_BOOL(3);
     567          20 :     bool        release_aio = PG_GETARG_BOOL(4);
     568          20 :     bool        clear_dirty = false;
     569          20 :     uint32      set_flag_bits = 0;
     570             : 
     571          20 :     if (io_error)
     572           0 :         set_flag_bits |= BM_IO_ERROR;
     573             : 
     574          20 :     if (for_input)
     575             :     {
     576          20 :         clear_dirty = false;
     577             : 
     578          20 :         if (succeed)
     579           8 :             set_flag_bits |= BM_VALID;
     580             :     }
     581             :     else
     582             :     {
     583           0 :         if (succeed)
     584           0 :             clear_dirty = true;
     585             :     }
     586             : 
     587          20 :     ereport(LOG,
     588             :             errmsg("buffer %d before Terminate[Local]BufferIO: %s",
     589             :                    buf, DebugPrintBufferRefcount(buf)),
     590             :             errhidestmt(true), errhidecontext(true));
     591             : 
     592          20 :     if (BufferIsLocal(buf))
     593           8 :         TerminateLocalBufferIO(GetLocalBufferDescriptor(-buf - 1),
     594             :                                clear_dirty, set_flag_bits, release_aio);
     595             :     else
     596          12 :         TerminateBufferIO(GetBufferDescriptor(buf - 1),
     597             :                           clear_dirty, set_flag_bits, false, release_aio);
     598             : 
     599          20 :     ereport(LOG,
     600             :             errmsg("buffer %d after Terminate[Local]BufferIO: %s",
     601             :                    buf, DebugPrintBufferRefcount(buf)),
     602             :             errhidestmt(true), errhidecontext(true));
     603             : 
     604          20 :     PG_RETURN_VOID();
     605             : }
     606             : 
     607          12 : PG_FUNCTION_INFO_V1(handle_get);
     608             : Datum
     609          24 : handle_get(PG_FUNCTION_ARGS)
     610             : {
     611          24 :     last_handle = pgaio_io_acquire(CurrentResourceOwner, NULL);
     612             : 
     613          24 :     PG_RETURN_VOID();
     614             : }
     615             : 
     616          12 : PG_FUNCTION_INFO_V1(handle_release_last);
     617             : Datum
     618           8 : handle_release_last(PG_FUNCTION_ARGS)
     619             : {
     620           8 :     if (!last_handle)
     621           0 :         elog(ERROR, "no handle");
     622             : 
     623           8 :     pgaio_io_release(last_handle);
     624             : 
     625           4 :     PG_RETURN_VOID();
     626             : }
     627             : 
     628          12 : PG_FUNCTION_INFO_V1(handle_get_and_error);
     629             : Datum
     630          12 : handle_get_and_error(PG_FUNCTION_ARGS)
     631             : {
     632          12 :     pgaio_io_acquire(CurrentResourceOwner, NULL);
     633             : 
     634          12 :     elog(ERROR, "as you command");
     635             :     PG_RETURN_VOID();
     636             : }
     637             : 
     638          12 : PG_FUNCTION_INFO_V1(handle_get_twice);
     639             : Datum
     640           4 : handle_get_twice(PG_FUNCTION_ARGS)
     641             : {
     642           4 :     pgaio_io_acquire(CurrentResourceOwner, NULL);
     643           4 :     pgaio_io_acquire(CurrentResourceOwner, NULL);
     644             : 
     645           0 :     PG_RETURN_VOID();
     646             : }
     647             : 
     648          12 : PG_FUNCTION_INFO_V1(handle_get_release);
     649             : Datum
     650          12 : handle_get_release(PG_FUNCTION_ARGS)
     651             : {
     652             :     PgAioHandle *handle;
     653             : 
     654          12 :     handle = pgaio_io_acquire(CurrentResourceOwner, NULL);
     655          12 :     pgaio_io_release(handle);
     656             : 
     657          12 :     PG_RETURN_VOID();
     658             : }
     659             : 
     660          12 : PG_FUNCTION_INFO_V1(batch_start);
     661             : Datum
     662          12 : batch_start(PG_FUNCTION_ARGS)
     663             : {
     664          12 :     pgaio_enter_batchmode();
     665          12 :     PG_RETURN_VOID();
     666             : }
     667             : 
     668          12 : PG_FUNCTION_INFO_V1(batch_end);
     669             : Datum
     670           4 : batch_end(PG_FUNCTION_ARGS)
     671             : {
     672           4 :     pgaio_exit_batchmode();
     673           4 :     PG_RETURN_VOID();
     674             : }
     675             : 
     676             : #ifdef USE_INJECTION_POINTS
     677             : extern PGDLLEXPORT void inj_io_short_read(const char *name, const void *private_data);
     678             : extern PGDLLEXPORT void inj_io_reopen(const char *name, const void *private_data);
     679             : 
     680             : void
     681        2574 : inj_io_short_read(const char *name, const void *private_data)
     682             : {
     683             :     PgAioHandle *ioh;
     684             : 
     685        2574 :     ereport(LOG,
     686             :             errmsg("short read injection point called, is enabled: %d",
     687             :                    inj_io_error_state->enabled_reopen),
     688             :             errhidestmt(true), errhidecontext(true));
     689             : 
     690        2574 :     if (inj_io_error_state->enabled_short_read)
     691             :     {
     692          96 :         ioh = pgaio_inj_io_get();
     693             : 
     694             :         /*
     695             :          * Only shorten reads that are actually longer than the target size,
     696             :          * otherwise we can trigger over-reads.
     697             :          */
     698          96 :         if (inj_io_error_state->short_read_result_set
     699          96 :             && ioh->op == PGAIO_OP_READV
     700          96 :             && inj_io_error_state->short_read_result <= ioh->result)
     701             :         {
     702          88 :             struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off];
     703          88 :             int32       old_result = ioh->result;
     704          88 :             int32       new_result = inj_io_error_state->short_read_result;
     705          88 :             int32       processed = 0;
     706             : 
     707          88 :             ereport(LOG,
     708             :                     errmsg("short read inject point, changing result from %d to %d",
     709             :                            old_result, new_result),
     710             :                     errhidestmt(true), errhidecontext(true));
     711             : 
     712             :             /*
     713             :              * The underlying IO actually completed OK, and thus the "invalid"
     714             :              * portion of the IOV actually contains valid data. That can hide
     715             :              * a lot of problems, e.g. if we were to wrongly mark a buffer,
     716             :              * that wasn't read according to the shortened-read, IO as valid,
     717             :              * the contents would look valid and we might miss a bug.
     718             :              *
     719             :              * To avoid that, iterate through the IOV and zero out the
     720             :              * "failed" portion of the IO.
     721             :              */
     722         184 :             for (int i = 0; i < ioh->op_data.read.iov_length; i++)
     723             :             {
     724          96 :                 if (processed + iov[i].iov_len <= new_result)
     725          68 :                     processed += iov[i].iov_len;
     726          28 :                 else if (processed <= new_result)
     727             :                 {
     728          28 :                     uint32      ok_part = new_result - processed;
     729             : 
     730          28 :                     memset((char *) iov[i].iov_base + ok_part, 0, iov[i].iov_len - ok_part);
     731          28 :                     processed += iov[i].iov_len;
     732             :                 }
     733             :                 else
     734             :                 {
     735           0 :                     memset((char *) iov[i].iov_base, 0, iov[i].iov_len);
     736             :                 }
     737             :             }
     738             : 
     739          88 :             ioh->result = new_result;
     740             :         }
     741             :     }
     742        2574 : }
     743             : 
     744             : void
     745         692 : inj_io_reopen(const char *name, const void *private_data)
     746             : {
     747         692 :     ereport(LOG,
     748             :             errmsg("reopen injection point called, is enabled: %d",
     749             :                    inj_io_error_state->enabled_reopen),
     750             :             errhidestmt(true), errhidecontext(true));
     751             : 
     752         692 :     if (inj_io_error_state->enabled_reopen)
     753           2 :         elog(ERROR, "injection point triggering failure to reopen ");
     754         690 : }
     755             : #endif
     756             : 
     757          12 : PG_FUNCTION_INFO_V1(inj_io_short_read_attach);
     758             : Datum
     759          28 : inj_io_short_read_attach(PG_FUNCTION_ARGS)
     760             : {
     761             : #ifdef USE_INJECTION_POINTS
     762          28 :     inj_io_error_state->enabled_short_read = true;
     763          28 :     inj_io_error_state->short_read_result_set = !PG_ARGISNULL(0);
     764          28 :     if (inj_io_error_state->short_read_result_set)
     765          28 :         inj_io_error_state->short_read_result = PG_GETARG_INT32(0);
     766             : #else
     767             :     elog(ERROR, "injection points not supported");
     768             : #endif
     769             : 
     770          28 :     PG_RETURN_VOID();
     771             : }
     772             : 
     773          12 : PG_FUNCTION_INFO_V1(inj_io_short_read_detach);
     774             : Datum
     775           8 : inj_io_short_read_detach(PG_FUNCTION_ARGS)
     776             : {
     777             : #ifdef USE_INJECTION_POINTS
     778           8 :     inj_io_error_state->enabled_short_read = false;
     779             : #else
     780             :     elog(ERROR, "injection points not supported");
     781             : #endif
     782           8 :     PG_RETURN_VOID();
     783             : }
     784             : 
     785          10 : PG_FUNCTION_INFO_V1(inj_io_reopen_attach);
     786             : Datum
     787           2 : inj_io_reopen_attach(PG_FUNCTION_ARGS)
     788             : {
     789             : #ifdef USE_INJECTION_POINTS
     790           2 :     inj_io_error_state->enabled_reopen = true;
     791             : #else
     792             :     elog(ERROR, "injection points not supported");
     793             : #endif
     794             : 
     795           2 :     PG_RETURN_VOID();
     796             : }
     797             : 
     798          10 : PG_FUNCTION_INFO_V1(inj_io_reopen_detach);
     799             : Datum
     800           2 : inj_io_reopen_detach(PG_FUNCTION_ARGS)
     801             : {
     802             : #ifdef USE_INJECTION_POINTS
     803           2 :     inj_io_error_state->enabled_reopen = false;
     804             : #else
     805             :     elog(ERROR, "injection points not supported");
     806             : #endif
     807           2 :     PG_RETURN_VOID();
     808             : }

Generated by: LCOV version 1.14