Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * test_aio.c
4 : * Helpers to write tests for AIO
5 : *
6 : * This module provides interface functions for C functionality to SQL, to
7 : * make it possible to test AIO related behavior in a targeted way from SQL.
8 : * It'd not generally be safe to export these functions to SQL, but for a test
9 : * that's fine.
10 : *
11 : * Copyright (c) 2020-2025, PostgreSQL Global Development Group
12 : *
13 : * IDENTIFICATION
14 : * src/test/modules/test_aio/test_aio.c
15 : *
16 : *-------------------------------------------------------------------------
17 : */
18 :
19 : #include "postgres.h"
20 :
21 : #include "access/relation.h"
22 : #include "fmgr.h"
23 : #include "storage/aio.h"
24 : #include "storage/aio_internal.h"
25 : #include "storage/buf_internals.h"
26 : #include "storage/bufmgr.h"
27 : #include "storage/checksum.h"
28 : #include "storage/ipc.h"
29 : #include "storage/lwlock.h"
30 : #include "utils/builtins.h"
31 : #include "utils/injection_point.h"
32 : #include "utils/rel.h"
33 :
34 :
/* Declare the module magic block for this loadable test module. */
PG_MODULE_MAGIC;
36 :
37 :
/*
 * State steering the error-injection callbacks of this module.  One instance
 * is set up through ShmemInitStruct() in test_aio_shmem_startup().
 */
typedef struct InjIoErrorState
{
	/* when true, inj_io_short_read() may rewrite the result of a read */
	bool		enabled_short_read;
	/* when true, inj_io_reopen() raises an ERROR */
	bool		enabled_reopen;

	/* does short_read_result hold a value to inject? */
	bool		short_read_result_set;
	/* value stored into the IO handle's result by inj_io_short_read() */
	int			short_read_result;
} InjIoErrorState;
46 :
47 : static InjIoErrorState * inj_io_error_state;
48 :
49 : /* Shared memory init callbacks */
50 : static shmem_request_hook_type prev_shmem_request_hook = NULL;
51 : static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
52 :
53 :
54 : static PgAioHandle *last_handle;
55 :
56 :
57 :
58 : static void
59 4 : test_aio_shmem_request(void)
60 : {
61 4 : if (prev_shmem_request_hook)
62 0 : prev_shmem_request_hook();
63 :
64 4 : RequestAddinShmemSpace(sizeof(InjIoErrorState));
65 4 : }
66 :
67 : static void
68 4 : test_aio_shmem_startup(void)
69 : {
70 : bool found;
71 :
72 4 : if (prev_shmem_startup_hook)
73 0 : prev_shmem_startup_hook();
74 :
75 : /* Create or attach to the shared memory state */
76 4 : LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
77 :
78 4 : inj_io_error_state = ShmemInitStruct("injection_points",
79 : sizeof(InjIoErrorState),
80 : &found);
81 :
82 4 : if (!found)
83 : {
84 : /* First time through, initialize */
85 4 : inj_io_error_state->enabled_short_read = false;
86 4 : inj_io_error_state->enabled_reopen = false;
87 :
88 : #ifdef USE_INJECTION_POINTS
89 4 : InjectionPointAttach("aio-process-completion-before-shared",
90 : "test_aio",
91 : "inj_io_short_read",
92 : NULL,
93 : 0);
94 4 : InjectionPointLoad("aio-process-completion-before-shared");
95 :
96 4 : InjectionPointAttach("aio-worker-after-reopen",
97 : "test_aio",
98 : "inj_io_reopen",
99 : NULL,
100 : 0);
101 4 : InjectionPointLoad("aio-worker-after-reopen");
102 :
103 : #endif
104 : }
105 : else
106 : {
107 : /*
108 : * Pre-load the injection points now, so we can call them in a
109 : * critical section.
110 : */
111 : #ifdef USE_INJECTION_POINTS
112 0 : InjectionPointLoad("aio-process-completion-before-shared");
113 0 : InjectionPointLoad("aio-worker-after-reopen");
114 0 : elog(LOG, "injection point loaded");
115 : #endif
116 : }
117 :
118 4 : LWLockRelease(AddinShmemInitLock);
119 4 : }
120 :
121 : void
122 4 : _PG_init(void)
123 : {
124 4 : if (!process_shared_preload_libraries_in_progress)
125 0 : return;
126 :
127 4 : prev_shmem_request_hook = shmem_request_hook;
128 4 : shmem_request_hook = test_aio_shmem_request;
129 4 : prev_shmem_startup_hook = shmem_startup_hook;
130 4 : shmem_startup_hook = test_aio_shmem_startup;
131 : }
132 :
133 :
134 12 : PG_FUNCTION_INFO_V1(errno_from_string);
135 : Datum
136 8 : errno_from_string(PG_FUNCTION_ARGS)
137 : {
138 8 : const char *sym = text_to_cstring(PG_GETARG_TEXT_PP(0));
139 :
140 8 : if (strcmp(sym, "EIO") == 0)
141 4 : PG_RETURN_INT32(EIO);
142 4 : else if (strcmp(sym, "EAGAIN") == 0)
143 0 : PG_RETURN_INT32(EAGAIN);
144 4 : else if (strcmp(sym, "EINTR") == 0)
145 0 : PG_RETURN_INT32(EINTR);
146 4 : else if (strcmp(sym, "ENOSPC") == 0)
147 0 : PG_RETURN_INT32(ENOSPC);
148 4 : else if (strcmp(sym, "EROFS") == 0)
149 4 : PG_RETURN_INT32(EROFS);
150 :
151 0 : ereport(ERROR,
152 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
153 : errmsg_internal("%s is not a supported errno value", sym));
154 : PG_RETURN_INT32(0);
155 : }
156 :
157 16 : PG_FUNCTION_INFO_V1(grow_rel);
158 : Datum
159 12 : grow_rel(PG_FUNCTION_ARGS)
160 : {
161 12 : Oid relid = PG_GETARG_OID(0);
162 12 : uint32 nblocks = PG_GETARG_UINT32(1);
163 : Relation rel;
164 : #define MAX_BUFFERS_TO_EXTEND_BY 64
165 : Buffer victim_buffers[MAX_BUFFERS_TO_EXTEND_BY];
166 :
167 12 : rel = relation_open(relid, AccessExclusiveLock);
168 :
169 24 : while (nblocks > 0)
170 : {
171 : uint32 extend_by_pages;
172 :
173 12 : extend_by_pages = Min(nblocks, MAX_BUFFERS_TO_EXTEND_BY);
174 :
175 12 : ExtendBufferedRelBy(BMR_REL(rel),
176 : MAIN_FORKNUM,
177 : NULL,
178 : 0,
179 : extend_by_pages,
180 : victim_buffers,
181 : &extend_by_pages);
182 :
183 12 : nblocks -= extend_by_pages;
184 :
185 156 : for (uint32 i = 0; i < extend_by_pages; i++)
186 : {
187 144 : ReleaseBuffer(victim_buffers[i]);
188 : }
189 : }
190 :
191 12 : relation_close(rel, NoLock);
192 :
193 12 : PG_RETURN_VOID();
194 : }
195 :
196 36 : PG_FUNCTION_INFO_V1(modify_rel_block);
197 : Datum
198 140 : modify_rel_block(PG_FUNCTION_ARGS)
199 : {
200 140 : Oid relid = PG_GETARG_OID(0);
201 140 : BlockNumber blkno = PG_GETARG_UINT32(1);
202 140 : bool zero = PG_GETARG_BOOL(2);
203 140 : bool corrupt_header = PG_GETARG_BOOL(3);
204 140 : bool corrupt_checksum = PG_GETARG_BOOL(4);
205 140 : Page page = palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE, 0);
206 : bool flushed;
207 : Relation rel;
208 : Buffer buf;
209 : PageHeader ph;
210 :
211 140 : rel = relation_open(relid, AccessExclusiveLock);
212 :
213 140 : buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno,
214 : RBM_ZERO_ON_ERROR, NULL);
215 :
216 140 : LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
217 :
218 : /*
219 : * copy the page to local memory, seems nicer than to directly modify in
220 : * the buffer pool.
221 : */
222 140 : memcpy(page, BufferGetPage(buf), BLCKSZ);
223 :
224 140 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
225 :
226 140 : ReleaseBuffer(buf);
227 :
228 : /*
229 : * Don't want to have a buffer in-memory that's marked valid where the
230 : * on-disk contents are invalid. Particularly not if the in-memory buffer
231 : * could be dirty...
232 : *
233 : * While we hold an AEL on the relation nobody else should be able to read
234 : * the buffer in.
235 : *
236 : * NB: This is probably racy, better don't copy this to non-test code.
237 : */
238 140 : if (BufferIsLocal(buf))
239 36 : InvalidateLocalBuffer(GetLocalBufferDescriptor(-buf - 1), true);
240 : else
241 104 : EvictUnpinnedBuffer(buf, &flushed);
242 :
243 : /*
244 : * Now modify the page as asked for by the caller.
245 : */
246 140 : if (zero)
247 32 : memset(page, 0, BufferGetPageSize(buf));
248 :
249 140 : if (PageIsEmpty(page) && (corrupt_header || corrupt_checksum))
250 32 : PageInit(page, BufferGetPageSize(buf), 0);
251 :
252 140 : ph = (PageHeader) page;
253 :
254 140 : if (corrupt_header)
255 64 : ph->pd_special = BLCKSZ + 1;
256 :
257 140 : if (corrupt_checksum)
258 : {
259 52 : bool successfully_corrupted = 0;
260 :
261 : /*
262 : * Any single modification of the checksum could just end up being
263 : * valid again, due to e.g. corrupt_header changing the data in a way
264 : * that'd result in the "corrupted" checksum, or the checksum already
265 : * being invalid. Retry in that, unlikely, case.
266 : */
267 52 : for (int i = 0; i < 100; i++)
268 : {
269 : uint16 verify_checksum;
270 : uint16 old_checksum;
271 :
272 52 : old_checksum = ph->pd_checksum;
273 52 : ph->pd_checksum = old_checksum + 1;
274 :
275 52 : elog(LOG, "corrupting checksum of blk %u from %u to %u",
276 : blkno, old_checksum, ph->pd_checksum);
277 :
278 52 : verify_checksum = pg_checksum_page(page, blkno);
279 52 : if (verify_checksum != ph->pd_checksum)
280 : {
281 52 : successfully_corrupted = true;
282 52 : break;
283 : }
284 : }
285 :
286 52 : if (!successfully_corrupted)
287 0 : elog(ERROR, "could not corrupt checksum, what's going on?");
288 : }
289 : else
290 : {
291 88 : PageSetChecksumInplace(page, blkno);
292 : }
293 :
294 140 : smgrwrite(RelationGetSmgr(rel),
295 : MAIN_FORKNUM, blkno, page, true);
296 :
297 140 : relation_close(rel, NoLock);
298 :
299 140 : PG_RETURN_VOID();
300 : }
301 :
302 : /*
303 : * Ensures a buffer for rel & blkno is in shared buffers, without actually
304 : * caring about the buffer contents. Used to set up test scenarios.
305 : */
306 : static Buffer
307 260 : create_toy_buffer(Relation rel, BlockNumber blkno)
308 : {
309 : Buffer buf;
310 : BufferDesc *buf_hdr;
311 : uint32 buf_state;
312 260 : bool was_pinned = false;
313 :
314 : /* place buffer in shared buffers without erroring out */
315 260 : buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_ZERO_AND_LOCK, NULL);
316 260 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
317 :
318 260 : if (RelationUsesLocalBuffers(rel))
319 : {
320 88 : buf_hdr = GetLocalBufferDescriptor(-buf - 1);
321 88 : buf_state = pg_atomic_read_u32(&buf_hdr->state);
322 : }
323 : else
324 : {
325 172 : buf_hdr = GetBufferDescriptor(buf - 1);
326 172 : buf_state = LockBufHdr(buf_hdr);
327 : }
328 :
329 : /*
330 : * We should be the only backend accessing this buffer. This is just a
331 : * small bit of belt-and-suspenders defense, none of this code should ever
332 : * run in a cluster with real data.
333 : */
334 260 : if (BUF_STATE_GET_REFCOUNT(buf_state) > 1)
335 0 : was_pinned = true;
336 : else
337 260 : buf_state &= ~(BM_VALID | BM_DIRTY);
338 :
339 260 : if (RelationUsesLocalBuffers(rel))
340 88 : pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
341 : else
342 172 : UnlockBufHdr(buf_hdr, buf_state);
343 :
344 260 : if (was_pinned)
345 0 : elog(ERROR, "toy buffer %d was already pinned",
346 : buf);
347 :
348 260 : return buf;
349 : }
350 :
351 : /*
352 : * A "low level" read. This does similar things to what
353 : * StartReadBuffers()/WaitReadBuffers() do, but provides more control (and
354 : * less sanity).
355 : */
356 40 : PG_FUNCTION_INFO_V1(read_rel_block_ll);
357 : Datum
358 144 : read_rel_block_ll(PG_FUNCTION_ARGS)
359 : {
360 144 : Oid relid = PG_GETARG_OID(0);
361 144 : BlockNumber blkno = PG_GETARG_UINT32(1);
362 144 : int nblocks = PG_GETARG_INT32(2);
363 144 : bool wait_complete = PG_GETARG_BOOL(3);
364 144 : bool batchmode_enter = PG_GETARG_BOOL(4);
365 144 : bool call_smgrreleaseall = PG_GETARG_BOOL(5);
366 144 : bool batchmode_exit = PG_GETARG_BOOL(6);
367 144 : bool zero_on_error = PG_GETARG_BOOL(7);
368 : Relation rel;
369 : Buffer bufs[PG_IOV_MAX];
370 : BufferDesc *buf_hdrs[PG_IOV_MAX];
371 : Page pages[PG_IOV_MAX];
372 144 : uint8 srb_flags = 0;
373 : PgAioReturn ior;
374 : PgAioHandle *ioh;
375 : PgAioWaitRef iow;
376 : SMgrRelation smgr;
377 :
378 144 : if (nblocks <= 0 || nblocks > PG_IOV_MAX)
379 0 : elog(ERROR, "nblocks is out of range");
380 :
381 144 : rel = relation_open(relid, AccessExclusiveLock);
382 :
383 392 : for (int i = 0; i < nblocks; i++)
384 : {
385 248 : bufs[i] = create_toy_buffer(rel, blkno + i);
386 248 : pages[i] = BufferGetBlock(bufs[i]);
387 248 : buf_hdrs[i] = BufferIsLocal(bufs[i]) ?
388 248 : GetLocalBufferDescriptor(-bufs[i] - 1) :
389 164 : GetBufferDescriptor(bufs[i] - 1);
390 : }
391 :
392 144 : smgr = RelationGetSmgr(rel);
393 :
394 144 : pgstat_prepare_report_checksum_failure(smgr->smgr_rlocator.locator.dbOid);
395 :
396 144 : ioh = pgaio_io_acquire(CurrentResourceOwner, &ior);
397 144 : pgaio_io_get_wref(ioh, &iow);
398 :
399 144 : if (RelationUsesLocalBuffers(rel))
400 : {
401 128 : for (int i = 0; i < nblocks; i++)
402 84 : StartLocalBufferIO(buf_hdrs[i], true, false);
403 44 : pgaio_io_set_flag(ioh, PGAIO_HF_REFERENCES_LOCAL);
404 : }
405 : else
406 : {
407 264 : for (int i = 0; i < nblocks; i++)
408 164 : StartBufferIO(buf_hdrs[i], true, false);
409 : }
410 :
411 144 : pgaio_io_set_handle_data_32(ioh, (uint32 *) bufs, nblocks);
412 :
413 144 : if (zero_on_error | zero_damaged_pages)
414 44 : srb_flags |= READ_BUFFERS_ZERO_ON_ERROR;
415 144 : if (ignore_checksum_failure)
416 20 : srb_flags |= READ_BUFFERS_IGNORE_CHECKSUM_FAILURES;
417 :
418 144 : pgaio_io_register_callbacks(ioh,
419 144 : RelationUsesLocalBuffers(rel) ?
420 : PGAIO_HCB_LOCAL_BUFFER_READV :
421 : PGAIO_HCB_SHARED_BUFFER_READV,
422 : srb_flags);
423 :
424 144 : if (batchmode_enter)
425 8 : pgaio_enter_batchmode();
426 :
427 144 : smgrstartreadv(ioh, smgr, MAIN_FORKNUM, blkno,
428 : (void *) pages, nblocks);
429 :
430 144 : if (call_smgrreleaseall)
431 8 : smgrreleaseall();
432 :
433 144 : if (batchmode_exit)
434 8 : pgaio_exit_batchmode();
435 :
436 392 : for (int i = 0; i < nblocks; i++)
437 248 : ReleaseBuffer(bufs[i]);
438 :
439 144 : if (wait_complete)
440 : {
441 100 : pgaio_wref_wait(&iow);
442 :
443 100 : if (ior.result.status != PGAIO_RS_OK)
444 88 : pgaio_result_report(ior.result,
445 : &ior.target_data,
446 88 : ior.result.status == PGAIO_RS_ERROR ?
447 : ERROR : WARNING);
448 : }
449 :
450 100 : relation_close(rel, NoLock);
451 :
452 100 : PG_RETURN_VOID();
453 : }
454 :
455 22 : PG_FUNCTION_INFO_V1(invalidate_rel_block);
456 : Datum
457 278 : invalidate_rel_block(PG_FUNCTION_ARGS)
458 : {
459 278 : Oid relid = PG_GETARG_OID(0);
460 278 : BlockNumber blkno = PG_GETARG_UINT32(1);
461 : Relation rel;
462 : PrefetchBufferResult pr;
463 : Buffer buf;
464 :
465 278 : rel = relation_open(relid, AccessExclusiveLock);
466 :
467 : /*
468 : * This is a gross hack, but there's no other API exposed that allows to
469 : * get a buffer ID without actually reading the block in.
470 : */
471 278 : pr = PrefetchBuffer(rel, MAIN_FORKNUM, blkno);
472 278 : buf = pr.recent_buffer;
473 :
474 278 : if (BufferIsValid(buf))
475 : {
476 : /* if the buffer contents aren't valid, this'll return false */
477 250 : if (ReadRecentBuffer(rel->rd_locator, MAIN_FORKNUM, blkno, buf))
478 : {
479 238 : BufferDesc *buf_hdr = BufferIsLocal(buf) ?
480 64 : GetLocalBufferDescriptor(-buf - 1)
481 238 : : GetBufferDescriptor(buf - 1);
482 : bool flushed;
483 :
484 238 : LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
485 :
486 238 : if (pg_atomic_read_u32(&buf_hdr->state) & BM_DIRTY)
487 : {
488 144 : if (BufferIsLocal(buf))
489 44 : FlushLocalBuffer(buf_hdr, NULL);
490 : else
491 100 : FlushOneBuffer(buf);
492 : }
493 238 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
494 238 : ReleaseBuffer(buf);
495 :
496 238 : if (BufferIsLocal(buf))
497 64 : InvalidateLocalBuffer(GetLocalBufferDescriptor(-buf - 1), true);
498 174 : else if (!EvictUnpinnedBuffer(buf, &flushed))
499 0 : elog(ERROR, "couldn't evict");
500 : }
501 : }
502 :
503 278 : relation_close(rel, AccessExclusiveLock);
504 :
505 278 : PG_RETURN_VOID();
506 : }
507 :
508 12 : PG_FUNCTION_INFO_V1(buffer_create_toy);
509 : Datum
510 12 : buffer_create_toy(PG_FUNCTION_ARGS)
511 : {
512 12 : Oid relid = PG_GETARG_OID(0);
513 12 : BlockNumber blkno = PG_GETARG_UINT32(1);
514 : Relation rel;
515 : Buffer buf;
516 :
517 12 : rel = relation_open(relid, AccessExclusiveLock);
518 :
519 12 : buf = create_toy_buffer(rel, blkno);
520 12 : ReleaseBuffer(buf);
521 :
522 12 : relation_close(rel, NoLock);
523 :
524 12 : PG_RETURN_INT32(buf);
525 : }
526 :
527 16 : PG_FUNCTION_INFO_V1(buffer_call_start_io);
528 : Datum
529 40 : buffer_call_start_io(PG_FUNCTION_ARGS)
530 : {
531 40 : Buffer buf = PG_GETARG_INT32(0);
532 40 : bool for_input = PG_GETARG_BOOL(1);
533 40 : bool nowait = PG_GETARG_BOOL(2);
534 : bool can_start;
535 :
536 40 : if (BufferIsLocal(buf))
537 16 : can_start = StartLocalBufferIO(GetLocalBufferDescriptor(-buf - 1),
538 : for_input, nowait);
539 : else
540 24 : can_start = StartBufferIO(GetBufferDescriptor(buf - 1),
541 : for_input, nowait);
542 :
543 : /*
544 : * For tests we don't want the resowner release preventing us from
545 : * orchestrating odd scenarios.
546 : */
547 40 : if (can_start && !BufferIsLocal(buf))
548 12 : ResourceOwnerForgetBufferIO(CurrentResourceOwner,
549 : buf);
550 :
551 40 : ereport(LOG,
552 : errmsg("buffer %d after StartBufferIO: %s",
553 : buf, DebugPrintBufferRefcount(buf)),
554 : errhidestmt(true), errhidecontext(true));
555 :
556 40 : PG_RETURN_BOOL(can_start);
557 : }
558 :
559 16 : PG_FUNCTION_INFO_V1(buffer_call_terminate_io);
560 : Datum
561 20 : buffer_call_terminate_io(PG_FUNCTION_ARGS)
562 : {
563 20 : Buffer buf = PG_GETARG_INT32(0);
564 20 : bool for_input = PG_GETARG_BOOL(1);
565 20 : bool succeed = PG_GETARG_BOOL(2);
566 20 : bool io_error = PG_GETARG_BOOL(3);
567 20 : bool release_aio = PG_GETARG_BOOL(4);
568 20 : bool clear_dirty = false;
569 20 : uint32 set_flag_bits = 0;
570 :
571 20 : if (io_error)
572 0 : set_flag_bits |= BM_IO_ERROR;
573 :
574 20 : if (for_input)
575 : {
576 20 : clear_dirty = false;
577 :
578 20 : if (succeed)
579 8 : set_flag_bits |= BM_VALID;
580 : }
581 : else
582 : {
583 0 : if (succeed)
584 0 : clear_dirty = true;
585 : }
586 :
587 20 : ereport(LOG,
588 : errmsg("buffer %d before Terminate[Local]BufferIO: %s",
589 : buf, DebugPrintBufferRefcount(buf)),
590 : errhidestmt(true), errhidecontext(true));
591 :
592 20 : if (BufferIsLocal(buf))
593 8 : TerminateLocalBufferIO(GetLocalBufferDescriptor(-buf - 1),
594 : clear_dirty, set_flag_bits, release_aio);
595 : else
596 12 : TerminateBufferIO(GetBufferDescriptor(buf - 1),
597 : clear_dirty, set_flag_bits, false, release_aio);
598 :
599 20 : ereport(LOG,
600 : errmsg("buffer %d after Terminate[Local]BufferIO: %s",
601 : buf, DebugPrintBufferRefcount(buf)),
602 : errhidestmt(true), errhidecontext(true));
603 :
604 20 : PG_RETURN_VOID();
605 : }
606 :
607 12 : PG_FUNCTION_INFO_V1(handle_get);
608 : Datum
609 24 : handle_get(PG_FUNCTION_ARGS)
610 : {
611 24 : last_handle = pgaio_io_acquire(CurrentResourceOwner, NULL);
612 :
613 24 : PG_RETURN_VOID();
614 : }
615 :
616 12 : PG_FUNCTION_INFO_V1(handle_release_last);
617 : Datum
618 8 : handle_release_last(PG_FUNCTION_ARGS)
619 : {
620 8 : if (!last_handle)
621 0 : elog(ERROR, "no handle");
622 :
623 8 : pgaio_io_release(last_handle);
624 :
625 4 : PG_RETURN_VOID();
626 : }
627 :
628 12 : PG_FUNCTION_INFO_V1(handle_get_and_error);
629 : Datum
630 12 : handle_get_and_error(PG_FUNCTION_ARGS)
631 : {
632 12 : pgaio_io_acquire(CurrentResourceOwner, NULL);
633 :
634 12 : elog(ERROR, "as you command");
635 : PG_RETURN_VOID();
636 : }
637 :
638 12 : PG_FUNCTION_INFO_V1(handle_get_twice);
639 : Datum
640 4 : handle_get_twice(PG_FUNCTION_ARGS)
641 : {
642 4 : pgaio_io_acquire(CurrentResourceOwner, NULL);
643 4 : pgaio_io_acquire(CurrentResourceOwner, NULL);
644 :
645 0 : PG_RETURN_VOID();
646 : }
647 :
648 12 : PG_FUNCTION_INFO_V1(handle_get_release);
649 : Datum
650 12 : handle_get_release(PG_FUNCTION_ARGS)
651 : {
652 : PgAioHandle *handle;
653 :
654 12 : handle = pgaio_io_acquire(CurrentResourceOwner, NULL);
655 12 : pgaio_io_release(handle);
656 :
657 12 : PG_RETURN_VOID();
658 : }
659 :
660 12 : PG_FUNCTION_INFO_V1(batch_start);
661 : Datum
662 12 : batch_start(PG_FUNCTION_ARGS)
663 : {
664 12 : pgaio_enter_batchmode();
665 12 : PG_RETURN_VOID();
666 : }
667 :
668 12 : PG_FUNCTION_INFO_V1(batch_end);
669 : Datum
670 4 : batch_end(PG_FUNCTION_ARGS)
671 : {
672 4 : pgaio_exit_batchmode();
673 4 : PG_RETURN_VOID();
674 : }
675 :
676 : #ifdef USE_INJECTION_POINTS
677 : extern PGDLLEXPORT void inj_io_short_read(const char *name, const void *private_data);
678 : extern PGDLLEXPORT void inj_io_reopen(const char *name, const void *private_data);
679 :
680 : void
681 2574 : inj_io_short_read(const char *name, const void *private_data)
682 : {
683 : PgAioHandle *ioh;
684 :
685 2574 : ereport(LOG,
686 : errmsg("short read injection point called, is enabled: %d",
687 : inj_io_error_state->enabled_reopen),
688 : errhidestmt(true), errhidecontext(true));
689 :
690 2574 : if (inj_io_error_state->enabled_short_read)
691 : {
692 96 : ioh = pgaio_inj_io_get();
693 :
694 : /*
695 : * Only shorten reads that are actually longer than the target size,
696 : * otherwise we can trigger over-reads.
697 : */
698 96 : if (inj_io_error_state->short_read_result_set
699 96 : && ioh->op == PGAIO_OP_READV
700 96 : && inj_io_error_state->short_read_result <= ioh->result)
701 : {
702 88 : struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off];
703 88 : int32 old_result = ioh->result;
704 88 : int32 new_result = inj_io_error_state->short_read_result;
705 88 : int32 processed = 0;
706 :
707 88 : ereport(LOG,
708 : errmsg("short read inject point, changing result from %d to %d",
709 : old_result, new_result),
710 : errhidestmt(true), errhidecontext(true));
711 :
712 : /*
713 : * The underlying IO actually completed OK, and thus the "invalid"
714 : * portion of the IOV actually contains valid data. That can hide
715 : * a lot of problems, e.g. if we were to wrongly mark a buffer,
716 : * that wasn't read according to the shortened-read, IO as valid,
717 : * the contents would look valid and we might miss a bug.
718 : *
719 : * To avoid that, iterate through the IOV and zero out the
720 : * "failed" portion of the IO.
721 : */
722 184 : for (int i = 0; i < ioh->op_data.read.iov_length; i++)
723 : {
724 96 : if (processed + iov[i].iov_len <= new_result)
725 68 : processed += iov[i].iov_len;
726 28 : else if (processed <= new_result)
727 : {
728 28 : uint32 ok_part = new_result - processed;
729 :
730 28 : memset((char *) iov[i].iov_base + ok_part, 0, iov[i].iov_len - ok_part);
731 28 : processed += iov[i].iov_len;
732 : }
733 : else
734 : {
735 0 : memset((char *) iov[i].iov_base, 0, iov[i].iov_len);
736 : }
737 : }
738 :
739 88 : ioh->result = new_result;
740 : }
741 : }
742 2574 : }
743 :
744 : void
745 692 : inj_io_reopen(const char *name, const void *private_data)
746 : {
747 692 : ereport(LOG,
748 : errmsg("reopen injection point called, is enabled: %d",
749 : inj_io_error_state->enabled_reopen),
750 : errhidestmt(true), errhidecontext(true));
751 :
752 692 : if (inj_io_error_state->enabled_reopen)
753 2 : elog(ERROR, "injection point triggering failure to reopen ");
754 690 : }
755 : #endif
756 :
757 12 : PG_FUNCTION_INFO_V1(inj_io_short_read_attach);
758 : Datum
759 28 : inj_io_short_read_attach(PG_FUNCTION_ARGS)
760 : {
761 : #ifdef USE_INJECTION_POINTS
762 28 : inj_io_error_state->enabled_short_read = true;
763 28 : inj_io_error_state->short_read_result_set = !PG_ARGISNULL(0);
764 28 : if (inj_io_error_state->short_read_result_set)
765 28 : inj_io_error_state->short_read_result = PG_GETARG_INT32(0);
766 : #else
767 : elog(ERROR, "injection points not supported");
768 : #endif
769 :
770 28 : PG_RETURN_VOID();
771 : }
772 :
773 12 : PG_FUNCTION_INFO_V1(inj_io_short_read_detach);
774 : Datum
775 8 : inj_io_short_read_detach(PG_FUNCTION_ARGS)
776 : {
777 : #ifdef USE_INJECTION_POINTS
778 8 : inj_io_error_state->enabled_short_read = false;
779 : #else
780 : elog(ERROR, "injection points not supported");
781 : #endif
782 8 : PG_RETURN_VOID();
783 : }
784 :
785 10 : PG_FUNCTION_INFO_V1(inj_io_reopen_attach);
786 : Datum
787 2 : inj_io_reopen_attach(PG_FUNCTION_ARGS)
788 : {
789 : #ifdef USE_INJECTION_POINTS
790 2 : inj_io_error_state->enabled_reopen = true;
791 : #else
792 : elog(ERROR, "injection points not supported");
793 : #endif
794 :
795 2 : PG_RETURN_VOID();
796 : }
797 :
798 10 : PG_FUNCTION_INFO_V1(inj_io_reopen_detach);
799 : Datum
800 2 : inj_io_reopen_detach(PG_FUNCTION_ARGS)
801 : {
802 : #ifdef USE_INJECTION_POINTS
803 2 : inj_io_error_state->enabled_reopen = false;
804 : #else
805 : elog(ERROR, "injection points not supported");
806 : #endif
807 2 : PG_RETURN_VOID();
808 : }
|