LCOV - code coverage report
Current view: top level - src/test/modules/test_aio - test_aio.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 94.8 % 500 474
Test Date: 2026-04-07 14:16:30 Functions: 98.4 % 62 61
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * test_aio.c
       4              :  *      Helpers to write tests for AIO
       5              :  *
       6              :  * This module provides interface functions for C functionality to SQL, to
       7              :  * make it possible to test AIO related behavior in a targeted way from SQL.
       8              :  * It'd not generally be safe to export these functions to SQL, but for a test
       9              :  * that's fine.
      10              :  *
      11              :  * Copyright (c) 2020-2026, PostgreSQL Global Development Group
      12              :  *
      13              :  * IDENTIFICATION
      14              :  *    src/test/modules/test_aio/test_aio.c
      15              :  *
      16              :  *-------------------------------------------------------------------------
      17              :  */
      18              : 
      19              : #include "postgres.h"
      20              : 
      21              : #include "access/relation.h"
      22              : #include "catalog/pg_type.h"
      23              : #include "fmgr.h"
      24              : #include "funcapi.h"
      25              : #include "storage/aio.h"
      26              : #include "storage/aio_internal.h"
      27              : #include "storage/buf_internals.h"
      28              : #include "storage/bufmgr.h"
      29              : #include "storage/checksum.h"
      30              : #include "storage/condition_variable.h"
      31              : #include "storage/lwlock.h"
      32              : #include "storage/proc.h"
      33              : #include "storage/procnumber.h"
      34              : #include "storage/read_stream.h"
      35              : #include "utils/array.h"
      36              : #include "utils/builtins.h"
      37              : #include "utils/injection_point.h"
      38              : #include "utils/rel.h"
      39              : #include "utils/tuplestore.h"
      40              : #include "utils/wait_event.h"
      41              : 
      42              : 
      43            7 : PG_MODULE_MAGIC;
      44              : 
      45              : 
      46              : /* In shared memory */
      47              : typedef struct InjIoErrorState
      48              : {
      49              :     ConditionVariable cv;
      50              : 
      51              :     bool        enabled_short_read;
      52              :     bool        enabled_reopen;
      53              : 
      54              :     bool        enabled_completion_wait;
      55              :     Oid         completion_wait_relfilenode;
      56              :     BlockNumber completion_wait_blockno;
      57              :     pid_t       completion_wait_pid;
      58              :     uint32      completion_wait_event;
      59              : 
      60              :     bool        short_read_result_set;
      61              :     Oid         short_read_relfilenode;
      62              :     pid_t       short_read_pid;
      63              :     int         short_read_result;
      64              : } InjIoErrorState;
      65              : 
      66              : typedef struct BlocksReadStreamData
      67              : {
      68              :     int         nblocks;
      69              :     int         curblock;
      70              :     uint32     *blocks;
      71              : } BlocksReadStreamData;
      72              : 
      73              : 
      74              : static InjIoErrorState *inj_io_error_state;
      75              : 
      76              : /* Shared memory init callbacks */
      77              : static void test_aio_shmem_request(void *arg);
      78              : static void test_aio_shmem_init(void *arg);
      79              : static void test_aio_shmem_attach(void *arg);
      80              : 
      81              : static const ShmemCallbacks inj_io_shmem_callbacks = {
      82              :     .request_fn = test_aio_shmem_request,
      83              :     .init_fn = test_aio_shmem_init,
      84              :     .attach_fn = test_aio_shmem_attach,
      85              : };
      86              : 
      87              : 
      88              : static PgAioHandle *last_handle;
      89              : 
      90              : 
      91              : 
      92              : static void
      93            7 : test_aio_shmem_request(void *arg)
      94              : {
      95            7 :     ShmemRequestStruct(.name = "test_aio injection points",
      96              :                        .size = sizeof(InjIoErrorState),
      97              :                        .ptr = (void **) &inj_io_error_state,
      98              :         );
      99            7 : }
     100              : 
     101              : static void
     102            7 : test_aio_shmem_init(void *arg)
     103              : {
     104              :     /* First time through, initialize */
     105            7 :     inj_io_error_state->enabled_short_read = false;
     106            7 :     inj_io_error_state->enabled_reopen = false;
     107            7 :     inj_io_error_state->enabled_completion_wait = false;
     108              : 
     109            7 :     ConditionVariableInit(&inj_io_error_state->cv);
     110            7 :     inj_io_error_state->completion_wait_event = WaitEventInjectionPointNew("completion_wait");
     111              : 
     112              : #ifdef USE_INJECTION_POINTS
     113            7 :     InjectionPointAttach("aio-process-completion-before-shared",
     114              :                          "test_aio",
     115              :                          "inj_io_completion_hook",
     116              :                          NULL,
     117              :                          0);
     118            7 :     InjectionPointLoad("aio-process-completion-before-shared");
     119              : 
     120            7 :     InjectionPointAttach("aio-worker-after-reopen",
     121              :                          "test_aio",
     122              :                          "inj_io_reopen",
     123              :                          NULL,
     124              :                          0);
     125            7 :     InjectionPointLoad("aio-worker-after-reopen");
     126              : 
     127              : #endif
     128            7 : }
     129              : 
     130              : static void
     131            0 : test_aio_shmem_attach(void *arg)
     132              : {
     133              :     /*
     134              :      * Pre-load the injection points now, so we can call them in a critical
     135              :      * section.
     136              :      */
     137              : #ifdef USE_INJECTION_POINTS
     138            0 :     InjectionPointLoad("aio-process-completion-before-shared");
     139            0 :     InjectionPointLoad("aio-worker-after-reopen");
     140            0 :     elog(LOG, "injection point loaded");
     141              : #endif
     142            0 : }
     143              : 
     144              : void
     145            7 : _PG_init(void)
     146              : {
     147            7 :     if (!process_shared_preload_libraries_in_progress)
     148            0 :         return;
     149              : 
     150            7 :     RegisterShmemCallbacks(&inj_io_shmem_callbacks);
     151              : }
     152              : 
     153              : 
     154            9 : PG_FUNCTION_INFO_V1(errno_from_string);
     155              : Datum
     156            6 : errno_from_string(PG_FUNCTION_ARGS)
     157              : {
     158            6 :     const char *sym = text_to_cstring(PG_GETARG_TEXT_PP(0));
     159              : 
     160            6 :     if (strcmp(sym, "EIO") == 0)
     161            4 :         PG_RETURN_INT32(EIO);
     162            2 :     else if (strcmp(sym, "EAGAIN") == 0)
     163            0 :         PG_RETURN_INT32(EAGAIN);
     164            2 :     else if (strcmp(sym, "EINTR") == 0)
     165            0 :         PG_RETURN_INT32(EINTR);
     166            2 :     else if (strcmp(sym, "ENOSPC") == 0)
     167            0 :         PG_RETURN_INT32(ENOSPC);
     168            2 :     else if (strcmp(sym, "EROFS") == 0)
     169            2 :         PG_RETURN_INT32(EROFS);
     170              : 
     171            0 :     ereport(ERROR,
     172              :             errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     173              :             errmsg_internal("%s is not a supported errno value", sym));
     174              :     PG_RETURN_INT32(0);
     175              : }
     176              : 
     177            9 : PG_FUNCTION_INFO_V1(grow_rel);
     178              : Datum
     179            6 : grow_rel(PG_FUNCTION_ARGS)
     180              : {
     181            6 :     Oid         relid = PG_GETARG_OID(0);
     182            6 :     uint32      nblocks = PG_GETARG_UINT32(1);
     183              :     Relation    rel;
     184              : #define MAX_BUFFERS_TO_EXTEND_BY 64
     185              :     Buffer      victim_buffers[MAX_BUFFERS_TO_EXTEND_BY];
     186              : 
     187            6 :     rel = relation_open(relid, AccessExclusiveLock);
     188              : 
     189           12 :     while (nblocks > 0)
     190              :     {
     191              :         uint32      extend_by_pages;
     192              : 
     193            6 :         extend_by_pages = Min(nblocks, MAX_BUFFERS_TO_EXTEND_BY);
     194              : 
     195            6 :         ExtendBufferedRelBy(BMR_REL(rel),
     196              :                             MAIN_FORKNUM,
     197              :                             NULL,
     198              :                             0,
     199              :                             extend_by_pages,
     200              :                             victim_buffers,
     201              :                             &extend_by_pages);
     202              : 
     203            6 :         nblocks -= extend_by_pages;
     204              : 
     205           78 :         for (uint32 i = 0; i < extend_by_pages; i++)
     206              :         {
     207           72 :             ReleaseBuffer(victim_buffers[i]);
     208              :         }
     209              :     }
     210              : 
     211            6 :     relation_close(rel, NoLock);
     212              : 
     213            6 :     PG_RETURN_VOID();
     214              : }
     215              : 
     216           19 : PG_FUNCTION_INFO_V1(modify_rel_block);
     217              : Datum
     218           70 : modify_rel_block(PG_FUNCTION_ARGS)
     219              : {
     220           70 :     Oid         relid = PG_GETARG_OID(0);
     221           70 :     BlockNumber blkno = PG_GETARG_UINT32(1);
     222           70 :     bool        zero = PG_GETARG_BOOL(2);
     223           70 :     bool        corrupt_header = PG_GETARG_BOOL(3);
     224           70 :     bool        corrupt_checksum = PG_GETARG_BOOL(4);
     225           70 :     Page        page = palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE, 0);
     226              :     bool        flushed;
     227              :     Relation    rel;
     228              :     Buffer      buf;
     229              :     PageHeader  ph;
     230              : 
     231           70 :     rel = relation_open(relid, AccessExclusiveLock);
     232              : 
     233           70 :     buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno,
     234              :                              RBM_ZERO_ON_ERROR, NULL);
     235              : 
     236           70 :     LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
     237              : 
     238              :     /*
     239              :      * copy the page to local memory, seems nicer than to directly modify in
     240              :      * the buffer pool.
     241              :      */
     242           70 :     memcpy(page, BufferGetPage(buf), BLCKSZ);
     243              : 
     244           70 :     UnlockReleaseBuffer(buf);
     245              : 
     246              :     /*
     247              :      * Don't want to have a buffer in-memory that's marked valid where the
     248              :      * on-disk contents are invalid. Particularly not if the in-memory buffer
     249              :      * could be dirty...
     250              :      *
     251              :      * While we hold an AEL on the relation nobody else should be able to read
     252              :      * the buffer in.
     253              :      *
     254              :      * NB: This is probably racy, better don't copy this to non-test code.
     255              :      */
     256           70 :     if (BufferIsLocal(buf))
     257           18 :         InvalidateLocalBuffer(GetLocalBufferDescriptor(-buf - 1), true);
     258              :     else
     259           52 :         EvictUnpinnedBuffer(buf, &flushed);
     260              : 
     261              :     /*
     262              :      * Now modify the page as asked for by the caller.
     263              :      */
     264           70 :     if (zero)
     265           16 :         memset(page, 0, BufferGetPageSize(buf));
     266              : 
     267           70 :     if (PageIsEmpty(page) && (corrupt_header || corrupt_checksum))
     268           16 :         PageInit(page, BufferGetPageSize(buf), 0);
     269              : 
     270           70 :     ph = (PageHeader) page;
     271              : 
     272           70 :     if (corrupt_header)
     273           32 :         ph->pd_special = BLCKSZ + 1;
     274              : 
     275           70 :     if (corrupt_checksum)
     276              :     {
     277           26 :         bool        successfully_corrupted = 0;
     278              : 
     279              :         /*
     280              :          * Any single modification of the checksum could just end up being
     281              :          * valid again, due to e.g. corrupt_header changing the data in a way
     282              :          * that'd result in the "corrupted" checksum, or the checksum already
     283              :          * being invalid. Retry in that, unlikely, case.
     284              :          */
     285           26 :         for (int i = 0; i < 100; i++)
     286              :         {
     287              :             uint16      verify_checksum;
     288              :             uint16      old_checksum;
     289              : 
     290           26 :             old_checksum = ph->pd_checksum;
     291           26 :             ph->pd_checksum = old_checksum + 1;
     292              : 
     293           26 :             elog(LOG, "corrupting checksum of blk %u from %u to %u",
     294              :                  blkno, old_checksum, ph->pd_checksum);
     295              : 
     296           26 :             verify_checksum = pg_checksum_page(page, blkno);
     297           26 :             if (verify_checksum != ph->pd_checksum)
     298              :             {
     299           26 :                 successfully_corrupted = true;
     300           26 :                 break;
     301              :             }
     302              :         }
     303              : 
     304           26 :         if (!successfully_corrupted)
     305            0 :             elog(ERROR, "could not corrupt checksum, what's going on?");
     306              :     }
     307              :     else
     308              :     {
     309           44 :         PageSetChecksum(page, blkno);
     310              :     }
     311              : 
     312           70 :     smgrwrite(RelationGetSmgr(rel),
     313              :               MAIN_FORKNUM, blkno, page, true);
     314              : 
     315           70 :     relation_close(rel, NoLock);
     316              : 
     317           70 :     PG_RETURN_VOID();
     318              : }
     319              : 
     320              : /*
     321              :  * Ensures a buffer for rel & blkno is in shared buffers, without actually
     322              :  * caring about the buffer contents. Used to set up test scenarios.
     323              :  */
     324              : static Buffer
     325          160 : create_toy_buffer(Relation rel, BlockNumber blkno)
     326              : {
     327              :     Buffer      buf;
     328              :     BufferDesc *buf_hdr;
     329              :     uint64      buf_state;
     330          160 :     bool        was_pinned = false;
     331          160 :     uint64      unset_bits = 0;
     332              : 
     333              :     /* place buffer in shared buffers without erroring out */
     334          160 :     buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_ZERO_AND_LOCK, NULL);
     335          160 :     LockBuffer(buf, BUFFER_LOCK_UNLOCK);
     336              : 
     337          160 :     if (RelationUsesLocalBuffers(rel))
     338              :     {
     339           50 :         buf_hdr = GetLocalBufferDescriptor(-buf - 1);
     340           50 :         buf_state = pg_atomic_read_u64(&buf_hdr->state);
     341              :     }
     342              :     else
     343              :     {
     344          110 :         buf_hdr = GetBufferDescriptor(buf - 1);
     345          110 :         buf_state = LockBufHdr(buf_hdr);
     346              :     }
     347              : 
     348              :     /*
     349              :      * We should be the only backend accessing this buffer. This is just a
     350              :      * small bit of belt-and-suspenders defense, none of this code should ever
     351              :      * run in a cluster with real data.
     352              :      */
     353          160 :     if (BUF_STATE_GET_REFCOUNT(buf_state) > 1)
     354            0 :         was_pinned = true;
     355              :     else
     356          160 :         unset_bits |= BM_VALID | BM_DIRTY;
     357              : 
     358          160 :     if (RelationUsesLocalBuffers(rel))
     359              :     {
     360           50 :         buf_state &= ~unset_bits;
     361           50 :         pg_atomic_unlocked_write_u64(&buf_hdr->state, buf_state);
     362              :     }
     363              :     else
     364              :     {
     365          110 :         UnlockBufHdrExt(buf_hdr, buf_state, 0, unset_bits, 0);
     366              :     }
     367              : 
     368          160 :     if (was_pinned)
     369            0 :         elog(ERROR, "toy buffer %d was already pinned",
     370              :              buf);
     371              : 
     372          160 :     return buf;
     373              : }
     374              : 
     375              : /*
     376              :  * A "low level" read. This does similar things to what
     377              :  * StartReadBuffers()/WaitReadBuffers() do, but provides more control (and
     378              :  * less sanity).
     379              :  */
     380           27 : PG_FUNCTION_INFO_V1(read_rel_block_ll);
     381              : Datum
     382           94 : read_rel_block_ll(PG_FUNCTION_ARGS)
     383              : {
     384           94 :     Oid         relid = PG_GETARG_OID(0);
     385           94 :     BlockNumber blkno = PG_GETARG_UINT32(1);
     386           94 :     int         nblocks = PG_GETARG_INT32(2);
     387           94 :     bool        wait_complete = PG_GETARG_BOOL(3);
     388           94 :     bool        batchmode_enter = PG_GETARG_BOOL(4);
     389           94 :     bool        call_smgrreleaseall = PG_GETARG_BOOL(5);
     390           94 :     bool        batchmode_exit = PG_GETARG_BOOL(6);
     391           94 :     bool        zero_on_error = PG_GETARG_BOOL(7);
     392              :     Relation    rel;
     393              :     Buffer      bufs[PG_IOV_MAX];
     394              :     BufferDesc *buf_hdrs[PG_IOV_MAX];
     395              :     Page        pages[PG_IOV_MAX];
     396           94 :     uint8       srb_flags = 0;
     397              :     PgAioReturn ior;
     398              :     PgAioHandle *ioh;
     399              :     PgAioWaitRef iow;
     400              :     SMgrRelation smgr;
     401              : 
     402           94 :     if (nblocks <= 0 || nblocks > PG_IOV_MAX)
     403            0 :         elog(ERROR, "nblocks is out of range");
     404              : 
     405           94 :     rel = relation_open(relid, AccessShareLock);
     406              : 
     407          246 :     for (int i = 0; i < nblocks; i++)
     408              :     {
     409          152 :         bufs[i] = create_toy_buffer(rel, blkno + i);
     410          152 :         pages[i] = BufferGetBlock(bufs[i]);
     411          152 :         buf_hdrs[i] = BufferIsLocal(bufs[i]) ?
     412          152 :             GetLocalBufferDescriptor(-bufs[i] - 1) :
     413          104 :             GetBufferDescriptor(bufs[i] - 1);
     414              :     }
     415              : 
     416           94 :     smgr = RelationGetSmgr(rel);
     417              : 
     418           94 :     pgstat_prepare_report_checksum_failure(smgr->smgr_rlocator.locator.dbOid);
     419              : 
     420           94 :     ioh = pgaio_io_acquire(CurrentResourceOwner, &ior);
     421           94 :     pgaio_io_get_wref(ioh, &iow);
     422              : 
     423           94 :     if (RelationUsesLocalBuffers(rel))
     424              :     {
     425           76 :         for (int i = 0; i < nblocks; i++)
     426           48 :             StartLocalBufferIO(buf_hdrs[i], true, true, NULL);
     427           28 :         pgaio_io_set_flag(ioh, PGAIO_HF_REFERENCES_LOCAL);
     428              :     }
     429              :     else
     430              :     {
     431          170 :         for (int i = 0; i < nblocks; i++)
     432          104 :             StartSharedBufferIO(buf_hdrs[i], true, true, NULL);
     433              :     }
     434              : 
     435           94 :     pgaio_io_set_handle_data_32(ioh, (uint32 *) bufs, nblocks);
     436              : 
     437           94 :     if (zero_on_error | zero_damaged_pages)
     438           22 :         srb_flags |= READ_BUFFERS_ZERO_ON_ERROR;
     439           94 :     if (ignore_checksum_failure)
     440           10 :         srb_flags |= READ_BUFFERS_IGNORE_CHECKSUM_FAILURES;
     441              : 
     442           94 :     pgaio_io_register_callbacks(ioh,
     443           94 :                                 RelationUsesLocalBuffers(rel) ?
     444              :                                 PGAIO_HCB_LOCAL_BUFFER_READV :
     445              :                                 PGAIO_HCB_SHARED_BUFFER_READV,
     446              :                                 srb_flags);
     447              : 
     448           94 :     if (batchmode_enter)
     449            4 :         pgaio_enter_batchmode();
     450              : 
     451           94 :     smgrstartreadv(ioh, smgr, MAIN_FORKNUM, blkno,
     452              :                    (void *) pages, nblocks);
     453              : 
     454           94 :     if (call_smgrreleaseall)
     455            4 :         smgrreleaseall();
     456              : 
     457           94 :     if (batchmode_exit)
     458            4 :         pgaio_exit_batchmode();
     459              : 
     460          246 :     for (int i = 0; i < nblocks; i++)
     461          152 :         ReleaseBuffer(bufs[i]);
     462              : 
     463           94 :     if (wait_complete)
     464              :     {
     465           60 :         pgaio_wref_wait(&iow);
     466              : 
     467           60 :         if (ior.result.status != PGAIO_RS_OK)
     468           46 :             pgaio_result_report(ior.result,
     469              :                                 &ior.target_data,
     470           46 :                                 ior.result.status == PGAIO_RS_ERROR ?
     471              :                                 ERROR : WARNING);
     472              :     }
     473              : 
     474           70 :     relation_close(rel, NoLock);
     475              : 
     476           70 :     PG_RETURN_VOID();
     477              : }
     478              : 
     479              : /* helper for invalidate_rel_block() and evict_rel() */
     480              : static void
     481          469 : invalidate_one_block(Relation rel, ForkNumber forknum, BlockNumber blkno)
     482              : {
     483              :     PrefetchBufferResult pr;
     484              :     Buffer      buf;
     485              : 
     486              :     /*
     487              :      * This is a gross hack, but there's no other API exposed that allows to
     488              :      * get a buffer ID without actually reading the block in.
     489              :      */
     490          469 :     pr = PrefetchBuffer(rel, forknum, blkno);
     491          469 :     buf = pr.recent_buffer;
     492              : 
     493          469 :     if (BufferIsValid(buf))
     494              :     {
     495              :         /* if the buffer contents aren't valid, this'll return false */
     496          230 :         if (ReadRecentBuffer(rel->rd_locator, forknum, blkno, buf))
     497              :         {
     498          224 :             BufferDesc *buf_hdr = BufferIsLocal(buf) ?
     499          140 :                 GetLocalBufferDescriptor(-buf - 1)
     500          224 :                 : GetBufferDescriptor(buf - 1);
     501              :             bool        flushed;
     502              : 
     503          224 :             LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
     504              : 
     505          224 :             if (pg_atomic_read_u64(&buf_hdr->state) & BM_DIRTY)
     506              :             {
     507          124 :                 if (BufferIsLocal(buf))
     508           74 :                     FlushLocalBuffer(buf_hdr, NULL);
     509              :                 else
     510           50 :                     FlushOneBuffer(buf);
     511              :             }
     512          224 :             UnlockReleaseBuffer(buf);
     513              : 
     514          224 :             if (BufferIsLocal(buf))
     515          140 :                 InvalidateLocalBuffer(GetLocalBufferDescriptor(-buf - 1), true);
     516           84 :             else if (!EvictUnpinnedBuffer(buf, &flushed))
     517            0 :                 elog(ERROR, "couldn't evict");
     518              :         }
     519              :     }
     520              : 
     521          469 : }
     522              : 
     523           16 : PG_FUNCTION_INFO_V1(invalidate_rel_block);
     524              : Datum
     525          157 : invalidate_rel_block(PG_FUNCTION_ARGS)
     526              : {
     527          157 :     Oid         relid = PG_GETARG_OID(0);
     528          157 :     BlockNumber blkno = PG_GETARG_UINT32(1);
     529              :     Relation    rel;
     530              : 
     531          157 :     rel = relation_open(relid, AccessExclusiveLock);
     532              : 
     533          157 :     invalidate_one_block(rel, MAIN_FORKNUM, blkno);
     534              : 
     535          157 :     relation_close(rel, AccessExclusiveLock);
     536              : 
     537          157 :     PG_RETURN_VOID();
     538              : }
     539              : 
     540           13 : PG_FUNCTION_INFO_V1(evict_rel);
     541              : Datum
     542           40 : evict_rel(PG_FUNCTION_ARGS)
     543              : {
     544           40 :     Oid         relid = PG_GETARG_OID(0);
     545              :     Relation    rel;
     546              : 
     547           40 :     rel = relation_open(relid, AccessExclusiveLock);
     548              : 
     549              :     /*
     550              :      * EvictRelUnpinnedBuffers() doesn't support temp tables, so for temp
     551              :      * tables we have to do it the expensive way and evict every possible
     552              :      * buffer.
     553              :      */
     554           40 :     if (RelationUsesLocalBuffers(rel))
     555              :     {
     556           12 :         SMgrRelation smgr = RelationGetSmgr(rel);
     557              : 
     558           60 :         for (int forknum = MAIN_FORKNUM; forknum <= MAX_FORKNUM; forknum++)
     559              :         {
     560              :             BlockNumber nblocks;
     561              : 
     562           48 :             if (!smgrexists(smgr, forknum))
     563           24 :                 continue;
     564              : 
     565           24 :             nblocks = smgrnblocks(smgr, forknum);
     566              : 
     567          336 :             for (int blkno = 0; blkno < nblocks; blkno++)
     568              :             {
     569          312 :                 invalidate_one_block(rel, forknum, blkno);
     570              :             }
     571              :         }
     572              :     }
     573              :     else
     574              :     {
     575              :         int32       buffers_evicted,
     576              :                     buffers_flushed,
     577              :                     buffers_skipped;
     578              : 
     579           28 :         EvictRelUnpinnedBuffers(rel, &buffers_evicted, &buffers_flushed,
     580              :                                 &buffers_skipped);
     581              :     }
     582              : 
     583           40 :     relation_close(rel, AccessExclusiveLock);
     584              : 
     585              : 
     586           40 :     PG_RETURN_VOID();
     587              : }
     588              : 
     589            8 : PG_FUNCTION_INFO_V1(buffer_create_toy);
     590              : Datum
     591            8 : buffer_create_toy(PG_FUNCTION_ARGS)
     592              : {
     593            8 :     Oid         relid = PG_GETARG_OID(0);
     594            8 :     BlockNumber blkno = PG_GETARG_UINT32(1);
     595              :     Relation    rel;
     596              :     Buffer      buf;
     597              : 
     598            8 :     rel = relation_open(relid, AccessExclusiveLock);
     599              : 
     600            8 :     buf = create_toy_buffer(rel, blkno);
     601            8 :     ReleaseBuffer(buf);
     602              : 
     603            8 :     relation_close(rel, NoLock);
     604              : 
     605            8 :     PG_RETURN_INT32(buf);
     606              : }
     607              : 
     608           10 : PG_FUNCTION_INFO_V1(buffer_call_start_io);
     609              : Datum
     610           22 : buffer_call_start_io(PG_FUNCTION_ARGS)
     611              : {
     612           22 :     Buffer      buf = PG_GETARG_INT32(0);
     613           22 :     bool        for_input = PG_GETARG_BOOL(1);
     614           22 :     bool        wait = PG_GETARG_BOOL(2);
     615              :     StartBufferIOResult result;
     616              :     bool        can_start;
     617              : 
     618           22 :     if (BufferIsLocal(buf))
     619            8 :         result = StartLocalBufferIO(GetLocalBufferDescriptor(-buf - 1),
     620              :                                     for_input, wait, NULL);
     621              :     else
     622           14 :         result = StartSharedBufferIO(GetBufferDescriptor(buf - 1),
     623              :                                      for_input, wait, NULL);
     624              : 
     625           22 :     can_start = result == BUFFER_IO_READY_FOR_IO;
     626              : 
     627              :     /*
     628              :      * For tests we don't want the resowner release preventing us from
     629              :      * orchestrating odd scenarios.
     630              :      */
     631           22 :     if (can_start && !BufferIsLocal(buf))
     632            8 :         ResourceOwnerForgetBufferIO(CurrentResourceOwner,
     633              :                                     buf);
     634              : 
     635           22 :     ereport(LOG,
     636              :             errmsg("buffer %d after StartBufferIO: %s",
     637              :                    buf, DebugPrintBufferRefcount(buf)),
     638              :             errhidestmt(true), errhidecontext(true));
     639              : 
     640           22 :     PG_RETURN_BOOL(can_start);
     641              : }
     642              : 
     643           10 : PG_FUNCTION_INFO_V1(buffer_call_terminate_io);
     644              : Datum
     645           12 : buffer_call_terminate_io(PG_FUNCTION_ARGS)
     646              : {
     647           12 :     Buffer      buf = PG_GETARG_INT32(0);
     648           12 :     bool        for_input = PG_GETARG_BOOL(1);
     649           12 :     bool        succeed = PG_GETARG_BOOL(2);
     650           12 :     bool        io_error = PG_GETARG_BOOL(3);
     651           12 :     bool        release_aio = PG_GETARG_BOOL(4);
     652           12 :     bool        clear_dirty = false;
     653           12 :     uint64      set_flag_bits = 0;
     654              : 
     655           12 :     if (io_error)
     656            0 :         set_flag_bits |= BM_IO_ERROR;
     657              : 
     658           12 :     if (for_input)
     659              :     {
     660           12 :         clear_dirty = false;
     661              : 
     662           12 :         if (succeed)
     663            5 :             set_flag_bits |= BM_VALID;
     664              :     }
     665              :     else
     666              :     {
     667            0 :         if (succeed)
     668            0 :             clear_dirty = true;
     669              :     }
     670              : 
     671           12 :     ereport(LOG,
     672              :             errmsg("buffer %d before Terminate[Local]BufferIO: %s",
     673              :                    buf, DebugPrintBufferRefcount(buf)),
     674              :             errhidestmt(true), errhidecontext(true));
     675              : 
     676           12 :     if (BufferIsLocal(buf))
     677            4 :         TerminateLocalBufferIO(GetLocalBufferDescriptor(-buf - 1),
     678              :                                clear_dirty, set_flag_bits, release_aio);
     679              :     else
     680            8 :         TerminateBufferIO(GetBufferDescriptor(buf - 1),
     681              :                           clear_dirty, set_flag_bits, false, release_aio);
     682              : 
     683           12 :     ereport(LOG,
     684              :             errmsg("buffer %d after Terminate[Local]BufferIO: %s",
     685              :                    buf, DebugPrintBufferRefcount(buf)),
     686              :             errhidestmt(true), errhidecontext(true));
     687              : 
     688           12 :     PG_RETURN_VOID();
     689              : }
     690              : 
     691            9 : PG_FUNCTION_INFO_V1(read_buffers);
     692              : /*
     693              :  * Infrastructure to test StartReadBuffers()
     694              :  */
     695              : Datum
     696           74 : read_buffers(PG_FUNCTION_ARGS)
     697              : {
     698           74 :     Oid         relid = PG_GETARG_OID(0);
     699           74 :     BlockNumber startblock = PG_GETARG_UINT32(1);
     700           74 :     int32       nblocks = PG_GETARG_INT32(2);
     701           74 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     702              :     Relation    rel;
     703              :     SMgrRelation smgr;
     704           74 :     int         nblocks_done = 0;
     705           74 :     int         nblocks_disp = 0;
     706           74 :     int         nios = 0;
     707              :     ReadBuffersOperation *operations;
     708              :     Buffer     *buffers;
     709              :     Datum      *buffers_datum;
     710              :     bool       *io_reqds;
     711              :     int        *nblocks_per_io;
     712              : 
     713              :     Assert(nblocks > 0);
     714              : 
     715           74 :     InitMaterializedSRF(fcinfo, 0);
     716              : 
     717              :     /* at worst each block gets its own IO */
     718           74 :     operations = palloc0(sizeof(ReadBuffersOperation) * nblocks);
     719           74 :     buffers = palloc0(sizeof(Buffer) * nblocks);
     720           74 :     buffers_datum = palloc0(sizeof(Datum) * nblocks);
     721           74 :     io_reqds = palloc0(sizeof(bool) * nblocks);
     722           74 :     nblocks_per_io = palloc0(sizeof(int) * nblocks);
     723              : 
     724           74 :     rel = relation_open(relid, AccessShareLock);
     725           74 :     smgr = RelationGetSmgr(rel);
     726              : 
     727              :     /*
     728              :      * Do StartReadBuffers() until IO for all the required blocks has been
     729              :      * started (if required).
     730              :      */
     731          214 :     while (nblocks_done < nblocks)
     732              :     {
     733          140 :         ReadBuffersOperation *operation = &operations[nios];
     734          140 :         int         nblocks_this_io =
     735          140 :             Min(nblocks - nblocks_done, io_combine_limit);
     736              : 
     737          140 :         operation->rel = rel;
     738          140 :         operation->smgr = smgr;
     739          140 :         operation->persistence = rel->rd_rel->relpersistence;
     740          140 :         operation->strategy = NULL;
     741          140 :         operation->forknum = MAIN_FORKNUM;
     742              : 
     743          280 :         io_reqds[nios] = StartReadBuffers(operation,
     744          140 :                                           &buffers[nblocks_done],
     745              :                                           startblock + nblocks_done,
     746              :                                           &nblocks_this_io,
     747              :                                           0);
     748          140 :         nblocks_per_io[nios] = nblocks_this_io;
     749          140 :         nios++;
     750          140 :         nblocks_done += nblocks_this_io;
     751              :     }
     752              : 
     753              :     /*
     754              :      * Now wait for all operations that required IO. This is done at the end,
     755              :      * as otherwise waiting for IO in progress in other backends could
     756              :      * influence the result for subsequent buffers / blocks.
     757              :      */
     758          214 :     for (int nio = 0; nio < nios; nio++)
     759              :     {
     760          140 :         ReadBuffersOperation *operation = &operations[nio];
     761              : 
     762          140 :         if (io_reqds[nio])
     763           75 :             WaitReadBuffers(operation);
     764              :     }
     765              : 
     766              :     /*
     767              :      * Convert what has been done into SQL SRF return value.
     768              :      */
     769          214 :     for (int nio = 0; nio < nios; nio++)
     770              :     {
     771          140 :         ReadBuffersOperation *operation = &operations[nio];
     772          140 :         int         nblocks_this_io = nblocks_per_io[nio];
     773          140 :         Datum       values[6] = {0};
     774          140 :         bool        nulls[6] = {0};
     775              :         ArrayType  *buffers_arr;
     776              : 
     777              :         /* convert buffer array to datum array */
     778          330 :         for (int i = 0; i < nblocks_this_io; i++)
     779              :         {
     780          190 :             Buffer      buf = buffers[nblocks_disp + i];
     781              : 
     782              :             Assert(BufferGetBlockNumber(buf) == startblock + nblocks_disp + i);
     783              : 
     784          190 :             buffers_datum[nblocks_disp + i] = Int32GetDatum(buf);
     785              :         }
     786              : 
     787          140 :         buffers_arr = construct_array_builtin(&buffers_datum[nblocks_disp],
     788              :                                               nblocks_this_io,
     789              :                                               INT4OID);
     790              : 
     791              :         /* blockoff */
     792          140 :         values[0] = Int32GetDatum(nblocks_disp);
     793          140 :         nulls[0] = false;
     794              : 
     795              :         /* blocknum */
     796          140 :         values[1] = UInt32GetDatum(startblock + nblocks_disp);
     797          140 :         nulls[1] = false;
     798              : 
     799              :         /* io_reqd */
     800          140 :         values[2] = BoolGetDatum(io_reqds[nio]);
     801          140 :         nulls[2] = false;
     802              : 
     803              :         /* foreign IO - only valid when IO was required */
     804          140 :         values[3] = BoolGetDatum(io_reqds[nio] ? operation->foreign_io : false);
     805          140 :         nulls[3] = false;
     806              : 
     807              :         /* nblocks */
     808          140 :         values[4] = Int32GetDatum(nblocks_this_io);
     809          140 :         nulls[4] = false;
     810              : 
     811              :         /* array of buffers */
     812          140 :         values[5] = PointerGetDatum(buffers_arr);
     813          140 :         nulls[5] = false;
     814              : 
     815          140 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
     816              : 
     817          140 :         nblocks_disp += nblocks_this_io;
     818              :     }
     819              : 
     820              :     /* release pins on all the buffers */
     821          264 :     for (int i = 0; i < nblocks_done; i++)
     822          190 :         ReleaseBuffer(buffers[i]);
     823              : 
     824              :     /*
     825              :      * Free explicitly, to have a chance to detect potential issues with too
     826              :      * long lived references to the operation.
     827              :      */
     828           74 :     pfree(operations);
     829           74 :     pfree(buffers);
     830           74 :     pfree(buffers_datum);
     831           74 :     pfree(io_reqds);
     832           74 :     pfree(nblocks_per_io);
     833              : 
     834           74 :     relation_close(rel, NoLock);
     835              : 
     836           74 :     return (Datum) 0;
     837              : }
     838              : 
     839              : 
     840              : static BlockNumber
     841          104 : read_stream_for_blocks_cb(ReadStream *stream,
     842              :                           void *callback_private_data,
     843              :                           void *per_buffer_data)
     844              : {
     845          104 :     BlocksReadStreamData *stream_data = callback_private_data;
     846              : 
     847          104 :     if (stream_data->curblock >= stream_data->nblocks)
     848           16 :         return InvalidBlockNumber;
     849           88 :     return stream_data->blocks[stream_data->curblock++];
     850              : }
     851              : 
     852            9 : PG_FUNCTION_INFO_V1(read_stream_for_blocks);
     853              : Datum
     854           16 : read_stream_for_blocks(PG_FUNCTION_ARGS)
     855              : {
     856           16 :     Oid         relid = PG_GETARG_OID(0);
     857           16 :     ArrayType  *blocksarray = PG_GETARG_ARRAYTYPE_P(1);
     858           16 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     859              :     Relation    rel;
     860              :     BlocksReadStreamData stream_data;
     861              :     ReadStream *stream;
     862              : 
     863           16 :     InitMaterializedSRF(fcinfo, 0);
     864              : 
     865              :     /*
     866              :      * We expect the input to be an N-element int4 array; verify that. We
     867              :      * don't need to use deconstruct_array() since the array data is just
     868              :      * going to look like a C array of N int4 values.
     869              :      */
     870           16 :     if (ARR_NDIM(blocksarray) != 1 ||
     871           16 :         ARR_HASNULL(blocksarray) ||
     872           16 :         ARR_ELEMTYPE(blocksarray) != INT4OID)
     873            0 :         elog(ERROR, "expected 1 dimensional int4 array");
     874              : 
     875           16 :     stream_data.curblock = 0;
     876           16 :     stream_data.nblocks = ARR_DIMS(blocksarray)[0];
     877           16 :     stream_data.blocks = (uint32 *) ARR_DATA_PTR(blocksarray);
     878              : 
     879           16 :     rel = relation_open(relid, AccessShareLock);
     880              : 
     881           16 :     stream = read_stream_begin_relation(READ_STREAM_FULL,
     882              :                                         NULL,
     883              :                                         rel,
     884              :                                         MAIN_FORKNUM,
     885              :                                         read_stream_for_blocks_cb,
     886              :                                         &stream_data,
     887              :                                         0);
     888              : 
     889          104 :     for (int i = 0; i < stream_data.nblocks; i++)
     890              :     {
     891           88 :         Buffer      buf = read_stream_next_buffer(stream, NULL);
     892           88 :         Datum       values[3] = {0};
     893           88 :         bool        nulls[3] = {0};
     894              : 
     895           88 :         if (!BufferIsValid(buf))
     896            0 :             elog(ERROR, "read_stream_next_buffer() call %d is unexpectedly invalid", i);
     897              : 
     898           88 :         values[0] = Int32GetDatum(i);
     899           88 :         values[1] = UInt32GetDatum(stream_data.blocks[i]);
     900           88 :         values[2] = UInt32GetDatum(buf);
     901              : 
     902           88 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
     903              : 
     904           88 :         ReleaseBuffer(buf);
     905              :     }
     906              : 
     907           16 :     if (read_stream_next_buffer(stream, NULL) != InvalidBuffer)
     908            0 :         elog(ERROR, "read_stream_next_buffer() call %d is unexpectedly valid",
     909              :              stream_data.nblocks);
     910              : 
     911           16 :     read_stream_end(stream);
     912              : 
     913           16 :     relation_close(rel, NoLock);
     914              : 
     915           16 :     return (Datum) 0;
     916              : }
     917              : 
     918              : 
     919            7 : PG_FUNCTION_INFO_V1(handle_get);
     920              : Datum
     921           12 : handle_get(PG_FUNCTION_ARGS)
     922              : {
     923           12 :     last_handle = pgaio_io_acquire(CurrentResourceOwner, NULL);
     924              : 
     925           12 :     PG_RETURN_VOID();
     926              : }
     927              : 
     928            7 : PG_FUNCTION_INFO_V1(handle_release_last);
     929              : Datum
     930            4 : handle_release_last(PG_FUNCTION_ARGS)
     931              : {
     932            4 :     if (!last_handle)
     933            0 :         elog(ERROR, "no handle");
     934              : 
     935            4 :     pgaio_io_release(last_handle);
     936              : 
     937            2 :     PG_RETURN_VOID();
     938              : }
     939              : 
     940            7 : PG_FUNCTION_INFO_V1(handle_get_and_error);
     941              : Datum
     942            6 : handle_get_and_error(PG_FUNCTION_ARGS)
     943              : {
     944            6 :     pgaio_io_acquire(CurrentResourceOwner, NULL);
     945              : 
     946            6 :     elog(ERROR, "as you command");
     947              :     PG_RETURN_VOID();
     948              : }
     949              : 
     950            7 : PG_FUNCTION_INFO_V1(handle_get_twice);
     951              : Datum
     952            2 : handle_get_twice(PG_FUNCTION_ARGS)
     953              : {
     954            2 :     pgaio_io_acquire(CurrentResourceOwner, NULL);
     955            2 :     pgaio_io_acquire(CurrentResourceOwner, NULL);
     956              : 
     957            0 :     PG_RETURN_VOID();
     958              : }
     959              : 
     960            7 : PG_FUNCTION_INFO_V1(handle_get_release);
     961              : Datum
     962            6 : handle_get_release(PG_FUNCTION_ARGS)
     963              : {
     964              :     PgAioHandle *handle;
     965              : 
     966            6 :     handle = pgaio_io_acquire(CurrentResourceOwner, NULL);
     967            6 :     pgaio_io_release(handle);
     968              : 
     969            6 :     PG_RETURN_VOID();
     970              : }
     971              : 
     972            7 : PG_FUNCTION_INFO_V1(batch_start);
     973              : Datum
     974            6 : batch_start(PG_FUNCTION_ARGS)
     975              : {
     976            6 :     pgaio_enter_batchmode();
     977            6 :     PG_RETURN_VOID();
     978              : }
     979              : 
     980            7 : PG_FUNCTION_INFO_V1(batch_end);
     981              : Datum
     982            2 : batch_end(PG_FUNCTION_ARGS)
     983              : {
     984            2 :     pgaio_exit_batchmode();
     985            2 :     PG_RETURN_VOID();
     986              : }
     987              : 
     988              : #ifdef USE_INJECTION_POINTS
     989              : extern PGDLLEXPORT void inj_io_completion_hook(const char *name,
     990              :                                                const void *private_data,
     991              :                                                void *arg);
     992              : extern PGDLLEXPORT void inj_io_reopen(const char *name,
     993              :                                       const void *private_data,
     994              :                                       void *arg);
     995              : 
     996              : static bool
     997         2135 : inj_io_short_read_matches(PgAioHandle *ioh)
     998              : {
     999              :     PGPROC     *io_proc;
    1000              :     int32       io_pid;
    1001              :     int32       inj_pid;
    1002              :     PgAioTargetData *td;
    1003              : 
    1004         2135 :     if (!inj_io_error_state->enabled_short_read)
    1005         1970 :         return false;
    1006              : 
    1007          165 :     if (!inj_io_error_state->short_read_result_set)
    1008            0 :         return false;
    1009              : 
    1010          165 :     io_proc = GetPGProcByNumber(pgaio_io_get_owner(ioh));
    1011          165 :     io_pid = io_proc->pid;
    1012          165 :     inj_pid = inj_io_error_state->short_read_pid;
    1013              : 
    1014          165 :     if (inj_pid != InvalidPid && inj_pid != io_pid)
    1015            8 :         return false;
    1016              : 
    1017          157 :     td = pgaio_io_get_target_data(ioh);
    1018              : 
    1019          157 :     if (inj_io_error_state->short_read_relfilenode != InvalidOid &&
    1020            4 :         td->smgr.rlocator.relNumber != inj_io_error_state->short_read_relfilenode)
    1021            2 :         return false;
    1022              : 
    1023              :     /*
    1024              :      * Only shorten reads that are actually longer than the target size,
    1025              :      * otherwise we can trigger over-reads.
    1026              :      */
    1027          155 :     if (inj_io_error_state->short_read_result >= ioh->result)
    1028           34 :         return false;
    1029              : 
    1030          121 :     return true;
    1031              : }
    1032              : 
    1033              : static bool
    1034         2155 : inj_io_completion_wait_matches(PgAioHandle *ioh)
    1035              : {
    1036              :     PGPROC     *io_proc;
    1037              :     int32       io_pid;
    1038              :     PgAioTargetData *td;
    1039              :     int32       inj_pid;
    1040              :     BlockNumber io_blockno;
    1041              :     BlockNumber inj_blockno;
    1042              :     Oid         inj_relfilenode;
    1043              : 
    1044         2155 :     if (!inj_io_error_state->enabled_completion_wait)
    1045         2018 :         return false;
    1046              : 
    1047          137 :     io_proc = GetPGProcByNumber(pgaio_io_get_owner(ioh));
    1048          137 :     io_pid = io_proc->pid;
    1049          137 :     inj_pid = inj_io_error_state->completion_wait_pid;
    1050              : 
    1051          137 :     if (inj_pid != InvalidPid && inj_pid != io_pid)
    1052          111 :         return false;
    1053              : 
    1054           26 :     td = pgaio_io_get_target_data(ioh);
    1055              : 
    1056           26 :     inj_relfilenode = inj_io_error_state->completion_wait_relfilenode;
    1057           26 :     if (inj_relfilenode != InvalidOid &&
    1058           26 :         td->smgr.rlocator.relNumber != inj_relfilenode)
    1059            6 :         return false;
    1060              : 
    1061           20 :     inj_blockno = inj_io_error_state->completion_wait_blockno;
    1062           20 :     io_blockno = td->smgr.blockNum;
    1063           20 :     if (inj_blockno != InvalidBlockNumber &&
    1064            8 :         !(inj_blockno >= io_blockno && inj_blockno < (io_blockno + td->smgr.nblocks)))
    1065            0 :         return false;
    1066              : 
    1067           20 :     return true;
    1068              : }
    1069              : 
    1070              : static void
    1071         2135 : inj_io_completion_wait_hook(const char *name, const void *private_data, void *arg)
    1072              : {
    1073         2135 :     PgAioHandle *ioh = (PgAioHandle *) arg;
    1074              : 
    1075         2135 :     if (!inj_io_completion_wait_matches(ioh))
    1076         2125 :         return;
    1077              : 
    1078           10 :     ConditionVariablePrepareToSleep(&inj_io_error_state->cv);
    1079              : 
    1080              :     while (true)
    1081              :     {
    1082           20 :         if (!inj_io_completion_wait_matches(ioh))
    1083           10 :             break;
    1084              : 
    1085           10 :         ConditionVariableSleep(&inj_io_error_state->cv,
    1086           10 :                                inj_io_error_state->completion_wait_event);
    1087              :     }
    1088              : 
    1089           10 :     ConditionVariableCancelSleep();
    1090              : }
    1091              : 
    1092              : static void
    1093         2135 : inj_io_short_read_hook(const char *name, const void *private_data, void *arg)
    1094              : {
    1095         2135 :     PgAioHandle *ioh = (PgAioHandle *) arg;
    1096              : 
    1097         2135 :     ereport(LOG,
    1098              :             errmsg("short read injection point called, is enabled: %d",
    1099              :                    inj_io_error_state->enabled_short_read),
    1100              :             errhidestmt(true), errhidecontext(true));
    1101              : 
    1102         2135 :     if (inj_io_short_read_matches(ioh))
    1103              :     {
    1104          121 :         struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off];
    1105          121 :         int32       old_result = ioh->result;
    1106          121 :         int32       new_result = inj_io_error_state->short_read_result;
    1107          121 :         int32       processed = 0;
    1108              : 
    1109          121 :         ereport(LOG,
    1110              :                 errmsg("short read inject point, changing result from %d to %d",
    1111              :                        old_result, new_result),
    1112              :                 errhidestmt(true), errhidecontext(true));
    1113              : 
    1114              :         /*
    1115              :          * The underlying IO actually completed OK, and thus the "invalid"
    1116              :          * portion of the IOV actually contains valid data. That can hide a
    1117              :          * lot of problems, e.g. if we were to wrongly mark a buffer, that
    1118              :          * wasn't read according to the shortened-read, IO as valid, the
    1119              :          * contents would look valid and we might miss a bug.
    1120              :          *
    1121              :          * To avoid that, iterate through the IOV and zero out the "failed"
    1122              :          * portion of the IO.
    1123              :          */
    1124          246 :         for (int i = 0; i < ioh->op_data.read.iov_length; i++)
    1125              :         {
    1126          125 :             if (processed + iov[i].iov_len <= new_result)
    1127           12 :                 processed += iov[i].iov_len;
    1128          113 :             else if (processed <= new_result)
    1129              :             {
    1130          113 :                 uint32      ok_part = new_result - processed;
    1131              : 
    1132          113 :                 memset((char *) iov[i].iov_base + ok_part, 0, iov[i].iov_len - ok_part);
    1133          113 :                 processed += iov[i].iov_len;
    1134              :             }
    1135              :             else
    1136              :             {
    1137            0 :                 memset((char *) iov[i].iov_base, 0, iov[i].iov_len);
    1138              :             }
    1139              :         }
    1140              : 
    1141          121 :         ioh->result = new_result;
    1142              :     }
    1143         2135 : }
    1144              : 
    1145              : void
    1146         2135 : inj_io_completion_hook(const char *name, const void *private_data, void *arg)
    1147              : {
    1148         2135 :     inj_io_completion_wait_hook(name, private_data, arg);
    1149         2135 :     inj_io_short_read_hook(name, private_data, arg);
    1150         2135 : }
    1151              : 
    1152              : void
    1153          503 : inj_io_reopen(const char *name, const void *private_data, void *arg)
    1154              : {
    1155          503 :     ereport(LOG,
    1156              :             errmsg("reopen injection point called, is enabled: %d",
    1157              :                    inj_io_error_state->enabled_reopen),
    1158              :             errhidestmt(true), errhidecontext(true));
    1159              : 
    1160          503 :     if (inj_io_error_state->enabled_reopen)
    1161            1 :         elog(ERROR, "injection point triggering failure to reopen ");
    1162          502 : }
    1163              : #endif
    1164              : 
    1165            9 : PG_FUNCTION_INFO_V1(inj_io_completion_wait);
    1166              : Datum
    1167           10 : inj_io_completion_wait(PG_FUNCTION_ARGS)
    1168              : {
    1169              : #ifdef USE_INJECTION_POINTS
    1170           10 :     inj_io_error_state->enabled_completion_wait = true;
    1171           10 :     inj_io_error_state->completion_wait_pid =
    1172           10 :         PG_ARGISNULL(0) ? InvalidPid : PG_GETARG_INT32(0);
    1173           10 :     inj_io_error_state->completion_wait_relfilenode =
    1174           10 :         PG_ARGISNULL(1) ? InvalidOid : PG_GETARG_OID(1);
    1175           10 :     inj_io_error_state->completion_wait_blockno =
    1176           10 :         PG_ARGISNULL(2) ? InvalidBlockNumber : PG_GETARG_UINT32(2);
    1177              : #else
    1178              :     elog(ERROR, "injection points not supported");
    1179              : #endif
    1180              : 
    1181           10 :     PG_RETURN_VOID();
    1182              : }
    1183              : 
    1184           13 : PG_FUNCTION_INFO_V1(inj_io_completion_continue);
    1185              : Datum
    1186           10 : inj_io_completion_continue(PG_FUNCTION_ARGS)
    1187              : {
    1188              : #ifdef USE_INJECTION_POINTS
    1189           10 :     inj_io_error_state->enabled_completion_wait = false;
    1190           10 :     inj_io_error_state->completion_wait_pid = InvalidPid;
    1191           10 :     inj_io_error_state->completion_wait_relfilenode = InvalidOid;
    1192           10 :     inj_io_error_state->completion_wait_blockno = InvalidBlockNumber;
    1193           10 :     ConditionVariableBroadcast(&inj_io_error_state->cv);
    1194              : #else
    1195              :     elog(ERROR, "injection points not supported");
    1196              : #endif
    1197              : 
    1198           10 :     PG_RETURN_VOID();
    1199              : }
    1200              : 
    1201            9 : PG_FUNCTION_INFO_V1(inj_io_short_read_attach);
    1202              : Datum
    1203           16 : inj_io_short_read_attach(PG_FUNCTION_ARGS)
    1204              : {
    1205              : #ifdef USE_INJECTION_POINTS
    1206           16 :     inj_io_error_state->enabled_short_read = true;
    1207           16 :     inj_io_error_state->short_read_result_set = !PG_ARGISNULL(0);
    1208           16 :     if (inj_io_error_state->short_read_result_set)
    1209           16 :         inj_io_error_state->short_read_result = PG_GETARG_INT32(0);
    1210           16 :     inj_io_error_state->short_read_pid =
    1211           16 :         PG_ARGISNULL(1) ? InvalidPid : PG_GETARG_INT32(1);
    1212           16 :     inj_io_error_state->short_read_relfilenode =
    1213           16 :         PG_ARGISNULL(2) ? InvalidOid : PG_GETARG_OID(2);
    1214              : #else
    1215              :     elog(ERROR, "injection points not supported");
    1216              : #endif
    1217              : 
    1218           16 :     PG_RETURN_VOID();
    1219              : }
    1220              : 
    1221            9 : PG_FUNCTION_INFO_V1(inj_io_short_read_detach);
    1222              : Datum
    1223            6 : inj_io_short_read_detach(PG_FUNCTION_ARGS)
    1224              : {
    1225              : #ifdef USE_INJECTION_POINTS
    1226            6 :     inj_io_error_state->enabled_short_read = false;
    1227              : #else
    1228              :     elog(ERROR, "injection points not supported");
    1229              : #endif
    1230            6 :     PG_RETURN_VOID();
    1231              : }
    1232              : 
    1233            6 : PG_FUNCTION_INFO_V1(inj_io_reopen_attach);
    1234              : Datum
    1235            1 : inj_io_reopen_attach(PG_FUNCTION_ARGS)
    1236              : {
    1237              : #ifdef USE_INJECTION_POINTS
    1238            1 :     inj_io_error_state->enabled_reopen = true;
    1239              : #else
    1240              :     elog(ERROR, "injection points not supported");
    1241              : #endif
    1242              : 
    1243            1 :     PG_RETURN_VOID();
    1244              : }
    1245              : 
    1246            6 : PG_FUNCTION_INFO_V1(inj_io_reopen_detach);
    1247              : Datum
    1248            1 : inj_io_reopen_detach(PG_FUNCTION_ARGS)
    1249              : {
    1250              : #ifdef USE_INJECTION_POINTS
    1251            1 :     inj_io_error_state->enabled_reopen = false;
    1252              : #else
    1253              :     elog(ERROR, "injection points not supported");
    1254              : #endif
    1255            1 :     PG_RETURN_VOID();
    1256              : }
        

Generated by: LCOV version 2.0-1