LCOV - code coverage report
Current view: top level - src/backend/replication/logical - logicalfuncs.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 100 114 87.7 %
Date: 2025-01-18 04:15:08 Functions: 9 9 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * logicalfuncs.c
       4             :  *
       5             :  *     Support functions for using logical decoding and management of
       6             :  *     logical replication slots via SQL.
       7             :  *
       8             :  *
       9             :  * Copyright (c) 2012-2025, PostgreSQL Global Development Group
      10             :  *
      11             :  * IDENTIFICATION
      12             :  *    src/backend/replication/logical/logicalfuncs.c
      13             :  *-------------------------------------------------------------------------
      14             :  */
      15             : 
      16             : #include "postgres.h"
      17             : 
      18             : #include <unistd.h>
      19             : 
      20             : #include "access/xlogrecovery.h"
      21             : #include "access/xlogutils.h"
      22             : #include "catalog/pg_type.h"
      23             : #include "fmgr.h"
      24             : #include "funcapi.h"
      25             : #include "mb/pg_wchar.h"
      26             : #include "miscadmin.h"
      27             : #include "nodes/makefuncs.h"
      28             : #include "replication/decode.h"
      29             : #include "replication/logical.h"
      30             : #include "replication/message.h"
      31             : #include "utils/array.h"
      32             : #include "utils/builtins.h"
      33             : #include "utils/inval.h"
      34             : #include "utils/memutils.h"
      35             : #include "utils/pg_lsn.h"
      36             : #include "utils/regproc.h"
      37             : #include "utils/resowner.h"
      38             : 
      39             : /* Private data for writing out data; installed as ctx->output_writer_private */
      40             : typedef struct DecodingOutputState
      41             : {
      42             :     Tuplestorestate *tupstore;  /* destination; LogicalOutputWrite appends one row per write */
      43             :     TupleDesc   tupdesc;        /* row descriptor passed with every tuplestore_putvalues call */
      44             :     bool        binary_output;  /* false => LogicalOutputWrite asserts the payload is valid database-encoding text */
      45             :     int64       returned_rows;  /* rows written so far; compared against the upto_nchanges limit */
      46             : } DecodingOutputState;
      47             : 
      48             : /*
      49             :  * Prepare for an output plugin write by emptying ctx->out; lsn, xid and last_write are not used here.
      50             :  */
      51             : static void
      52      302404 : LogicalOutputPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
      53             :                           bool last_write)
      54             : {
      55      302404 :     resetStringInfo(ctx->out);  /* the next write starts from an empty buffer */
      56      302404 : }
      57             : 
      58             : /*
      59             :  * Perform output plugin write into tuplestore: append one (lsn, xid, data) row built from ctx->out and count it.  last_write is not used here.
      60             :  */
      61             : static void
      62      302404 : LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
      63             :                    bool last_write)
      64             : {
      65             :     Datum       values[3];      /* column 0 = lsn, 1 = xid, 2 = payload */
      66             :     bool        nulls[3];       /* cleared below: no column is ever NULL */
      67             :     DecodingOutputState *p;
      68             : 
      69             :     /* SQL Datums can only be of a limited length... */
      70      302404 :     if (ctx->out->len > MaxAllocSize - VARHDRSZ)
      71           0 :         elog(ERROR, "too much output for sql interface");
      72             : 
      73      302404 :     p = (DecodingOutputState *) ctx->output_writer_private; /* set by pg_logical_slot_get_changes_guts */
      74             : 
      75      302404 :     memset(nulls, 0, sizeof(nulls));
      76      302404 :     values[0] = LSNGetDatum(lsn);
      77      302404 :     values[1] = TransactionIdGetDatum(xid);
      78             : 
      79             :     /*
      80             :      * Assert ctx->out is in database encoding when we're writing textual
      81             :      * output.  (Binary output is passed through without this check.)
      82             :      */
      83      302404 :     if (!p->binary_output)
      84             :         Assert(pg_verify_mbstr(GetDatabaseEncoding(),
      85             :                                ctx->out->data, ctx->out->len,
      86             :                                false));
      87             : 
      88             :     /* ick, but cstring_to_text_with_len works for bytea perfectly fine */
      89      302404 :     values[2] = PointerGetDatum(cstring_to_text_with_len(ctx->out->data, ctx->out->len));
      90             : 
      91      302404 :     tuplestore_putvalues(p->tupstore, p->tupdesc, values, nulls);
      92      302404 :     p->returned_rows++;         /* feeds the upto_nchanges check in the decoding loop */
      93      302404 : }
      94             : 
      95             : /*
      96             :  * Helper function for the various SQL callable logical decoding functions.  'confirm' selects get (consume) vs. peek behaviour, 'binary' allows non-textual plugin output.  Rows are delivered through the materialized-SRF tuplestore, so the Datum result is always 0.
      97             :  */
      98             : static Datum
      99         424 : pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool binary)
     100             : {
     101             :     Name        name;
     102             :     XLogRecPtr  upto_lsn;
     103             :     int32       upto_nchanges;
     104         424 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     105             :     MemoryContext per_query_ctx;
     106             :     MemoryContext oldcontext;
     107             :     XLogRecPtr  end_of_wal;
     108             :     XLogRecPtr  wait_for_wal_lsn;
     109             :     LogicalDecodingContext *ctx;
     110         424 :     ResourceOwner old_resowner = CurrentResourceOwner;  /* restored once decoding is done */
     111             :     ArrayType  *arr;
     112             :     Size        ndim;
     113         424 :     List       *options = NIL;  /* DefElem list built from the options array */
     114             :     DecodingOutputState *p;
     115             : 
     116         424 :     CheckSlotPermissions();
     117             : 
     118         422 :     CheckLogicalDecodingRequirements();
     119             : 
     120         422 :     if (PG_ARGISNULL(0))
     121           0 :         ereport(ERROR,
     122             :                 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
     123             :                  errmsg("slot name must not be null")));
     124         422 :     name = PG_GETARG_NAME(0);   /* arg 0: slot name */
     125             : 
     126         422 :     if (PG_ARGISNULL(1))
     127         422 :         upto_lsn = InvalidXLogRecPtr;   /* arg 1 NULL => no LSN limit */
     128             :     else
     129           0 :         upto_lsn = PG_GETARG_LSN(1);
     130             : 
     131         422 :     if (PG_ARGISNULL(2))
     132         422 :         upto_nchanges = 0;      /* arg 2 NULL => no row limit; 0 is the value the loop below treats as "unlimited" */
     133             :     else
     134           0 :         upto_nchanges = PG_GETARG_INT32(2);
     135             : 
     136         422 :     if (PG_ARGISNULL(3))
     137           0 :         ereport(ERROR,
     138             :                 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
     139             :                  errmsg("options array must not be null")));
     140         422 :     arr = PG_GETARG_ARRAYTYPE_P(3); /* arg 3: flat text array of option name/value pairs */
     141             : 
     142             :     /* state to write output to */
     143         422 :     p = palloc0(sizeof(DecodingOutputState));   /* zeroed, so returned_rows starts at 0 */
     144             : 
     145         422 :     p->binary_output = binary;
     146             : 
     147         422 :     per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
     148         422 :     oldcontext = MemoryContextSwitchTo(per_query_ctx);  /* allocations until the switch back below use per-query memory */
     149             : 
     150             :     /* Deconstruct options array */
     151         422 :     ndim = ARR_NDIM(arr);
     152         422 :     if (ndim > 1)
     153             :     {
     154           0 :         ereport(ERROR,
     155             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     156             :                  errmsg("array must be one-dimensional")));
     157             :     }
     158         422 :     else if (array_contains_nulls(arr))
     159             :     {
     160           0 :         ereport(ERROR,
     161             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     162             :                  errmsg("array must not contain nulls")));
     163             :     }
     164         422 :     else if (ndim == 1)         /* ndim == 0 (empty array) leaves options as NIL */
     165             :     {
     166             :         int         nelems;
     167             :         Datum      *datum_opts;
     168             :         int         i;
     169             : 
     170             :         Assert(ARR_ELEMTYPE(arr) == TEXTOID);
     171             : 
     172         368 :         deconstruct_array_builtin(arr, TEXTOID, &datum_opts, NULL, &nelems);
     173             : 
     174         368 :         if (nelems % 2 != 0)
     175           0 :             ereport(ERROR,
     176             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     177             :                      errmsg("array must have even number of elements")));
     178             : 
     179        1116 :         for (i = 0; i < nelems; i += 2) /* elements come in (name, value) pairs */
     180             :         {
     181         748 :             char       *optname = TextDatumGetCString(datum_opts[i]);
     182         748 :             char       *opt = TextDatumGetCString(datum_opts[i + 1]);
     183             : 
     184         748 :             options = lappend(options, makeDefElem(optname, (Node *) makeString(opt), -1));
     185             :         }
     186             :     }
     187             : 
     188         422 :     InitMaterializedSRF(fcinfo, 0);
     189         422 :     p->tupstore = rsinfo->setResult;    /* LogicalOutputWrite appends its rows here */
     190         422 :     p->tupdesc = rsinfo->setDesc;
     191             : 
     192             :     /*
     193             :      * Compute the current end-of-wal: the flush pointer normally, the replay pointer while in recovery.
     194             :      */
     195         422 :     if (!RecoveryInProgress())
     196         410 :         end_of_wal = GetFlushRecPtr(NULL);
     197             :     else
     198          12 :         end_of_wal = GetXLogReplayRecPtr(NULL);
     199             : 
     200         422 :     ReplicationSlotAcquire(NameStr(*name), true);
     201             : 
     202         420 :     PG_TRY();
     203             :     {
     204             :         /* restart at slot's confirmed_flush */
     205         826 :         ctx = CreateDecodingContext(InvalidXLogRecPtr,
     206             :                                     options,
     207             :                                     false,
     208         420 :                                     XL_ROUTINE(.page_read = read_local_xlog_page,
     209             :                                                .segment_open = wal_segment_open,
     210             :                                                .segment_close = wal_segment_close),
     211             :                                     LogicalOutputPrepareWrite,
     212             :                                     LogicalOutputWrite, NULL);
     213             : 
     214         406 :         MemoryContextSwitchTo(oldcontext);  /* back to the context that was current on entry */
     215             : 
     216             :         /*
     217             :          * Check whether the output plugin writes textual output if that's
     218             :          * what we need.
     219             :          */
     220         406 :         if (!binary &&
     221         382 :             ctx->options.output_type !=OUTPUT_PLUGIN_TEXTUAL_OUTPUT)
     222           2 :             ereport(ERROR,
     223             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     224             :                      errmsg("logical decoding output plugin \"%s\" produces binary output, but function \"%s\" expects textual data",
     225             :                             NameStr(MyReplicationSlot->data.plugin),
     226             :                             format_procedure(fcinfo->flinfo->fn_oid))));
     227             : 
     228             :         /*
     229             :          * Wait for specified streaming replication standby servers (if any)
     230             :          * to confirm receipt of WAL up to wait_for_wal_lsn.
     231             :          */
     232         404 :         if (XLogRecPtrIsInvalid(upto_lsn))
     233         404 :             wait_for_wal_lsn = end_of_wal;
     234             :         else
     235           0 :             wait_for_wal_lsn = Min(upto_lsn, end_of_wal);   /* never wait beyond the WAL we will actually read */
     236             : 
     237         404 :         WaitForStandbyConfirmation(wait_for_wal_lsn);
     238             : 
     239         404 :         ctx->output_writer_private = p; /* read back by LogicalOutputWrite */
     240             : 
     241             :         /*
     242             :          * Decoding of WAL must start at restart_lsn so that the entirety of
     243             :          * xacts that committed after the slot's confirmed_flush can be
     244             :          * accumulated into reorder buffers.
     245             :          */
     246         404 :         XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
     247             : 
     248             :         /* invalidate non-timetravel entries */
     249         404 :         InvalidateSystemCaches();
     250             : 
     251             :         /* Decode until we run out of records */
     252     3297790 :         while (ctx->reader->EndRecPtr < end_of_wal)
     253             :         {
     254             :             XLogRecord *record;
     255     3297386 :             char       *errm = NULL;
     256             : 
     257     3297386 :             record = XLogReadRecord(ctx->reader, &errm);
     258     3297386 :             if (errm)
     259           0 :                 elog(ERROR, "could not find record for logical decoding: %s", errm);
     260             : 
     261             :             /*
     262             :              * Decoding the record makes the output plugin emit its changes;
     263             :              * LogicalOutputWrite (registered above) stores them in our tuplestore.
     264             :              */
     265     3297386 :             if (record != NULL)
     266     3297386 :                 LogicalDecodingProcessRecord(ctx, ctx->reader);
     267             : 
     268             :             /* check limits */
     269     3297386 :             if (upto_lsn != InvalidXLogRecPtr &&
     270           0 :                 upto_lsn <= ctx->reader->EndRecPtr)
     271           0 :                 break;
     272     3297386 :             if (upto_nchanges != 0 &&   /* checked per record, so the row count may overshoot the limit */
     273           0 :                 upto_nchanges <= p->returned_rows)
     274           0 :                 break;
     275     3297386 :             CHECK_FOR_INTERRUPTS();
     276             :         }
     277             : 
     278             :         /*
     279             :          * Logical decoding could have clobbered CurrentResourceOwner during
     280             :          * transaction management, so restore the executor's value.  (This is
     281             :          * a kluge, but it's not worth cleaning up right now.)
     282             :          */
     283         404 :         CurrentResourceOwner = old_resowner;
     284             : 
     285             :         /*
     286             :          * Next time, start where we left off. (Hunting things, the family
     287             :          * business..)
     288             :          */
     289         404 :         if (ctx->reader->EndRecPtr != InvalidXLogRecPtr && confirm) /* peek variants pass confirm = false and skip this */
     290             :         {
     291         362 :             LogicalConfirmReceivedLocation(ctx->reader->EndRecPtr);
     292             : 
     293             :             /*
     294             :              * If only the confirmed_flush_lsn has changed the slot won't get
     295             :              * marked as dirty by the above. Callers on the walsender
     296             :              * interface are expected to keep track of their own progress and
     297             :              * don't need it written out. But SQL-interface users cannot
     298             :              * specify their own start positions and it's harder for them to
     299             :              * keep track of their progress, so we should make more of an
     300             :              * effort to save it for them.
     301             :              *
     302             :              * Dirty the slot so it's written out at the next checkpoint.
     303             :              * We'll still lose its position on crash, as documented, but it's
     304             :              * better than always losing the position even on clean restart.
     305             :              */
     306         362 :             ReplicationSlotMarkDirty();
     307             :         }
     308             : 
     309             :         /* free context, call shutdown callback */
     310         404 :         FreeDecodingContext(ctx);
     311             : 
     312         404 :         ReplicationSlotRelease();
     313         404 :         InvalidateSystemCaches();
     314             :     }
     315          16 :     PG_CATCH();
     316             :     {
     317             :         /* clear all timetravel entries; NOTE(review): the slot is not released on this path - presumably left to error cleanup elsewhere, confirm */
     318          16 :         InvalidateSystemCaches();
     319             : 
     320          16 :         PG_RE_THROW();
     321             :     }
     322         404 :     PG_END_TRY();
     323             : 
     324         404 :     return (Datum) 0;           /* the result set itself travels via rsinfo's tuplestore */
     325             : }
     326             : 
     327             : /*
     328             :  * SQL function returning the changestream as text, consuming the data.
     329             :  */
     330             : Datum
     331         368 : pg_logical_slot_get_changes(PG_FUNCTION_ARGS)
     332             : {
     333         368 :     return pg_logical_slot_get_changes_guts(fcinfo, true, false);   /* confirm = true, binary = false */
     334             : }
     335             : 
     336             : /*
     337             :  * SQL function returning the changestream as text, only peeking ahead.
     338             :  */
     339             : Datum
     340          32 : pg_logical_slot_peek_changes(PG_FUNCTION_ARGS)
     341             : {
     342          32 :     return pg_logical_slot_get_changes_guts(fcinfo, false, false);  /* confirm = false, binary = false */
     343             : }
     344             : 
     345             : /*
     346             :  * SQL function returning the changestream in binary, consuming the data.
     347             :  */
     348             : Datum
     349           8 : pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS)
     350             : {
     351           8 :     return pg_logical_slot_get_changes_guts(fcinfo, true, true);    /* confirm = true, binary = true */
     352             : }
     353             : 
     354             : /*
     355             :  * SQL function returning the changestream in binary, only peeking ahead.
     356             :  */
     357             : Datum
     358          16 : pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS)
     359             : {
     360          16 :     return pg_logical_slot_get_changes_guts(fcinfo, false, true);   /* confirm = false, binary = true */
     361             : }
     362             : 
     363             : 
     364             : /*
     365             :  * SQL function for writing logical decoding message into WAL; returns the LSN handed back by LogLogicalMessage.
     366             :  */
     367             : Datum
     368         228 : pg_logical_emit_message_bytea(PG_FUNCTION_ARGS)
     369             : {
     370         228 :     bool        transactional = PG_GETARG_BOOL(0);  /* arg 0; no NULL checks here - presumably the SQL function is declared strict, confirm */
     371         228 :     char       *prefix = text_to_cstring(PG_GETARG_TEXT_PP(1)); /* arg 1: message prefix as a C string */
     372         228 :     bytea      *data = PG_GETARG_BYTEA_PP(2);   /* arg 2: payload, possibly still in packed varlena form */
     373         228 :     bool        flush = PG_GETARG_BOOL(3);      /* arg 3: passed through to LogLogicalMessage */
     374             :     XLogRecPtr  lsn;
     375             : 
     376         228 :     lsn = LogLogicalMessage(prefix, VARDATA_ANY(data), VARSIZE_ANY_EXHDR(data), /* payload pointer and length, header excluded */
     377             :                             transactional, flush);
     378         228 :     PG_RETURN_LSN(lsn);
     379             : }
     380             : 
     381             : Datum
     382         228 : pg_logical_emit_message_text(PG_FUNCTION_ARGS)
     383             : {
     384             :     /* bytea and text are compatible, so the text variant forwards the same fcinfo to the bytea implementation */
     385         228 :     return pg_logical_emit_message_bytea(fcinfo);
     386             : }

Generated by: LCOV version 1.14