LCOV - code coverage report
Current view: top level - src/test/modules/test_aio - test_aio.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 294 315 93.3 %
Date: 2025-04-04 02:15:58 Functions: 45 45 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * test_aio.c
       4             :  *      Helpers to write tests for AIO
       5             :  *
       6             :  * This module provides interface functions for C functionality to SQL, to
       7             :  * make it possible to test AIO related behavior in a targeted way from SQL.
       8             :  * It'd not generally be safe to export these functions to SQL, but for a test
       9             :  * that's fine.
      10             :  *
      11             :  * Copyright (c) 2020-2025, PostgreSQL Global Development Group
      12             :  *
      13             :  * IDENTIFICATION
      14             :  *    src/test/modules/test_aio/test_aio.c
      15             :  *
      16             :  *-------------------------------------------------------------------------
      17             :  */
      18             : 
      19             : #include "postgres.h"
      20             : 
      21             : #include "access/relation.h"
      22             : #include "fmgr.h"
      23             : #include "storage/aio.h"
      24             : #include "storage/aio_internal.h"
      25             : #include "storage/buf_internals.h"
      26             : #include "storage/bufmgr.h"
      27             : #include "storage/checksum.h"
      28             : #include "storage/ipc.h"
      29             : #include "storage/lwlock.h"
      30             : #include "utils/builtins.h"
      31             : #include "utils/injection_point.h"
      32             : #include "utils/rel.h"
      33             : 
      34             : 
      35           4 : PG_MODULE_MAGIC;
      36             : 
      37             : 
      38             : typedef struct InjIoErrorState
      39             : {
      40             :     bool        enabled_short_read;
      41             :     bool        enabled_reopen;
      42             : 
      43             :     bool        short_read_result_set;
      44             :     int         short_read_result;
      45             : }           InjIoErrorState;
      46             : 
      47             : static InjIoErrorState * inj_io_error_state;
      48             : 
      49             : /* Shared memory init callbacks */
      50             : static shmem_request_hook_type prev_shmem_request_hook = NULL;
      51             : static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
      52             : 
      53             : 
      54             : static PgAioHandle *last_handle;
      55             : 
      56             : 
      57             : 
      58             : static void
      59           4 : test_aio_shmem_request(void)
      60             : {
      61           4 :     if (prev_shmem_request_hook)
      62           0 :         prev_shmem_request_hook();
      63             : 
      64           4 :     RequestAddinShmemSpace(sizeof(InjIoErrorState));
      65           4 : }
      66             : 
      67             : static void
      68           4 : test_aio_shmem_startup(void)
      69             : {
      70             :     bool        found;
      71             : 
      72           4 :     if (prev_shmem_startup_hook)
      73           0 :         prev_shmem_startup_hook();
      74             : 
      75             :     /* Create or attach to the shared memory state */
      76           4 :     LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
      77             : 
      78           4 :     inj_io_error_state = ShmemInitStruct("injection_points",
      79             :                                          sizeof(InjIoErrorState),
      80             :                                          &found);
      81             : 
      82           4 :     if (!found)
      83             :     {
      84             :         /* First time through, initialize */
      85           4 :         inj_io_error_state->enabled_short_read = false;
      86           4 :         inj_io_error_state->enabled_reopen = false;
      87             : 
      88             : #ifdef USE_INJECTION_POINTS
      89           4 :         InjectionPointAttach("AIO_PROCESS_COMPLETION_BEFORE_SHARED",
      90             :                              "test_aio",
      91             :                              "inj_io_short_read",
      92             :                              NULL,
      93             :                              0);
      94           4 :         InjectionPointLoad("AIO_PROCESS_COMPLETION_BEFORE_SHARED");
      95             : 
      96           4 :         InjectionPointAttach("AIO_WORKER_AFTER_REOPEN",
      97             :                              "test_aio",
      98             :                              "inj_io_reopen",
      99             :                              NULL,
     100             :                              0);
     101           4 :         InjectionPointLoad("AIO_WORKER_AFTER_REOPEN");
     102             : 
     103             : #endif
     104             :     }
     105             :     else
     106             :     {
     107             :         /*
     108             :          * Pre-load the injection points now, so we can call them in a
     109             :          * critical section.
     110             :          */
     111             : #ifdef USE_INJECTION_POINTS
     112           0 :         InjectionPointLoad("AIO_PROCESS_COMPLETION_BEFORE_SHARED");
     113           0 :         InjectionPointLoad("AIO_WORKER_AFTER_REOPEN");
     114           0 :         elog(LOG, "injection point loaded");
     115             : #endif
     116             :     }
     117             : 
     118           4 :     LWLockRelease(AddinShmemInitLock);
     119           4 : }
     120             : 
     121             : void
     122           4 : _PG_init(void)
     123             : {
     124           4 :     if (!process_shared_preload_libraries_in_progress)
     125           0 :         return;
     126             : 
     127           4 :     prev_shmem_request_hook = shmem_request_hook;
     128           4 :     shmem_request_hook = test_aio_shmem_request;
     129           4 :     prev_shmem_startup_hook = shmem_startup_hook;
     130           4 :     shmem_startup_hook = test_aio_shmem_startup;
     131             : }
     132             : 
     133             : 
     134          12 : PG_FUNCTION_INFO_V1(errno_from_string);
     135             : Datum
     136           8 : errno_from_string(PG_FUNCTION_ARGS)
     137             : {
     138           8 :     const char *sym = text_to_cstring(PG_GETARG_TEXT_PP(0));
     139             : 
     140           8 :     if (strcmp(sym, "EIO") == 0)
     141           4 :         PG_RETURN_INT32(EIO);
     142           4 :     else if (strcmp(sym, "EAGAIN") == 0)
     143           0 :         PG_RETURN_INT32(EAGAIN);
     144           4 :     else if (strcmp(sym, "EINTR") == 0)
     145           0 :         PG_RETURN_INT32(EINTR);
     146           4 :     else if (strcmp(sym, "ENOSPC") == 0)
     147           0 :         PG_RETURN_INT32(ENOSPC);
     148           4 :     else if (strcmp(sym, "EROFS") == 0)
     149           4 :         PG_RETURN_INT32(EROFS);
     150             : 
     151           0 :     ereport(ERROR,
     152             :             errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     153             :             errmsg_internal("%s is not a supported errno value", sym));
     154             :     PG_RETURN_INT32(0);
     155             : }
     156             : 
     157          16 : PG_FUNCTION_INFO_V1(grow_rel);
     158             : Datum
     159          12 : grow_rel(PG_FUNCTION_ARGS)
     160             : {
     161          12 :     Oid         relid = PG_GETARG_OID(0);
     162          12 :     uint32      nblocks = PG_GETARG_UINT32(1);
     163             :     Relation    rel;
     164             : #define MAX_BUFFERS_TO_EXTEND_BY 64
     165             :     Buffer      victim_buffers[MAX_BUFFERS_TO_EXTEND_BY];
     166             : 
     167          12 :     rel = relation_open(relid, AccessExclusiveLock);
     168             : 
     169          24 :     while (nblocks > 0)
     170             :     {
     171             :         uint32      extend_by_pages;
     172             : 
     173          12 :         extend_by_pages = Min(nblocks, MAX_BUFFERS_TO_EXTEND_BY);
     174             : 
     175          12 :         ExtendBufferedRelBy(BMR_REL(rel),
     176             :                             MAIN_FORKNUM,
     177             :                             NULL,
     178             :                             0,
     179             :                             extend_by_pages,
     180             :                             victim_buffers,
     181             :                             &extend_by_pages);
     182             : 
     183          12 :         nblocks -= extend_by_pages;
     184             : 
     185         156 :         for (uint32 i = 0; i < extend_by_pages; i++)
     186             :         {
     187         144 :             ReleaseBuffer(victim_buffers[i]);
     188             :         }
     189             :     }
     190             : 
     191          12 :     relation_close(rel, NoLock);
     192             : 
     193          12 :     PG_RETURN_VOID();
     194             : }
     195             : 
     196          36 : PG_FUNCTION_INFO_V1(modify_rel_block);
     197             : Datum
     198         140 : modify_rel_block(PG_FUNCTION_ARGS)
     199             : {
     200         140 :     Oid         relid = PG_GETARG_OID(0);
     201         140 :     BlockNumber blkno = PG_GETARG_UINT32(1);
     202         140 :     bool        zero = PG_GETARG_BOOL(2);
     203         140 :     bool        corrupt_header = PG_GETARG_BOOL(3);
     204         140 :     bool        corrupt_checksum = PG_GETARG_BOOL(4);
     205         140 :     Page        page = palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE, 0);
     206             :     Relation    rel;
     207             :     Buffer      buf;
     208             :     PageHeader  ph;
     209             : 
     210         140 :     rel = relation_open(relid, AccessExclusiveLock);
     211             : 
     212         140 :     buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno,
     213             :                              RBM_ZERO_ON_ERROR, NULL);
     214             : 
     215         140 :     LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
     216             : 
     217             :     /*
     218             :      * copy the page to local memory, seems nicer than to directly modify in
     219             :      * the buffer pool.
     220             :      */
     221         140 :     memcpy(page, BufferGetPage(buf), BLCKSZ);
     222             : 
     223         140 :     LockBuffer(buf, BUFFER_LOCK_UNLOCK);
     224             : 
     225         140 :     ReleaseBuffer(buf);
     226             : 
     227             :     /*
     228             :      * Don't want to have a buffer in-memory that's marked valid where the
     229             :      * on-disk contents are invalid. Particularly not if the in-memory buffer
     230             :      * could be dirty...
     231             :      *
     232             :      * While we hold an AEL on the relation nobody else should be able to read
     233             :      * the buffer in.
     234             :      *
     235             :      * NB: This is probably racy, better don't copy this to non-test code.
     236             :      */
     237         140 :     if (BufferIsLocal(buf))
     238          36 :         InvalidateLocalBuffer(GetLocalBufferDescriptor(-buf - 1), true);
     239             :     else
     240         104 :         EvictUnpinnedBuffer(buf);
     241             : 
     242             :     /*
     243             :      * Now modify the page as asked for by the caller.
     244             :      */
     245         140 :     if (zero)
     246          32 :         memset(page, 0, BufferGetPageSize(buf));
     247             : 
     248         140 :     if (PageIsEmpty(page) && (corrupt_header || corrupt_checksum))
     249          32 :         PageInit(page, BufferGetPageSize(buf), 0);
     250             : 
     251         140 :     ph = (PageHeader) page;
     252             : 
     253         140 :     if (corrupt_header)
     254          64 :         ph->pd_special = BLCKSZ + 1;
     255             : 
     256         140 :     if (corrupt_checksum)
     257             :     {
     258          52 :         bool        successfully_corrupted = 0;
     259             : 
     260             :         /*
     261             :          * Any single modification of the checksum could just end up being
     262             :          * valid again, due to e.g. corrupt_header changing the data in a way
     263             :          * that'd result in the "corrupted" checksum, or the checksum already
     264             :          * being invalid. Retry in that, unlikely, case.
     265             :          */
     266          52 :         for (int i = 0; i < 100; i++)
     267             :         {
     268             :             uint16      verify_checksum;
     269             :             uint16      old_checksum;
     270             : 
     271          52 :             old_checksum = ph->pd_checksum;
     272          52 :             ph->pd_checksum = old_checksum + 1;
     273             : 
     274          52 :             elog(LOG, "corrupting checksum of blk %u from %u to %u",
     275             :                  blkno, old_checksum, ph->pd_checksum);
     276             : 
     277          52 :             verify_checksum = pg_checksum_page(page, blkno);
     278          52 :             if (verify_checksum != ph->pd_checksum)
     279             :             {
     280          52 :                 successfully_corrupted = true;
     281          52 :                 break;
     282             :             }
     283             :         }
     284             : 
     285          52 :         if (!successfully_corrupted)
     286           0 :             elog(ERROR, "could not corrupt checksum, what's going on?");
     287             :     }
     288             :     else
     289             :     {
     290          88 :         PageSetChecksumInplace(page, blkno);
     291             :     }
     292             : 
     293         140 :     smgrwrite(RelationGetSmgr(rel),
     294             :               MAIN_FORKNUM, blkno, page, true);
     295             : 
     296         140 :     relation_close(rel, NoLock);
     297             : 
     298         140 :     PG_RETURN_VOID();
     299             : }
     300             : 
     301             : /*
     302             :  * Ensures a buffer for rel & blkno is in shared buffers, without actually
     303             :  * caring about the buffer contents. Used to set up test scenarios.
     304             :  */
     305             : static Buffer
     306         260 : create_toy_buffer(Relation rel, BlockNumber blkno)
     307             : {
     308             :     Buffer      buf;
     309             :     BufferDesc *buf_hdr;
     310             :     uint32      buf_state;
     311         260 :     bool        was_pinned = false;
     312             : 
     313             :     /* place buffer in shared buffers without erroring out */
     314         260 :     buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_ZERO_AND_LOCK, NULL);
     315         260 :     LockBuffer(buf, BUFFER_LOCK_UNLOCK);
     316             : 
     317         260 :     if (RelationUsesLocalBuffers(rel))
     318             :     {
     319          88 :         buf_hdr = GetLocalBufferDescriptor(-buf - 1);
     320          88 :         buf_state = pg_atomic_read_u32(&buf_hdr->state);
     321             :     }
     322             :     else
     323             :     {
     324         172 :         buf_hdr = GetBufferDescriptor(buf - 1);
     325         172 :         buf_state = LockBufHdr(buf_hdr);
     326             :     }
     327             : 
     328             :     /*
     329             :      * We should be the only backend accessing this buffer. This is just a
     330             :      * small bit of belt-and-suspenders defense, none of this code should ever
     331             :      * run in a cluster with real data.
     332             :      */
     333         260 :     if (BUF_STATE_GET_REFCOUNT(buf_state) > 1)
     334           0 :         was_pinned = true;
     335             :     else
     336         260 :         buf_state &= ~(BM_VALID | BM_DIRTY);
     337             : 
     338         260 :     if (RelationUsesLocalBuffers(rel))
     339          88 :         pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
     340             :     else
     341         172 :         UnlockBufHdr(buf_hdr, buf_state);
     342             : 
     343         260 :     if (was_pinned)
     344           0 :         elog(ERROR, "toy buffer %d was already pinned",
     345             :              buf);
     346             : 
     347         260 :     return buf;
     348             : }
     349             : 
     350             : /*
     351             :  * A "low level" read. This does similar things to what
     352             :  * StartReadBuffers()/WaitReadBuffers() do, but provides more control (and
     353             :  * less sanity).
     354             :  */
     355          40 : PG_FUNCTION_INFO_V1(read_rel_block_ll);
     356             : Datum
     357         144 : read_rel_block_ll(PG_FUNCTION_ARGS)
     358             : {
     359         144 :     Oid         relid = PG_GETARG_OID(0);
     360         144 :     BlockNumber blkno = PG_GETARG_UINT32(1);
     361         144 :     int         nblocks = PG_GETARG_INT32(2);
     362         144 :     bool        wait_complete = PG_GETARG_BOOL(3);
     363         144 :     bool        batchmode_enter = PG_GETARG_BOOL(4);
     364         144 :     bool        call_smgrreleaseall = PG_GETARG_BOOL(5);
     365         144 :     bool        batchmode_exit = PG_GETARG_BOOL(6);
     366         144 :     bool        zero_on_error = PG_GETARG_BOOL(7);
     367             :     Relation    rel;
     368             :     Buffer      bufs[PG_IOV_MAX];
     369             :     BufferDesc *buf_hdrs[PG_IOV_MAX];
     370             :     Page        pages[PG_IOV_MAX];
     371         144 :     uint8       srb_flags = 0;
     372             :     PgAioReturn ior;
     373             :     PgAioHandle *ioh;
     374             :     PgAioWaitRef iow;
     375             :     SMgrRelation smgr;
     376             : 
     377         144 :     if (nblocks <= 0 || nblocks > PG_IOV_MAX)
     378           0 :         elog(ERROR, "nblocks is out of range");
     379             : 
     380         144 :     rel = relation_open(relid, AccessExclusiveLock);
     381             : 
     382         392 :     for (int i = 0; i < nblocks; i++)
     383             :     {
     384         248 :         bufs[i] = create_toy_buffer(rel, blkno + i);
     385         248 :         pages[i] = BufferGetBlock(bufs[i]);
     386         248 :         buf_hdrs[i] = BufferIsLocal(bufs[i]) ?
     387         248 :             GetLocalBufferDescriptor(-bufs[i] - 1) :
     388         164 :             GetBufferDescriptor(bufs[i] - 1);
     389             :     }
     390             : 
     391         144 :     smgr = RelationGetSmgr(rel);
     392             : 
     393         144 :     pgstat_prepare_report_checksum_failure(smgr->smgr_rlocator.locator.dbOid);
     394             : 
     395         144 :     ioh = pgaio_io_acquire(CurrentResourceOwner, &ior);
     396         144 :     pgaio_io_get_wref(ioh, &iow);
     397             : 
     398         144 :     if (RelationUsesLocalBuffers(rel))
     399             :     {
     400         128 :         for (int i = 0; i < nblocks; i++)
     401          84 :             StartLocalBufferIO(buf_hdrs[i], true, false);
     402          44 :         pgaio_io_set_flag(ioh, PGAIO_HF_REFERENCES_LOCAL);
     403             :     }
     404             :     else
     405             :     {
     406         264 :         for (int i = 0; i < nblocks; i++)
     407         164 :             StartBufferIO(buf_hdrs[i], true, false);
     408             :     }
     409             : 
     410         144 :     pgaio_io_set_handle_data_32(ioh, (uint32 *) bufs, nblocks);
     411             : 
     412         144 :     if (zero_on_error | zero_damaged_pages)
     413          44 :         srb_flags |= READ_BUFFERS_ZERO_ON_ERROR;
     414         144 :     if (ignore_checksum_failure)
     415          20 :         srb_flags |= READ_BUFFERS_IGNORE_CHECKSUM_FAILURES;
     416             : 
     417         144 :     pgaio_io_register_callbacks(ioh,
     418         144 :                                 RelationUsesLocalBuffers(rel) ?
     419             :                                 PGAIO_HCB_LOCAL_BUFFER_READV :
     420             :                                 PGAIO_HCB_SHARED_BUFFER_READV,
     421             :                                 srb_flags);
     422             : 
     423         144 :     if (batchmode_enter)
     424           8 :         pgaio_enter_batchmode();
     425             : 
     426         144 :     smgrstartreadv(ioh, smgr, MAIN_FORKNUM, blkno,
     427             :                    (void *) pages, nblocks);
     428             : 
     429         144 :     if (call_smgrreleaseall)
     430           8 :         smgrreleaseall();
     431             : 
     432         144 :     if (batchmode_exit)
     433           8 :         pgaio_exit_batchmode();
     434             : 
     435         392 :     for (int i = 0; i < nblocks; i++)
     436         248 :         ReleaseBuffer(bufs[i]);
     437             : 
     438         144 :     if (wait_complete)
     439             :     {
     440         100 :         pgaio_wref_wait(&iow);
     441             : 
     442         100 :         if (ior.result.status != PGAIO_RS_OK)
     443          88 :             pgaio_result_report(ior.result,
     444             :                                 &ior.target_data,
     445          88 :                                 ior.result.status == PGAIO_RS_ERROR ?
     446             :                                 ERROR : WARNING);
     447             :     }
     448             : 
     449         100 :     relation_close(rel, NoLock);
     450             : 
     451         100 :     PG_RETURN_VOID();
     452             : }
     453             : 
     454          22 : PG_FUNCTION_INFO_V1(invalidate_rel_block);
     455             : Datum
     456         278 : invalidate_rel_block(PG_FUNCTION_ARGS)
     457             : {
     458         278 :     Oid         relid = PG_GETARG_OID(0);
     459         278 :     BlockNumber blkno = PG_GETARG_UINT32(1);
     460             :     Relation    rel;
     461             :     PrefetchBufferResult pr;
     462             :     Buffer      buf;
     463             : 
     464         278 :     rel = relation_open(relid, AccessExclusiveLock);
     465             : 
     466             :     /*
     467             :      * This is a gross hack, but there's no other API exposed that allows to
     468             :      * get a buffer ID without actually reading the block in.
     469             :      */
     470         278 :     pr = PrefetchBuffer(rel, MAIN_FORKNUM, blkno);
     471         278 :     buf = pr.recent_buffer;
     472             : 
     473         278 :     if (BufferIsValid(buf))
     474             :     {
     475             :         /* if the buffer contents aren't valid, this'll return false */
     476         250 :         if (ReadRecentBuffer(rel->rd_locator, MAIN_FORKNUM, blkno, buf))
     477             :         {
     478         238 :             BufferDesc *buf_hdr = BufferIsLocal(buf) ?
     479          64 :                 GetLocalBufferDescriptor(-buf - 1)
     480         238 :                 : GetBufferDescriptor(buf - 1);
     481             : 
     482         238 :             LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
     483             : 
     484         238 :             if (pg_atomic_read_u32(&buf_hdr->state) & BM_DIRTY)
     485             :             {
     486         144 :                 if (BufferIsLocal(buf))
     487          44 :                     FlushLocalBuffer(buf_hdr, NULL);
     488             :                 else
     489         100 :                     FlushOneBuffer(buf);
     490             :             }
     491         238 :             LockBuffer(buf, BUFFER_LOCK_UNLOCK);
     492         238 :             ReleaseBuffer(buf);
     493             : 
     494         238 :             if (BufferIsLocal(buf))
     495          64 :                 InvalidateLocalBuffer(GetLocalBufferDescriptor(-buf - 1), true);
     496         174 :             else if (!EvictUnpinnedBuffer(buf))
     497           0 :                 elog(ERROR, "couldn't evict");
     498             :         }
     499             :     }
     500             : 
     501         278 :     relation_close(rel, AccessExclusiveLock);
     502             : 
     503         278 :     PG_RETURN_VOID();
     504             : }
     505             : 
     506          12 : PG_FUNCTION_INFO_V1(buffer_create_toy);
     507             : Datum
     508          12 : buffer_create_toy(PG_FUNCTION_ARGS)
     509             : {
     510          12 :     Oid         relid = PG_GETARG_OID(0);
     511          12 :     BlockNumber blkno = PG_GETARG_UINT32(1);
     512             :     Relation    rel;
     513             :     Buffer      buf;
     514             : 
     515          12 :     rel = relation_open(relid, AccessExclusiveLock);
     516             : 
     517          12 :     buf = create_toy_buffer(rel, blkno);
     518          12 :     ReleaseBuffer(buf);
     519             : 
     520          12 :     relation_close(rel, NoLock);
     521             : 
     522          12 :     PG_RETURN_INT32(buf);
     523             : }
     524             : 
     525          16 : PG_FUNCTION_INFO_V1(buffer_call_start_io);
     526             : Datum
     527          40 : buffer_call_start_io(PG_FUNCTION_ARGS)
     528             : {
     529          40 :     Buffer      buf = PG_GETARG_INT32(0);
     530          40 :     bool        for_input = PG_GETARG_BOOL(1);
     531          40 :     bool        nowait = PG_GETARG_BOOL(2);
     532             :     bool        can_start;
     533             : 
     534          40 :     if (BufferIsLocal(buf))
     535          16 :         can_start = StartLocalBufferIO(GetLocalBufferDescriptor(-buf - 1),
     536             :                                        for_input, nowait);
     537             :     else
     538          24 :         can_start = StartBufferIO(GetBufferDescriptor(buf - 1),
     539             :                                   for_input, nowait);
     540             : 
     541             :     /*
     542             :      * For tests we don't want the resowner release preventing us from
     543             :      * orchestrating odd scenarios.
     544             :      */
     545          40 :     if (can_start && !BufferIsLocal(buf))
     546          12 :         ResourceOwnerForgetBufferIO(CurrentResourceOwner,
     547             :                                     buf);
     548             : 
     549          40 :     ereport(LOG,
     550             :             errmsg("buffer %d after StartBufferIO: %s",
     551             :                    buf, DebugPrintBufferRefcount(buf)),
     552             :             errhidestmt(true), errhidecontext(true));
     553             : 
     554          40 :     PG_RETURN_BOOL(can_start);
     555             : }
     556             : 
     557          16 : PG_FUNCTION_INFO_V1(buffer_call_terminate_io);
     558             : Datum
     559          20 : buffer_call_terminate_io(PG_FUNCTION_ARGS)
     560             : {
     561          20 :     Buffer      buf = PG_GETARG_INT32(0);
     562          20 :     bool        for_input = PG_GETARG_BOOL(1);
     563          20 :     bool        succeed = PG_GETARG_BOOL(2);
     564          20 :     bool        io_error = PG_GETARG_BOOL(3);
     565          20 :     bool        release_aio = PG_GETARG_BOOL(4);
     566          20 :     bool        clear_dirty = false;
     567          20 :     uint32      set_flag_bits = 0;
     568             : 
     569          20 :     if (io_error)
     570           0 :         set_flag_bits |= BM_IO_ERROR;
     571             : 
     572          20 :     if (for_input)
     573             :     {
     574          20 :         clear_dirty = false;
     575             : 
     576          20 :         if (succeed)
     577           8 :             set_flag_bits |= BM_VALID;
     578             :     }
     579             :     else
     580             :     {
     581           0 :         if (succeed)
     582           0 :             clear_dirty = true;
     583             :     }
     584             : 
     585          20 :     ereport(LOG,
     586             :             errmsg("buffer %d before Terminate[Local]BufferIO: %s",
     587             :                    buf, DebugPrintBufferRefcount(buf)),
     588             :             errhidestmt(true), errhidecontext(true));
     589             : 
     590          20 :     if (BufferIsLocal(buf))
     591           8 :         TerminateLocalBufferIO(GetLocalBufferDescriptor(-buf - 1),
     592             :                                clear_dirty, set_flag_bits, release_aio);
     593             :     else
     594          12 :         TerminateBufferIO(GetBufferDescriptor(buf - 1),
     595             :                           clear_dirty, set_flag_bits, false, release_aio);
     596             : 
     597          20 :     ereport(LOG,
     598             :             errmsg("buffer %d after Terminate[Local]BufferIO: %s",
     599             :                    buf, DebugPrintBufferRefcount(buf)),
     600             :             errhidestmt(true), errhidecontext(true));
     601             : 
     602          20 :     PG_RETURN_VOID();
     603             : }
     604             : 
     605          12 : PG_FUNCTION_INFO_V1(handle_get);
     606             : Datum
     607          24 : handle_get(PG_FUNCTION_ARGS)
     608             : {
     609          24 :     last_handle = pgaio_io_acquire(CurrentResourceOwner, NULL);
     610             : 
     611          24 :     PG_RETURN_VOID();
     612             : }
     613             : 
     614          12 : PG_FUNCTION_INFO_V1(handle_release_last);
     615             : Datum
     616           8 : handle_release_last(PG_FUNCTION_ARGS)
     617             : {
     618           8 :     if (!last_handle)
     619           0 :         elog(ERROR, "no handle");
     620             : 
     621           8 :     pgaio_io_release(last_handle);
     622             : 
     623           4 :     PG_RETURN_VOID();
     624             : }
     625             : 
     626          12 : PG_FUNCTION_INFO_V1(handle_get_and_error);
     627             : Datum
     628          12 : handle_get_and_error(PG_FUNCTION_ARGS)
     629             : {
     630          12 :     pgaio_io_acquire(CurrentResourceOwner, NULL);
     631             : 
     632          12 :     elog(ERROR, "as you command");
     633             :     PG_RETURN_VOID();
     634             : }
     635             : 
     636          12 : PG_FUNCTION_INFO_V1(handle_get_twice);
     637             : Datum
     638           4 : handle_get_twice(PG_FUNCTION_ARGS)
     639             : {
     640           4 :     pgaio_io_acquire(CurrentResourceOwner, NULL);
     641           4 :     pgaio_io_acquire(CurrentResourceOwner, NULL);
     642             : 
     643           0 :     PG_RETURN_VOID();
     644             : }
     645             : 
     646          12 : PG_FUNCTION_INFO_V1(handle_get_release);
     647             : Datum
     648          12 : handle_get_release(PG_FUNCTION_ARGS)
     649             : {
     650             :     PgAioHandle *handle;
     651             : 
     652          12 :     handle = pgaio_io_acquire(CurrentResourceOwner, NULL);
     653          12 :     pgaio_io_release(handle);
     654             : 
     655          12 :     PG_RETURN_VOID();
     656             : }
     657             : 
     658          12 : PG_FUNCTION_INFO_V1(batch_start);
     659             : Datum
     660          12 : batch_start(PG_FUNCTION_ARGS)
     661             : {
     662          12 :     pgaio_enter_batchmode();
     663          12 :     PG_RETURN_VOID();
     664             : }
     665             : 
     666          12 : PG_FUNCTION_INFO_V1(batch_end);
     667             : Datum
     668           4 : batch_end(PG_FUNCTION_ARGS)
     669             : {
     670           4 :     pgaio_exit_batchmode();
     671           4 :     PG_RETURN_VOID();
     672             : }
     673             : 
     674             : #ifdef USE_INJECTION_POINTS
     675             : extern PGDLLEXPORT void inj_io_short_read(const char *name, const void *private_data);
     676             : extern PGDLLEXPORT void inj_io_reopen(const char *name, const void *private_data);
     677             : 
     678             : void
     679        2558 : inj_io_short_read(const char *name, const void *private_data)
     680             : {
     681             :     PgAioHandle *ioh;
     682             : 
     683        2558 :     ereport(LOG,
     684             :             errmsg("short read injection point called, is enabled: %d",
     685             :                    inj_io_error_state->enabled_reopen),
     686             :             errhidestmt(true), errhidecontext(true));
     687             : 
     688        2558 :     if (inj_io_error_state->enabled_short_read)
     689             :     {
     690          96 :         ioh = pgaio_inj_io_get();
     691             : 
     692             :         /*
     693             :          * Only shorten reads that are actually longer than the target size,
     694             :          * otherwise we can trigger over-reads.
     695             :          */
     696          96 :         if (inj_io_error_state->short_read_result_set
     697          96 :             && ioh->op == PGAIO_OP_READV
     698          96 :             && inj_io_error_state->short_read_result <= ioh->result)
     699             :         {
     700          88 :             struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off];
     701          88 :             int32       old_result = ioh->result;
     702          88 :             int32       new_result = inj_io_error_state->short_read_result;
     703          88 :             int32       processed = 0;
     704             : 
     705          88 :             ereport(LOG,
     706             :                     errmsg("short read inject point, changing result from %d to %d",
     707             :                            old_result, new_result),
     708             :                     errhidestmt(true), errhidecontext(true));
     709             : 
     710             :             /*
     711             :              * The underlying IO actually completed OK, and thus the "invalid"
     712             :              * portion of the IOV actually contains valid data. That can hide
     713             :              * a lot of problems, e.g. if we were to wrongly mark a buffer,
     714             :              * that wasn't read according to the shortened-read, IO as valid,
     715             :              * the contents would look valid and we might miss a bug.
     716             :              *
     717             :              * To avoid that, iterate through the IOV and zero out the
     718             :              * "failed" portion of the IO.
     719             :              */
     720         184 :             for (int i = 0; i < ioh->op_data.read.iov_length; i++)
     721             :             {
     722          96 :                 if (processed + iov[i].iov_len <= new_result)
     723          68 :                     processed += iov[i].iov_len;
     724          28 :                 else if (processed <= new_result)
     725             :                 {
     726          28 :                     uint32      ok_part = new_result - processed;
     727             : 
     728          28 :                     memset((char *) iov[i].iov_base + ok_part, 0, iov[i].iov_len - ok_part);
     729          28 :                     processed += iov[i].iov_len;
     730             :                 }
     731             :                 else
     732             :                 {
     733           0 :                     memset((char *) iov[i].iov_base, 0, iov[i].iov_len);
     734             :                 }
     735             :             }
     736             : 
     737          88 :             ioh->result = new_result;
     738             :         }
     739             :     }
     740        2558 : }
     741             : 
     742             : void
     743         692 : inj_io_reopen(const char *name, const void *private_data)
     744             : {
     745         692 :     ereport(LOG,
     746             :             errmsg("reopen injection point called, is enabled: %d",
     747             :                    inj_io_error_state->enabled_reopen),
     748             :             errhidestmt(true), errhidecontext(true));
     749             : 
     750         692 :     if (inj_io_error_state->enabled_reopen)
     751           2 :         elog(ERROR, "injection point triggering failure to reopen ");
     752         690 : }
     753             : #endif
     754             : 
     755          12 : PG_FUNCTION_INFO_V1(inj_io_short_read_attach);
     756             : Datum
     757          28 : inj_io_short_read_attach(PG_FUNCTION_ARGS)
     758             : {
     759             : #ifdef USE_INJECTION_POINTS
     760          28 :     inj_io_error_state->enabled_short_read = true;
     761          28 :     inj_io_error_state->short_read_result_set = !PG_ARGISNULL(0);
     762          28 :     if (inj_io_error_state->short_read_result_set)
     763          28 :         inj_io_error_state->short_read_result = PG_GETARG_INT32(0);
     764             : #else
     765             :     elog(ERROR, "injection points not supported");
     766             : #endif
     767             : 
     768          28 :     PG_RETURN_VOID();
     769             : }
     770             : 
     771          12 : PG_FUNCTION_INFO_V1(inj_io_short_read_detach);
     772             : Datum
     773           8 : inj_io_short_read_detach(PG_FUNCTION_ARGS)
     774             : {
     775             : #ifdef USE_INJECTION_POINTS
     776           8 :     inj_io_error_state->enabled_short_read = false;
     777             : #else
     778             :     elog(ERROR, "injection points not supported");
     779             : #endif
     780           8 :     PG_RETURN_VOID();
     781             : }
     782             : 
     783          10 : PG_FUNCTION_INFO_V1(inj_io_reopen_attach);
     784             : Datum
     785           2 : inj_io_reopen_attach(PG_FUNCTION_ARGS)
     786             : {
     787             : #ifdef USE_INJECTION_POINTS
     788           2 :     inj_io_error_state->enabled_reopen = true;
     789             : #else
     790             :     elog(ERROR, "injection points not supported");
     791             : #endif
     792             : 
     793           2 :     PG_RETURN_VOID();
     794             : }
     795             : 
     796          10 : PG_FUNCTION_INFO_V1(inj_io_reopen_detach);
     797             : Datum
     798           2 : inj_io_reopen_detach(PG_FUNCTION_ARGS)
     799             : {
     800             : #ifdef USE_INJECTION_POINTS
     801           2 :     inj_io_error_state->enabled_reopen = false;
     802             : #else
     803             :     elog(ERROR, "injection points not supported");
     804             : #endif
     805           2 :     PG_RETURN_VOID();
     806             : }

Generated by: LCOV version 1.14