Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * test_aio.c
4 : * Helpers to write tests for AIO
5 : *
6 : * This module provides interface functions for C functionality to SQL, to
7 : * make it possible to test AIO related behavior in a targeted way from SQL.
8 : * It'd not generally be safe to export these functions to SQL, but for a test
9 : * that's fine.
10 : *
11 : * Copyright (c) 2020-2026, PostgreSQL Global Development Group
12 : *
13 : * IDENTIFICATION
14 : * src/test/modules/test_aio/test_aio.c
15 : *
16 : *-------------------------------------------------------------------------
17 : */
18 :
19 : #include "postgres.h"
20 :
21 : #include "access/relation.h"
22 : #include "catalog/pg_type.h"
23 : #include "fmgr.h"
24 : #include "funcapi.h"
25 : #include "storage/aio.h"
26 : #include "storage/aio_internal.h"
27 : #include "storage/buf_internals.h"
28 : #include "storage/bufmgr.h"
29 : #include "storage/checksum.h"
30 : #include "storage/condition_variable.h"
31 : #include "storage/lwlock.h"
32 : #include "storage/proc.h"
33 : #include "storage/procnumber.h"
34 : #include "storage/read_stream.h"
35 : #include "utils/array.h"
36 : #include "utils/builtins.h"
37 : #include "utils/injection_point.h"
38 : #include "utils/rel.h"
39 : #include "utils/tuplestore.h"
40 : #include "utils/wait_event.h"
41 :
42 :
43 7 : PG_MODULE_MAGIC;
44 :
45 :
46 : /* In shared memory */
47 : typedef struct InjIoErrorState
48 : {
49 : ConditionVariable cv;
50 :
51 : bool enabled_short_read;
52 : bool enabled_reopen;
53 :
54 : bool enabled_completion_wait;
55 : Oid completion_wait_relfilenode;
56 : BlockNumber completion_wait_blockno;
57 : pid_t completion_wait_pid;
58 : uint32 completion_wait_event;
59 :
60 : bool short_read_result_set;
61 : Oid short_read_relfilenode;
62 : pid_t short_read_pid;
63 : int short_read_result;
64 : } InjIoErrorState;
65 :
66 : typedef struct BlocksReadStreamData
67 : {
68 : int nblocks;
69 : int curblock;
70 : uint32 *blocks;
71 : } BlocksReadStreamData;
72 :
73 :
74 : static InjIoErrorState *inj_io_error_state;
75 :
76 : /* Shared memory init callbacks */
77 : static void test_aio_shmem_request(void *arg);
78 : static void test_aio_shmem_init(void *arg);
79 : static void test_aio_shmem_attach(void *arg);
80 :
81 : static const ShmemCallbacks inj_io_shmem_callbacks = {
82 : .request_fn = test_aio_shmem_request,
83 : .init_fn = test_aio_shmem_init,
84 : .attach_fn = test_aio_shmem_attach,
85 : };
86 :
87 :
88 : static PgAioHandle *last_handle;
89 :
90 :
91 :
92 : static void
93 7 : test_aio_shmem_request(void *arg)
94 : {
95 7 : ShmemRequestStruct(.name = "test_aio injection points",
96 : .size = sizeof(InjIoErrorState),
97 : .ptr = (void **) &inj_io_error_state,
98 : );
99 7 : }
100 :
101 : static void
102 7 : test_aio_shmem_init(void *arg)
103 : {
104 : /* First time through, initialize */
105 7 : inj_io_error_state->enabled_short_read = false;
106 7 : inj_io_error_state->enabled_reopen = false;
107 7 : inj_io_error_state->enabled_completion_wait = false;
108 :
109 7 : ConditionVariableInit(&inj_io_error_state->cv);
110 7 : inj_io_error_state->completion_wait_event = WaitEventInjectionPointNew("completion_wait");
111 :
112 : #ifdef USE_INJECTION_POINTS
113 7 : InjectionPointAttach("aio-process-completion-before-shared",
114 : "test_aio",
115 : "inj_io_completion_hook",
116 : NULL,
117 : 0);
118 7 : InjectionPointLoad("aio-process-completion-before-shared");
119 :
120 7 : InjectionPointAttach("aio-worker-after-reopen",
121 : "test_aio",
122 : "inj_io_reopen",
123 : NULL,
124 : 0);
125 7 : InjectionPointLoad("aio-worker-after-reopen");
126 :
127 : #endif
128 7 : }
129 :
/*
 * Shmem "attach" callback.
 *
 * Loads both injection points up front so that they can later be invoked
 * from within a critical section, and logs that this happened.  Does
 * nothing when injection points are not compiled in.
 */
static void
test_aio_shmem_attach(void *arg)
{
#ifdef USE_INJECTION_POINTS
    InjectionPointLoad("aio-process-completion-before-shared");
    InjectionPointLoad("aio-worker-after-reopen");
    elog(LOG, "injection point loaded");
#endif
}
143 :
144 : void
145 7 : _PG_init(void)
146 : {
147 7 : if (!process_shared_preload_libraries_in_progress)
148 0 : return;
149 :
150 7 : RegisterShmemCallbacks(&inj_io_shmem_callbacks);
151 : }
152 :
153 :
154 9 : PG_FUNCTION_INFO_V1(errno_from_string);
155 : Datum
156 6 : errno_from_string(PG_FUNCTION_ARGS)
157 : {
158 6 : const char *sym = text_to_cstring(PG_GETARG_TEXT_PP(0));
159 :
160 6 : if (strcmp(sym, "EIO") == 0)
161 4 : PG_RETURN_INT32(EIO);
162 2 : else if (strcmp(sym, "EAGAIN") == 0)
163 0 : PG_RETURN_INT32(EAGAIN);
164 2 : else if (strcmp(sym, "EINTR") == 0)
165 0 : PG_RETURN_INT32(EINTR);
166 2 : else if (strcmp(sym, "ENOSPC") == 0)
167 0 : PG_RETURN_INT32(ENOSPC);
168 2 : else if (strcmp(sym, "EROFS") == 0)
169 2 : PG_RETURN_INT32(EROFS);
170 :
171 0 : ereport(ERROR,
172 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
173 : errmsg_internal("%s is not a supported errno value", sym));
174 : PG_RETURN_INT32(0);
175 : }
176 :
177 9 : PG_FUNCTION_INFO_V1(grow_rel);
178 : Datum
179 6 : grow_rel(PG_FUNCTION_ARGS)
180 : {
181 6 : Oid relid = PG_GETARG_OID(0);
182 6 : uint32 nblocks = PG_GETARG_UINT32(1);
183 : Relation rel;
184 : #define MAX_BUFFERS_TO_EXTEND_BY 64
185 : Buffer victim_buffers[MAX_BUFFERS_TO_EXTEND_BY];
186 :
187 6 : rel = relation_open(relid, AccessExclusiveLock);
188 :
189 12 : while (nblocks > 0)
190 : {
191 : uint32 extend_by_pages;
192 :
193 6 : extend_by_pages = Min(nblocks, MAX_BUFFERS_TO_EXTEND_BY);
194 :
195 6 : ExtendBufferedRelBy(BMR_REL(rel),
196 : MAIN_FORKNUM,
197 : NULL,
198 : 0,
199 : extend_by_pages,
200 : victim_buffers,
201 : &extend_by_pages);
202 :
203 6 : nblocks -= extend_by_pages;
204 :
205 78 : for (uint32 i = 0; i < extend_by_pages; i++)
206 : {
207 72 : ReleaseBuffer(victim_buffers[i]);
208 : }
209 : }
210 :
211 6 : relation_close(rel, NoLock);
212 :
213 6 : PG_RETURN_VOID();
214 : }
215 :
216 19 : PG_FUNCTION_INFO_V1(modify_rel_block);
217 : Datum
218 70 : modify_rel_block(PG_FUNCTION_ARGS)
219 : {
220 70 : Oid relid = PG_GETARG_OID(0);
221 70 : BlockNumber blkno = PG_GETARG_UINT32(1);
222 70 : bool zero = PG_GETARG_BOOL(2);
223 70 : bool corrupt_header = PG_GETARG_BOOL(3);
224 70 : bool corrupt_checksum = PG_GETARG_BOOL(4);
225 70 : Page page = palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE, 0);
226 : bool flushed;
227 : Relation rel;
228 : Buffer buf;
229 : PageHeader ph;
230 :
231 70 : rel = relation_open(relid, AccessExclusiveLock);
232 :
233 70 : buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno,
234 : RBM_ZERO_ON_ERROR, NULL);
235 :
236 70 : LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
237 :
238 : /*
239 : * copy the page to local memory, seems nicer than to directly modify in
240 : * the buffer pool.
241 : */
242 70 : memcpy(page, BufferGetPage(buf), BLCKSZ);
243 :
244 70 : UnlockReleaseBuffer(buf);
245 :
246 : /*
247 : * Don't want to have a buffer in-memory that's marked valid where the
248 : * on-disk contents are invalid. Particularly not if the in-memory buffer
249 : * could be dirty...
250 : *
251 : * While we hold an AEL on the relation nobody else should be able to read
252 : * the buffer in.
253 : *
254 : * NB: This is probably racy, better don't copy this to non-test code.
255 : */
256 70 : if (BufferIsLocal(buf))
257 18 : InvalidateLocalBuffer(GetLocalBufferDescriptor(-buf - 1), true);
258 : else
259 52 : EvictUnpinnedBuffer(buf, &flushed);
260 :
261 : /*
262 : * Now modify the page as asked for by the caller.
263 : */
264 70 : if (zero)
265 16 : memset(page, 0, BufferGetPageSize(buf));
266 :
267 70 : if (PageIsEmpty(page) && (corrupt_header || corrupt_checksum))
268 16 : PageInit(page, BufferGetPageSize(buf), 0);
269 :
270 70 : ph = (PageHeader) page;
271 :
272 70 : if (corrupt_header)
273 32 : ph->pd_special = BLCKSZ + 1;
274 :
275 70 : if (corrupt_checksum)
276 : {
277 26 : bool successfully_corrupted = 0;
278 :
279 : /*
280 : * Any single modification of the checksum could just end up being
281 : * valid again, due to e.g. corrupt_header changing the data in a way
282 : * that'd result in the "corrupted" checksum, or the checksum already
283 : * being invalid. Retry in that, unlikely, case.
284 : */
285 26 : for (int i = 0; i < 100; i++)
286 : {
287 : uint16 verify_checksum;
288 : uint16 old_checksum;
289 :
290 26 : old_checksum = ph->pd_checksum;
291 26 : ph->pd_checksum = old_checksum + 1;
292 :
293 26 : elog(LOG, "corrupting checksum of blk %u from %u to %u",
294 : blkno, old_checksum, ph->pd_checksum);
295 :
296 26 : verify_checksum = pg_checksum_page(page, blkno);
297 26 : if (verify_checksum != ph->pd_checksum)
298 : {
299 26 : successfully_corrupted = true;
300 26 : break;
301 : }
302 : }
303 :
304 26 : if (!successfully_corrupted)
305 0 : elog(ERROR, "could not corrupt checksum, what's going on?");
306 : }
307 : else
308 : {
309 44 : PageSetChecksum(page, blkno);
310 : }
311 :
312 70 : smgrwrite(RelationGetSmgr(rel),
313 : MAIN_FORKNUM, blkno, page, true);
314 :
315 70 : relation_close(rel, NoLock);
316 :
317 70 : PG_RETURN_VOID();
318 : }
319 :
320 : /*
321 : * Ensures a buffer for rel & blkno is in shared buffers, without actually
322 : * caring about the buffer contents. Used to set up test scenarios.
323 : */
324 : static Buffer
325 160 : create_toy_buffer(Relation rel, BlockNumber blkno)
326 : {
327 : Buffer buf;
328 : BufferDesc *buf_hdr;
329 : uint64 buf_state;
330 160 : bool was_pinned = false;
331 160 : uint64 unset_bits = 0;
332 :
333 : /* place buffer in shared buffers without erroring out */
334 160 : buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_ZERO_AND_LOCK, NULL);
335 160 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
336 :
337 160 : if (RelationUsesLocalBuffers(rel))
338 : {
339 50 : buf_hdr = GetLocalBufferDescriptor(-buf - 1);
340 50 : buf_state = pg_atomic_read_u64(&buf_hdr->state);
341 : }
342 : else
343 : {
344 110 : buf_hdr = GetBufferDescriptor(buf - 1);
345 110 : buf_state = LockBufHdr(buf_hdr);
346 : }
347 :
348 : /*
349 : * We should be the only backend accessing this buffer. This is just a
350 : * small bit of belt-and-suspenders defense, none of this code should ever
351 : * run in a cluster with real data.
352 : */
353 160 : if (BUF_STATE_GET_REFCOUNT(buf_state) > 1)
354 0 : was_pinned = true;
355 : else
356 160 : unset_bits |= BM_VALID | BM_DIRTY;
357 :
358 160 : if (RelationUsesLocalBuffers(rel))
359 : {
360 50 : buf_state &= ~unset_bits;
361 50 : pg_atomic_unlocked_write_u64(&buf_hdr->state, buf_state);
362 : }
363 : else
364 : {
365 110 : UnlockBufHdrExt(buf_hdr, buf_state, 0, unset_bits, 0);
366 : }
367 :
368 160 : if (was_pinned)
369 0 : elog(ERROR, "toy buffer %d was already pinned",
370 : buf);
371 :
372 160 : return buf;
373 : }
374 :
375 : /*
376 : * A "low level" read. This does similar things to what
377 : * StartReadBuffers()/WaitReadBuffers() do, but provides more control (and
378 : * less sanity).
379 : */
380 27 : PG_FUNCTION_INFO_V1(read_rel_block_ll);
381 : Datum
382 94 : read_rel_block_ll(PG_FUNCTION_ARGS)
383 : {
384 94 : Oid relid = PG_GETARG_OID(0);
385 94 : BlockNumber blkno = PG_GETARG_UINT32(1);
386 94 : int nblocks = PG_GETARG_INT32(2);
387 94 : bool wait_complete = PG_GETARG_BOOL(3);
388 94 : bool batchmode_enter = PG_GETARG_BOOL(4);
389 94 : bool call_smgrreleaseall = PG_GETARG_BOOL(5);
390 94 : bool batchmode_exit = PG_GETARG_BOOL(6);
391 94 : bool zero_on_error = PG_GETARG_BOOL(7);
392 : Relation rel;
393 : Buffer bufs[PG_IOV_MAX];
394 : BufferDesc *buf_hdrs[PG_IOV_MAX];
395 : Page pages[PG_IOV_MAX];
396 94 : uint8 srb_flags = 0;
397 : PgAioReturn ior;
398 : PgAioHandle *ioh;
399 : PgAioWaitRef iow;
400 : SMgrRelation smgr;
401 :
402 94 : if (nblocks <= 0 || nblocks > PG_IOV_MAX)
403 0 : elog(ERROR, "nblocks is out of range");
404 :
405 94 : rel = relation_open(relid, AccessShareLock);
406 :
407 246 : for (int i = 0; i < nblocks; i++)
408 : {
409 152 : bufs[i] = create_toy_buffer(rel, blkno + i);
410 152 : pages[i] = BufferGetBlock(bufs[i]);
411 152 : buf_hdrs[i] = BufferIsLocal(bufs[i]) ?
412 152 : GetLocalBufferDescriptor(-bufs[i] - 1) :
413 104 : GetBufferDescriptor(bufs[i] - 1);
414 : }
415 :
416 94 : smgr = RelationGetSmgr(rel);
417 :
418 94 : pgstat_prepare_report_checksum_failure(smgr->smgr_rlocator.locator.dbOid);
419 :
420 94 : ioh = pgaio_io_acquire(CurrentResourceOwner, &ior);
421 94 : pgaio_io_get_wref(ioh, &iow);
422 :
423 94 : if (RelationUsesLocalBuffers(rel))
424 : {
425 76 : for (int i = 0; i < nblocks; i++)
426 48 : StartLocalBufferIO(buf_hdrs[i], true, true, NULL);
427 28 : pgaio_io_set_flag(ioh, PGAIO_HF_REFERENCES_LOCAL);
428 : }
429 : else
430 : {
431 170 : for (int i = 0; i < nblocks; i++)
432 104 : StartSharedBufferIO(buf_hdrs[i], true, true, NULL);
433 : }
434 :
435 94 : pgaio_io_set_handle_data_32(ioh, (uint32 *) bufs, nblocks);
436 :
437 94 : if (zero_on_error | zero_damaged_pages)
438 22 : srb_flags |= READ_BUFFERS_ZERO_ON_ERROR;
439 94 : if (ignore_checksum_failure)
440 10 : srb_flags |= READ_BUFFERS_IGNORE_CHECKSUM_FAILURES;
441 :
442 94 : pgaio_io_register_callbacks(ioh,
443 94 : RelationUsesLocalBuffers(rel) ?
444 : PGAIO_HCB_LOCAL_BUFFER_READV :
445 : PGAIO_HCB_SHARED_BUFFER_READV,
446 : srb_flags);
447 :
448 94 : if (batchmode_enter)
449 4 : pgaio_enter_batchmode();
450 :
451 94 : smgrstartreadv(ioh, smgr, MAIN_FORKNUM, blkno,
452 : (void *) pages, nblocks);
453 :
454 94 : if (call_smgrreleaseall)
455 4 : smgrreleaseall();
456 :
457 94 : if (batchmode_exit)
458 4 : pgaio_exit_batchmode();
459 :
460 246 : for (int i = 0; i < nblocks; i++)
461 152 : ReleaseBuffer(bufs[i]);
462 :
463 94 : if (wait_complete)
464 : {
465 60 : pgaio_wref_wait(&iow);
466 :
467 60 : if (ior.result.status != PGAIO_RS_OK)
468 46 : pgaio_result_report(ior.result,
469 : &ior.target_data,
470 46 : ior.result.status == PGAIO_RS_ERROR ?
471 : ERROR : WARNING);
472 : }
473 :
474 70 : relation_close(rel, NoLock);
475 :
476 70 : PG_RETURN_VOID();
477 : }
478 :
479 : /* helper for invalidate_rel_block() and evict_rel() */
480 : static void
481 469 : invalidate_one_block(Relation rel, ForkNumber forknum, BlockNumber blkno)
482 : {
483 : PrefetchBufferResult pr;
484 : Buffer buf;
485 :
486 : /*
487 : * This is a gross hack, but there's no other API exposed that allows to
488 : * get a buffer ID without actually reading the block in.
489 : */
490 469 : pr = PrefetchBuffer(rel, forknum, blkno);
491 469 : buf = pr.recent_buffer;
492 :
493 469 : if (BufferIsValid(buf))
494 : {
495 : /* if the buffer contents aren't valid, this'll return false */
496 230 : if (ReadRecentBuffer(rel->rd_locator, forknum, blkno, buf))
497 : {
498 224 : BufferDesc *buf_hdr = BufferIsLocal(buf) ?
499 140 : GetLocalBufferDescriptor(-buf - 1)
500 224 : : GetBufferDescriptor(buf - 1);
501 : bool flushed;
502 :
503 224 : LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
504 :
505 224 : if (pg_atomic_read_u64(&buf_hdr->state) & BM_DIRTY)
506 : {
507 124 : if (BufferIsLocal(buf))
508 74 : FlushLocalBuffer(buf_hdr, NULL);
509 : else
510 50 : FlushOneBuffer(buf);
511 : }
512 224 : UnlockReleaseBuffer(buf);
513 :
514 224 : if (BufferIsLocal(buf))
515 140 : InvalidateLocalBuffer(GetLocalBufferDescriptor(-buf - 1), true);
516 84 : else if (!EvictUnpinnedBuffer(buf, &flushed))
517 0 : elog(ERROR, "couldn't evict");
518 : }
519 : }
520 :
521 469 : }
522 :
523 16 : PG_FUNCTION_INFO_V1(invalidate_rel_block);
524 : Datum
525 157 : invalidate_rel_block(PG_FUNCTION_ARGS)
526 : {
527 157 : Oid relid = PG_GETARG_OID(0);
528 157 : BlockNumber blkno = PG_GETARG_UINT32(1);
529 : Relation rel;
530 :
531 157 : rel = relation_open(relid, AccessExclusiveLock);
532 :
533 157 : invalidate_one_block(rel, MAIN_FORKNUM, blkno);
534 :
535 157 : relation_close(rel, AccessExclusiveLock);
536 :
537 157 : PG_RETURN_VOID();
538 : }
539 :
540 13 : PG_FUNCTION_INFO_V1(evict_rel);
541 : Datum
542 40 : evict_rel(PG_FUNCTION_ARGS)
543 : {
544 40 : Oid relid = PG_GETARG_OID(0);
545 : Relation rel;
546 :
547 40 : rel = relation_open(relid, AccessExclusiveLock);
548 :
549 : /*
550 : * EvictRelUnpinnedBuffers() doesn't support temp tables, so for temp
551 : * tables we have to do it the expensive way and evict every possible
552 : * buffer.
553 : */
554 40 : if (RelationUsesLocalBuffers(rel))
555 : {
556 12 : SMgrRelation smgr = RelationGetSmgr(rel);
557 :
558 60 : for (int forknum = MAIN_FORKNUM; forknum <= MAX_FORKNUM; forknum++)
559 : {
560 : BlockNumber nblocks;
561 :
562 48 : if (!smgrexists(smgr, forknum))
563 24 : continue;
564 :
565 24 : nblocks = smgrnblocks(smgr, forknum);
566 :
567 336 : for (int blkno = 0; blkno < nblocks; blkno++)
568 : {
569 312 : invalidate_one_block(rel, forknum, blkno);
570 : }
571 : }
572 : }
573 : else
574 : {
575 : int32 buffers_evicted,
576 : buffers_flushed,
577 : buffers_skipped;
578 :
579 28 : EvictRelUnpinnedBuffers(rel, &buffers_evicted, &buffers_flushed,
580 : &buffers_skipped);
581 : }
582 :
583 40 : relation_close(rel, AccessExclusiveLock);
584 :
585 :
586 40 : PG_RETURN_VOID();
587 : }
588 :
589 8 : PG_FUNCTION_INFO_V1(buffer_create_toy);
590 : Datum
591 8 : buffer_create_toy(PG_FUNCTION_ARGS)
592 : {
593 8 : Oid relid = PG_GETARG_OID(0);
594 8 : BlockNumber blkno = PG_GETARG_UINT32(1);
595 : Relation rel;
596 : Buffer buf;
597 :
598 8 : rel = relation_open(relid, AccessExclusiveLock);
599 :
600 8 : buf = create_toy_buffer(rel, blkno);
601 8 : ReleaseBuffer(buf);
602 :
603 8 : relation_close(rel, NoLock);
604 :
605 8 : PG_RETURN_INT32(buf);
606 : }
607 :
608 10 : PG_FUNCTION_INFO_V1(buffer_call_start_io);
609 : Datum
610 22 : buffer_call_start_io(PG_FUNCTION_ARGS)
611 : {
612 22 : Buffer buf = PG_GETARG_INT32(0);
613 22 : bool for_input = PG_GETARG_BOOL(1);
614 22 : bool wait = PG_GETARG_BOOL(2);
615 : StartBufferIOResult result;
616 : bool can_start;
617 :
618 22 : if (BufferIsLocal(buf))
619 8 : result = StartLocalBufferIO(GetLocalBufferDescriptor(-buf - 1),
620 : for_input, wait, NULL);
621 : else
622 14 : result = StartSharedBufferIO(GetBufferDescriptor(buf - 1),
623 : for_input, wait, NULL);
624 :
625 22 : can_start = result == BUFFER_IO_READY_FOR_IO;
626 :
627 : /*
628 : * For tests we don't want the resowner release preventing us from
629 : * orchestrating odd scenarios.
630 : */
631 22 : if (can_start && !BufferIsLocal(buf))
632 8 : ResourceOwnerForgetBufferIO(CurrentResourceOwner,
633 : buf);
634 :
635 22 : ereport(LOG,
636 : errmsg("buffer %d after StartBufferIO: %s",
637 : buf, DebugPrintBufferRefcount(buf)),
638 : errhidestmt(true), errhidecontext(true));
639 :
640 22 : PG_RETURN_BOOL(can_start);
641 : }
642 :
643 10 : PG_FUNCTION_INFO_V1(buffer_call_terminate_io);
644 : Datum
645 12 : buffer_call_terminate_io(PG_FUNCTION_ARGS)
646 : {
647 12 : Buffer buf = PG_GETARG_INT32(0);
648 12 : bool for_input = PG_GETARG_BOOL(1);
649 12 : bool succeed = PG_GETARG_BOOL(2);
650 12 : bool io_error = PG_GETARG_BOOL(3);
651 12 : bool release_aio = PG_GETARG_BOOL(4);
652 12 : bool clear_dirty = false;
653 12 : uint64 set_flag_bits = 0;
654 :
655 12 : if (io_error)
656 0 : set_flag_bits |= BM_IO_ERROR;
657 :
658 12 : if (for_input)
659 : {
660 12 : clear_dirty = false;
661 :
662 12 : if (succeed)
663 5 : set_flag_bits |= BM_VALID;
664 : }
665 : else
666 : {
667 0 : if (succeed)
668 0 : clear_dirty = true;
669 : }
670 :
671 12 : ereport(LOG,
672 : errmsg("buffer %d before Terminate[Local]BufferIO: %s",
673 : buf, DebugPrintBufferRefcount(buf)),
674 : errhidestmt(true), errhidecontext(true));
675 :
676 12 : if (BufferIsLocal(buf))
677 4 : TerminateLocalBufferIO(GetLocalBufferDescriptor(-buf - 1),
678 : clear_dirty, set_flag_bits, release_aio);
679 : else
680 8 : TerminateBufferIO(GetBufferDescriptor(buf - 1),
681 : clear_dirty, set_flag_bits, false, release_aio);
682 :
683 12 : ereport(LOG,
684 : errmsg("buffer %d after Terminate[Local]BufferIO: %s",
685 : buf, DebugPrintBufferRefcount(buf)),
686 : errhidestmt(true), errhidecontext(true));
687 :
688 12 : PG_RETURN_VOID();
689 : }
690 :
691 9 : PG_FUNCTION_INFO_V1(read_buffers);
692 : /*
693 : * Infrastructure to test StartReadBuffers()
694 : */
695 : Datum
696 74 : read_buffers(PG_FUNCTION_ARGS)
697 : {
698 74 : Oid relid = PG_GETARG_OID(0);
699 74 : BlockNumber startblock = PG_GETARG_UINT32(1);
700 74 : int32 nblocks = PG_GETARG_INT32(2);
701 74 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
702 : Relation rel;
703 : SMgrRelation smgr;
704 74 : int nblocks_done = 0;
705 74 : int nblocks_disp = 0;
706 74 : int nios = 0;
707 : ReadBuffersOperation *operations;
708 : Buffer *buffers;
709 : Datum *buffers_datum;
710 : bool *io_reqds;
711 : int *nblocks_per_io;
712 :
713 : Assert(nblocks > 0);
714 :
715 74 : InitMaterializedSRF(fcinfo, 0);
716 :
717 : /* at worst each block gets its own IO */
718 74 : operations = palloc0(sizeof(ReadBuffersOperation) * nblocks);
719 74 : buffers = palloc0(sizeof(Buffer) * nblocks);
720 74 : buffers_datum = palloc0(sizeof(Datum) * nblocks);
721 74 : io_reqds = palloc0(sizeof(bool) * nblocks);
722 74 : nblocks_per_io = palloc0(sizeof(int) * nblocks);
723 :
724 74 : rel = relation_open(relid, AccessShareLock);
725 74 : smgr = RelationGetSmgr(rel);
726 :
727 : /*
728 : * Do StartReadBuffers() until IO for all the required blocks has been
729 : * started (if required).
730 : */
731 214 : while (nblocks_done < nblocks)
732 : {
733 140 : ReadBuffersOperation *operation = &operations[nios];
734 140 : int nblocks_this_io =
735 140 : Min(nblocks - nblocks_done, io_combine_limit);
736 :
737 140 : operation->rel = rel;
738 140 : operation->smgr = smgr;
739 140 : operation->persistence = rel->rd_rel->relpersistence;
740 140 : operation->strategy = NULL;
741 140 : operation->forknum = MAIN_FORKNUM;
742 :
743 280 : io_reqds[nios] = StartReadBuffers(operation,
744 140 : &buffers[nblocks_done],
745 : startblock + nblocks_done,
746 : &nblocks_this_io,
747 : 0);
748 140 : nblocks_per_io[nios] = nblocks_this_io;
749 140 : nios++;
750 140 : nblocks_done += nblocks_this_io;
751 : }
752 :
753 : /*
754 : * Now wait for all operations that required IO. This is done at the end,
755 : * as otherwise waiting for IO in progress in other backends could
756 : * influence the result for subsequent buffers / blocks.
757 : */
758 214 : for (int nio = 0; nio < nios; nio++)
759 : {
760 140 : ReadBuffersOperation *operation = &operations[nio];
761 :
762 140 : if (io_reqds[nio])
763 75 : WaitReadBuffers(operation);
764 : }
765 :
766 : /*
767 : * Convert what has been done into SQL SRF return value.
768 : */
769 214 : for (int nio = 0; nio < nios; nio++)
770 : {
771 140 : ReadBuffersOperation *operation = &operations[nio];
772 140 : int nblocks_this_io = nblocks_per_io[nio];
773 140 : Datum values[6] = {0};
774 140 : bool nulls[6] = {0};
775 : ArrayType *buffers_arr;
776 :
777 : /* convert buffer array to datum array */
778 330 : for (int i = 0; i < nblocks_this_io; i++)
779 : {
780 190 : Buffer buf = buffers[nblocks_disp + i];
781 :
782 : Assert(BufferGetBlockNumber(buf) == startblock + nblocks_disp + i);
783 :
784 190 : buffers_datum[nblocks_disp + i] = Int32GetDatum(buf);
785 : }
786 :
787 140 : buffers_arr = construct_array_builtin(&buffers_datum[nblocks_disp],
788 : nblocks_this_io,
789 : INT4OID);
790 :
791 : /* blockoff */
792 140 : values[0] = Int32GetDatum(nblocks_disp);
793 140 : nulls[0] = false;
794 :
795 : /* blocknum */
796 140 : values[1] = UInt32GetDatum(startblock + nblocks_disp);
797 140 : nulls[1] = false;
798 :
799 : /* io_reqd */
800 140 : values[2] = BoolGetDatum(io_reqds[nio]);
801 140 : nulls[2] = false;
802 :
803 : /* foreign IO - only valid when IO was required */
804 140 : values[3] = BoolGetDatum(io_reqds[nio] ? operation->foreign_io : false);
805 140 : nulls[3] = false;
806 :
807 : /* nblocks */
808 140 : values[4] = Int32GetDatum(nblocks_this_io);
809 140 : nulls[4] = false;
810 :
811 : /* array of buffers */
812 140 : values[5] = PointerGetDatum(buffers_arr);
813 140 : nulls[5] = false;
814 :
815 140 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
816 :
817 140 : nblocks_disp += nblocks_this_io;
818 : }
819 :
820 : /* release pins on all the buffers */
821 264 : for (int i = 0; i < nblocks_done; i++)
822 190 : ReleaseBuffer(buffers[i]);
823 :
824 : /*
825 : * Free explicitly, to have a chance to detect potential issues with too
826 : * long lived references to the operation.
827 : */
828 74 : pfree(operations);
829 74 : pfree(buffers);
830 74 : pfree(buffers_datum);
831 74 : pfree(io_reqds);
832 74 : pfree(nblocks_per_io);
833 :
834 74 : relation_close(rel, NoLock);
835 :
836 74 : return (Datum) 0;
837 : }
838 :
839 :
840 : static BlockNumber
841 104 : read_stream_for_blocks_cb(ReadStream *stream,
842 : void *callback_private_data,
843 : void *per_buffer_data)
844 : {
845 104 : BlocksReadStreamData *stream_data = callback_private_data;
846 :
847 104 : if (stream_data->curblock >= stream_data->nblocks)
848 16 : return InvalidBlockNumber;
849 88 : return stream_data->blocks[stream_data->curblock++];
850 : }
851 :
852 9 : PG_FUNCTION_INFO_V1(read_stream_for_blocks);
853 : Datum
854 16 : read_stream_for_blocks(PG_FUNCTION_ARGS)
855 : {
856 16 : Oid relid = PG_GETARG_OID(0);
857 16 : ArrayType *blocksarray = PG_GETARG_ARRAYTYPE_P(1);
858 16 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
859 : Relation rel;
860 : BlocksReadStreamData stream_data;
861 : ReadStream *stream;
862 :
863 16 : InitMaterializedSRF(fcinfo, 0);
864 :
865 : /*
866 : * We expect the input to be an N-element int4 array; verify that. We
867 : * don't need to use deconstruct_array() since the array data is just
868 : * going to look like a C array of N int4 values.
869 : */
870 16 : if (ARR_NDIM(blocksarray) != 1 ||
871 16 : ARR_HASNULL(blocksarray) ||
872 16 : ARR_ELEMTYPE(blocksarray) != INT4OID)
873 0 : elog(ERROR, "expected 1 dimensional int4 array");
874 :
875 16 : stream_data.curblock = 0;
876 16 : stream_data.nblocks = ARR_DIMS(blocksarray)[0];
877 16 : stream_data.blocks = (uint32 *) ARR_DATA_PTR(blocksarray);
878 :
879 16 : rel = relation_open(relid, AccessShareLock);
880 :
881 16 : stream = read_stream_begin_relation(READ_STREAM_FULL,
882 : NULL,
883 : rel,
884 : MAIN_FORKNUM,
885 : read_stream_for_blocks_cb,
886 : &stream_data,
887 : 0);
888 :
889 104 : for (int i = 0; i < stream_data.nblocks; i++)
890 : {
891 88 : Buffer buf = read_stream_next_buffer(stream, NULL);
892 88 : Datum values[3] = {0};
893 88 : bool nulls[3] = {0};
894 :
895 88 : if (!BufferIsValid(buf))
896 0 : elog(ERROR, "read_stream_next_buffer() call %d is unexpectedly invalid", i);
897 :
898 88 : values[0] = Int32GetDatum(i);
899 88 : values[1] = UInt32GetDatum(stream_data.blocks[i]);
900 88 : values[2] = UInt32GetDatum(buf);
901 :
902 88 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
903 :
904 88 : ReleaseBuffer(buf);
905 : }
906 :
907 16 : if (read_stream_next_buffer(stream, NULL) != InvalidBuffer)
908 0 : elog(ERROR, "read_stream_next_buffer() call %d is unexpectedly valid",
909 : stream_data.nblocks);
910 :
911 16 : read_stream_end(stream);
912 :
913 16 : relation_close(rel, NoLock);
914 :
915 16 : return (Datum) 0;
916 : }
917 :
918 :
919 7 : PG_FUNCTION_INFO_V1(handle_get);
920 : Datum
921 12 : handle_get(PG_FUNCTION_ARGS)
922 : {
923 12 : last_handle = pgaio_io_acquire(CurrentResourceOwner, NULL);
924 :
925 12 : PG_RETURN_VOID();
926 : }
927 :
928 7 : PG_FUNCTION_INFO_V1(handle_release_last);
929 : Datum
930 4 : handle_release_last(PG_FUNCTION_ARGS)
931 : {
932 4 : if (!last_handle)
933 0 : elog(ERROR, "no handle");
934 :
935 4 : pgaio_io_release(last_handle);
936 :
937 2 : PG_RETURN_VOID();
938 : }
939 :
940 7 : PG_FUNCTION_INFO_V1(handle_get_and_error);
941 : Datum
942 6 : handle_get_and_error(PG_FUNCTION_ARGS)
943 : {
944 6 : pgaio_io_acquire(CurrentResourceOwner, NULL);
945 :
946 6 : elog(ERROR, "as you command");
947 : PG_RETURN_VOID();
948 : }
949 :
950 7 : PG_FUNCTION_INFO_V1(handle_get_twice);
951 : Datum
952 2 : handle_get_twice(PG_FUNCTION_ARGS)
953 : {
954 2 : pgaio_io_acquire(CurrentResourceOwner, NULL);
955 2 : pgaio_io_acquire(CurrentResourceOwner, NULL);
956 :
957 0 : PG_RETURN_VOID();
958 : }
959 :
960 7 : PG_FUNCTION_INFO_V1(handle_get_release);
961 : Datum
962 6 : handle_get_release(PG_FUNCTION_ARGS)
963 : {
964 : PgAioHandle *handle;
965 :
966 6 : handle = pgaio_io_acquire(CurrentResourceOwner, NULL);
967 6 : pgaio_io_release(handle);
968 :
969 6 : PG_RETURN_VOID();
970 : }
971 :
972 7 : PG_FUNCTION_INFO_V1(batch_start);
973 : Datum
974 6 : batch_start(PG_FUNCTION_ARGS)
975 : {
976 6 : pgaio_enter_batchmode();
977 6 : PG_RETURN_VOID();
978 : }
979 :
980 7 : PG_FUNCTION_INFO_V1(batch_end);
981 : Datum
982 2 : batch_end(PG_FUNCTION_ARGS)
983 : {
984 2 : pgaio_exit_batchmode();
985 2 : PG_RETURN_VOID();
986 : }
987 :
988 : #ifdef USE_INJECTION_POINTS
989 : extern PGDLLEXPORT void inj_io_completion_hook(const char *name,
990 : const void *private_data,
991 : void *arg);
992 : extern PGDLLEXPORT void inj_io_reopen(const char *name,
993 : const void *private_data,
994 : void *arg);
995 :
996 : static bool
997 2135 : inj_io_short_read_matches(PgAioHandle *ioh)
998 : {
999 : PGPROC *io_proc;
1000 : int32 io_pid;
1001 : int32 inj_pid;
1002 : PgAioTargetData *td;
1003 :
1004 2135 : if (!inj_io_error_state->enabled_short_read)
1005 1970 : return false;
1006 :
1007 165 : if (!inj_io_error_state->short_read_result_set)
1008 0 : return false;
1009 :
1010 165 : io_proc = GetPGProcByNumber(pgaio_io_get_owner(ioh));
1011 165 : io_pid = io_proc->pid;
1012 165 : inj_pid = inj_io_error_state->short_read_pid;
1013 :
1014 165 : if (inj_pid != InvalidPid && inj_pid != io_pid)
1015 8 : return false;
1016 :
1017 157 : td = pgaio_io_get_target_data(ioh);
1018 :
1019 157 : if (inj_io_error_state->short_read_relfilenode != InvalidOid &&
1020 4 : td->smgr.rlocator.relNumber != inj_io_error_state->short_read_relfilenode)
1021 2 : return false;
1022 :
1023 : /*
1024 : * Only shorten reads that are actually longer than the target size,
1025 : * otherwise we can trigger over-reads.
1026 : */
1027 155 : if (inj_io_error_state->short_read_result >= ioh->result)
1028 34 : return false;
1029 :
1030 121 : return true;
1031 : }
1032 :
1033 : static bool
1034 2155 : inj_io_completion_wait_matches(PgAioHandle *ioh)
1035 : {
1036 : PGPROC *io_proc;
1037 : int32 io_pid;
1038 : PgAioTargetData *td;
1039 : int32 inj_pid;
1040 : BlockNumber io_blockno;
1041 : BlockNumber inj_blockno;
1042 : Oid inj_relfilenode;
1043 :
1044 2155 : if (!inj_io_error_state->enabled_completion_wait)
1045 2018 : return false;
1046 :
1047 137 : io_proc = GetPGProcByNumber(pgaio_io_get_owner(ioh));
1048 137 : io_pid = io_proc->pid;
1049 137 : inj_pid = inj_io_error_state->completion_wait_pid;
1050 :
1051 137 : if (inj_pid != InvalidPid && inj_pid != io_pid)
1052 111 : return false;
1053 :
1054 26 : td = pgaio_io_get_target_data(ioh);
1055 :
1056 26 : inj_relfilenode = inj_io_error_state->completion_wait_relfilenode;
1057 26 : if (inj_relfilenode != InvalidOid &&
1058 26 : td->smgr.rlocator.relNumber != inj_relfilenode)
1059 6 : return false;
1060 :
1061 20 : inj_blockno = inj_io_error_state->completion_wait_blockno;
1062 20 : io_blockno = td->smgr.blockNum;
1063 20 : if (inj_blockno != InvalidBlockNumber &&
1064 8 : !(inj_blockno >= io_blockno && inj_blockno < (io_blockno + td->smgr.nblocks)))
1065 0 : return false;
1066 :
1067 20 : return true;
1068 : }
1069 :
1070 : static void
1071 2135 : inj_io_completion_wait_hook(const char *name, const void *private_data, void *arg)
1072 : {
1073 2135 : PgAioHandle *ioh = (PgAioHandle *) arg;
1074 :
1075 2135 : if (!inj_io_completion_wait_matches(ioh))
1076 2125 : return;
1077 :
1078 10 : ConditionVariablePrepareToSleep(&inj_io_error_state->cv);
1079 :
1080 : while (true)
1081 : {
1082 20 : if (!inj_io_completion_wait_matches(ioh))
1083 10 : break;
1084 :
1085 10 : ConditionVariableSleep(&inj_io_error_state->cv,
1086 10 : inj_io_error_state->completion_wait_event);
1087 : }
1088 :
1089 10 : ConditionVariableCancelSleep();
1090 : }
1091 :
1092 : static void
1093 2135 : inj_io_short_read_hook(const char *name, const void *private_data, void *arg)
1094 : {
1095 2135 : PgAioHandle *ioh = (PgAioHandle *) arg;
1096 :
1097 2135 : ereport(LOG,
1098 : errmsg("short read injection point called, is enabled: %d",
1099 : inj_io_error_state->enabled_short_read),
1100 : errhidestmt(true), errhidecontext(true));
1101 :
1102 2135 : if (inj_io_short_read_matches(ioh))
1103 : {
1104 121 : struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off];
1105 121 : int32 old_result = ioh->result;
1106 121 : int32 new_result = inj_io_error_state->short_read_result;
1107 121 : int32 processed = 0;
1108 :
1109 121 : ereport(LOG,
1110 : errmsg("short read inject point, changing result from %d to %d",
1111 : old_result, new_result),
1112 : errhidestmt(true), errhidecontext(true));
1113 :
1114 : /*
1115 : * The underlying IO actually completed OK, and thus the "invalid"
1116 : * portion of the IOV actually contains valid data. That can hide a
1117 : * lot of problems, e.g. if we were to wrongly mark a buffer, that
1118 : * wasn't read according to the shortened-read, IO as valid, the
1119 : * contents would look valid and we might miss a bug.
1120 : *
1121 : * To avoid that, iterate through the IOV and zero out the "failed"
1122 : * portion of the IO.
1123 : */
1124 246 : for (int i = 0; i < ioh->op_data.read.iov_length; i++)
1125 : {
1126 125 : if (processed + iov[i].iov_len <= new_result)
1127 12 : processed += iov[i].iov_len;
1128 113 : else if (processed <= new_result)
1129 : {
1130 113 : uint32 ok_part = new_result - processed;
1131 :
1132 113 : memset((char *) iov[i].iov_base + ok_part, 0, iov[i].iov_len - ok_part);
1133 113 : processed += iov[i].iov_len;
1134 : }
1135 : else
1136 : {
1137 0 : memset((char *) iov[i].iov_base, 0, iov[i].iov_len);
1138 : }
1139 : }
1140 :
1141 121 : ioh->result = new_result;
1142 : }
1143 2135 : }
1144 :
/*
 * Combined injection point callback for IO completion.
 *
 * Forwards its arguments unchanged to the two completion-time hooks. The
 * wait hook runs first, so an IO that is held back has the short-read
 * injection applied only after the wait has ended.
 */
void
inj_io_completion_hook(const char *name, const void *private_data, void *arg)
{
	/* first: possibly block until the test lets the IO continue */
	inj_io_completion_wait_hook(name, private_data, arg);

	/* then: possibly rewrite the IO's result */
	inj_io_short_read_hook(name, private_data, arg);
}
1151 :
1152 : void
1153 503 : inj_io_reopen(const char *name, const void *private_data, void *arg)
1154 : {
1155 503 : ereport(LOG,
1156 : errmsg("reopen injection point called, is enabled: %d",
1157 : inj_io_error_state->enabled_reopen),
1158 : errhidestmt(true), errhidecontext(true));
1159 :
1160 503 : if (inj_io_error_state->enabled_reopen)
1161 1 : elog(ERROR, "injection point triggering failure to reopen ");
1162 502 : }
1163 : #endif
1164 :
1165 9 : PG_FUNCTION_INFO_V1(inj_io_completion_wait);
1166 : Datum
1167 10 : inj_io_completion_wait(PG_FUNCTION_ARGS)
1168 : {
1169 : #ifdef USE_INJECTION_POINTS
1170 10 : inj_io_error_state->enabled_completion_wait = true;
1171 10 : inj_io_error_state->completion_wait_pid =
1172 10 : PG_ARGISNULL(0) ? InvalidPid : PG_GETARG_INT32(0);
1173 10 : inj_io_error_state->completion_wait_relfilenode =
1174 10 : PG_ARGISNULL(1) ? InvalidOid : PG_GETARG_OID(1);
1175 10 : inj_io_error_state->completion_wait_blockno =
1176 10 : PG_ARGISNULL(2) ? InvalidBlockNumber : PG_GETARG_UINT32(2);
1177 : #else
1178 : elog(ERROR, "injection points not supported");
1179 : #endif
1180 :
1181 10 : PG_RETURN_VOID();
1182 : }
1183 :
1184 13 : PG_FUNCTION_INFO_V1(inj_io_completion_continue);
1185 : Datum
1186 10 : inj_io_completion_continue(PG_FUNCTION_ARGS)
1187 : {
1188 : #ifdef USE_INJECTION_POINTS
1189 10 : inj_io_error_state->enabled_completion_wait = false;
1190 10 : inj_io_error_state->completion_wait_pid = InvalidPid;
1191 10 : inj_io_error_state->completion_wait_relfilenode = InvalidOid;
1192 10 : inj_io_error_state->completion_wait_blockno = InvalidBlockNumber;
1193 10 : ConditionVariableBroadcast(&inj_io_error_state->cv);
1194 : #else
1195 : elog(ERROR, "injection points not supported");
1196 : #endif
1197 :
1198 10 : PG_RETURN_VOID();
1199 : }
1200 :
1201 9 : PG_FUNCTION_INFO_V1(inj_io_short_read_attach);
1202 : Datum
1203 16 : inj_io_short_read_attach(PG_FUNCTION_ARGS)
1204 : {
1205 : #ifdef USE_INJECTION_POINTS
1206 16 : inj_io_error_state->enabled_short_read = true;
1207 16 : inj_io_error_state->short_read_result_set = !PG_ARGISNULL(0);
1208 16 : if (inj_io_error_state->short_read_result_set)
1209 16 : inj_io_error_state->short_read_result = PG_GETARG_INT32(0);
1210 16 : inj_io_error_state->short_read_pid =
1211 16 : PG_ARGISNULL(1) ? InvalidPid : PG_GETARG_INT32(1);
1212 16 : inj_io_error_state->short_read_relfilenode =
1213 16 : PG_ARGISNULL(2) ? InvalidOid : PG_GETARG_OID(2);
1214 : #else
1215 : elog(ERROR, "injection points not supported");
1216 : #endif
1217 :
1218 16 : PG_RETURN_VOID();
1219 : }
1220 :
1221 9 : PG_FUNCTION_INFO_V1(inj_io_short_read_detach);
1222 : Datum
1223 6 : inj_io_short_read_detach(PG_FUNCTION_ARGS)
1224 : {
1225 : #ifdef USE_INJECTION_POINTS
1226 6 : inj_io_error_state->enabled_short_read = false;
1227 : #else
1228 : elog(ERROR, "injection points not supported");
1229 : #endif
1230 6 : PG_RETURN_VOID();
1231 : }
1232 :
1233 6 : PG_FUNCTION_INFO_V1(inj_io_reopen_attach);
1234 : Datum
1235 1 : inj_io_reopen_attach(PG_FUNCTION_ARGS)
1236 : {
1237 : #ifdef USE_INJECTION_POINTS
1238 1 : inj_io_error_state->enabled_reopen = true;
1239 : #else
1240 : elog(ERROR, "injection points not supported");
1241 : #endif
1242 :
1243 1 : PG_RETURN_VOID();
1244 : }
1245 :
1246 6 : PG_FUNCTION_INFO_V1(inj_io_reopen_detach);
1247 : Datum
1248 1 : inj_io_reopen_detach(PG_FUNCTION_ARGS)
1249 : {
1250 : #ifdef USE_INJECTION_POINTS
1251 1 : inj_io_error_state->enabled_reopen = false;
1252 : #else
1253 : elog(ERROR, "injection points not supported");
1254 : #endif
1255 1 : PG_RETURN_VOID();
1256 : }
|