LCOV - code coverage report
Current view: top level - src/backend/replication/logical - logicalfuncs.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 87.6 % 113 99
Test Date: 2026-03-04 15:14:37 Functions: 100.0 % 9 9
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * logicalfuncs.c
       4              :  *
       5              :  *     Support functions for using logical decoding and management of
       6              :  *     logical replication slots via SQL.
       7              :  *
       8              :  *
       9              :  * Copyright (c) 2012-2026, PostgreSQL Global Development Group
      10              :  *
      11              :  * IDENTIFICATION
      12              :  *    src/backend/replication/logical/logicalfuncs.c
      13              :  *-------------------------------------------------------------------------
      14              :  */
      15              : 
      16              : #include "postgres.h"
      17              : 
      18              : #include <unistd.h>
      19              : 
      20              : #include "access/xlogrecovery.h"
      21              : #include "access/xlogutils.h"
      22              : #include "catalog/pg_type.h"
      23              : #include "fmgr.h"
      24              : #include "funcapi.h"
      25              : #include "mb/pg_wchar.h"
      26              : #include "miscadmin.h"
      27              : #include "nodes/makefuncs.h"
      28              : #include "replication/decode.h"
      29              : #include "replication/logical.h"
      30              : #include "replication/message.h"
      31              : #include "utils/array.h"
      32              : #include "utils/builtins.h"
      33              : #include "utils/inval.h"
      34              : #include "utils/memutils.h"
      35              : #include "utils/pg_lsn.h"
      36              : #include "utils/regproc.h"
      37              : #include "utils/resowner.h"
      38              : 
      39              : /* Private data for writing out data; LogicalOutputWrite reads it back from ctx->output_writer_private */
      40              : typedef struct DecodingOutputState
      41              : {
      42              :     Tuplestorestate *tupstore;  /* tuplestore receiving the rows; taken from rsinfo->setResult */
      43              :     TupleDesc   tupdesc;        /* row descriptor used with tupstore; taken from rsinfo->setDesc */
      44              :     bool        binary_output;  /* copy of the caller's 'binary' flag; when false the output encoding is Assert-checked */
      45              :     int64       returned_rows;  /* rows appended so far; compared against upto_nchanges */
      46              : } DecodingOutputState;
      47              : 
      48              : /*
      49              :  * Prepare for an output plugin write: empty ctx->out so the next write starts from a clean buffer.  lsn, xid and last_write are not used here.
      50              :  */
      51              : static void
      52       151214 : LogicalOutputPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
      53              :                           bool last_write)
      54              : {
      55       151214 :     resetStringInfo(ctx->out);
      56       151214 : }
      57              : 
      58              : /*
      59              :  * Perform output plugin write into tuplestore: append one (lsn, xid, contents of ctx->out) row to the tuplestore held in ctx->output_writer_private and count it in returned_rows.  last_write is not used here.
      60              :  */
      61              : static void
      62       151214 : LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
      63              :                    bool last_write)
      64              : {
      65              :     Datum       values[3];
      66              :     bool        nulls[3];
      67              :     DecodingOutputState *p;
      68              : 
      69              :     /* SQL Datums can only be of a limited length... so refuse output longer than MaxAllocSize - VARHDRSZ */
      70       151214 :     if (ctx->out->len > MaxAllocSize - VARHDRSZ)
      71            0 :         elog(ERROR, "too much output for sql interface");
      72              : 
      73       151214 :     p = (DecodingOutputState *) ctx->output_writer_private;
      74              : 
      75       151214 :     memset(nulls, 0, sizeof(nulls));     /* none of the three columns is ever NULL */
      76       151214 :     values[0] = LSNGetDatum(lsn);
      77       151214 :     values[1] = TransactionIdGetDatum(xid);
      78              : 
      79              :     /*
      80              :      * Assert ctx->out is in database encoding when we're writing textual
      81              :      * output.  (Assert-only check; binary output is not verified.)
      82              :      */
      83       151214 :     if (!p->binary_output)
      84              :         Assert(pg_verify_mbstr(GetDatabaseEncoding(),
      85              :                                ctx->out->data, ctx->out->len,
      86              :                                false));
      87              : 
      88              :     /* ick, but cstring_to_text_with_len works for bytea perfectly fine, so one conversion serves both text and binary output */
      89       151214 :     values[2] = PointerGetDatum(cstring_to_text_with_len(ctx->out->data, ctx->out->len));
      90              : 
      91       151214 :     tuplestore_putvalues(p->tupstore, p->tupdesc, values, nulls);
      92       151214 :     p->returned_rows++;
      93       151214 : }
      94              : 
      95              : /*
      96              :  * Helper function for the various SQL callable logical decoding functions.  'confirm' selects consuming (the slot's position is confirmed up to where decoding stopped and the slot is marked dirty) over peeking; 'binary' = false requires the output plugin to declare textual output.  Rows are delivered through the materialized-SRF tuplestore; the Datum result is always 0.
      97              :  */
      98              : static Datum
      99          221 : pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool binary)
     100              : {
     101              :     Name        name;           /* arg 0: slot name, must not be NULL */
     102              :     XLogRecPtr  upto_lsn;       /* arg 1: stop once this LSN is reached; invalid (NULL arg) = no limit */
     103              :     int32       upto_nchanges;  /* arg 2: stop once this many rows were produced; 0 (NULL arg) = no limit */
     104          221 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     105              :     MemoryContext per_query_ctx;
     106              :     MemoryContext oldcontext;
     107              :     XLogRecPtr  end_of_wal;     /* upper bound for the decoding loop, computed once before the slot is acquired */
     108              :     XLogRecPtr  wait_for_wal_lsn;
     109              :     LogicalDecodingContext *ctx;
     110          221 :     ResourceOwner old_resowner PG_USED_FOR_ASSERTS_ONLY = CurrentResourceOwner;
     111              :     ArrayType  *arr;            /* arg 3: options as a text array of name/value pairs */
     112              :     Size        ndim;
     113          221 :     List       *options = NIL;
     114              :     DecodingOutputState *p;
     115              : 
     116          221 :     CheckSlotPermissions();
     117              : 
     118          220 :     CheckLogicalDecodingRequirements();
     119              : 
     120          219 :     if (PG_ARGISNULL(0))
     121            0 :         ereport(ERROR,
     122              :                 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
     123              :                  errmsg("slot name must not be null")));
     124          219 :     name = PG_GETARG_NAME(0);
     125              : 
     126          219 :     if (PG_ARGISNULL(1))
     127          219 :         upto_lsn = InvalidXLogRecPtr;
     128              :     else
     129            0 :         upto_lsn = PG_GETARG_LSN(1);
     130              : 
     131          219 :     if (PG_ARGISNULL(2))
     132          219 :         upto_nchanges = 0;
     133              :     else
     134            0 :         upto_nchanges = PG_GETARG_INT32(2);
     135              : 
     136          219 :     if (PG_ARGISNULL(3))
     137            0 :         ereport(ERROR,
     138              :                 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
     139              :                  errmsg("options array must not be null")));
     140          219 :     arr = PG_GETARG_ARRAYTYPE_P(3);
     141              : 
     142              :     /* state to write output to; zero-initialized, so returned_rows starts at 0 */
     143          219 :     p = palloc0_object(DecodingOutputState);
     144              : 
     145          219 :     p->binary_output = binary;
     146              : 
     147          219 :     per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
     148          219 :     oldcontext = MemoryContextSwitchTo(per_query_ctx);
     149              : 
     150              :     /* Deconstruct options array into a list of DefElems (name, string value); a zero-dimensional array leaves options = NIL */
     151          219 :     ndim = ARR_NDIM(arr);
     152          219 :     if (ndim > 1)
     153              :     {
     154            0 :         ereport(ERROR,
     155              :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     156              :                  errmsg("array must be one-dimensional")));
     157              :     }
     158          219 :     else if (array_contains_nulls(arr))
     159              :     {
     160            0 :         ereport(ERROR,
     161              :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     162              :                  errmsg("array must not contain nulls")));
     163              :     }
     164          219 :     else if (ndim == 1)
     165              :     {
     166              :         int         nelems;
     167              :         Datum      *datum_opts;
     168              :         int         i;
     169              : 
     170              :         Assert(ARR_ELEMTYPE(arr) == TEXTOID);
     171              : 
     172          189 :         deconstruct_array_builtin(arr, TEXTOID, &datum_opts, NULL, &nelems);
     173              : 
     174          189 :         if (nelems % 2 != 0)
     175            0 :             ereport(ERROR,
     176              :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     177              :                      errmsg("array must have even number of elements")));
     178              : 
     179          573 :         for (i = 0; i < nelems; i += 2)  /* elements come in (name, value) pairs */
     180              :         {
     181          384 :             char       *optname = TextDatumGetCString(datum_opts[i]);
     182          384 :             char       *opt = TextDatumGetCString(datum_opts[i + 1]);
     183              : 
     184          384 :             options = lappend(options, makeDefElem(optname, (Node *) makeString(opt), -1));
     185              :         }
     186              :     }
     187              : 
     188          219 :     InitMaterializedSRF(fcinfo, 0);
     189          219 :     p->tupstore = rsinfo->setResult;
     190          219 :     p->tupdesc = rsinfo->setDesc;
     191              : 
     192              :     /*
     193              :      * Compute the current end-of-wal: the flush pointer normally, the replay pointer while in recovery.
     194              :      */
     195          219 :     if (!RecoveryInProgress())
     196          213 :         end_of_wal = GetFlushRecPtr(NULL);
     197              :     else
     198            6 :         end_of_wal = GetXLogReplayRecPtr(NULL);
     199              : 
     200          219 :     ReplicationSlotAcquire(NameStr(*name), true, true);
     201              : 
     202          218 :     PG_TRY();
     203              :     {
     204              :         /* restart at slot's confirmed_flush */
     205          429 :         ctx = CreateDecodingContext(InvalidXLogRecPtr,
     206              :                                     options,
     207              :                                     false,
     208          218 :                                     XL_ROUTINE(.page_read = read_local_xlog_page,
     209              :                                                .segment_open = wal_segment_open,
     210              :                                                .segment_close = wal_segment_close),
     211              :                                     LogicalOutputPrepareWrite,
     212              :                                     LogicalOutputWrite, NULL);
     213              : 
     214          211 :         MemoryContextSwitchTo(oldcontext);
     215              : 
     216              :         /*
     217              :          * Check whether the output plugin writes textual output if that's
     218              :          * what we need.
     219              :          */
     220          211 :         if (!binary &&
     221          194 :             ctx->options.output_type !=OUTPUT_PLUGIN_TEXTUAL_OUTPUT)
     222            1 :             ereport(ERROR,
     223              :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     224              :                      errmsg("logical decoding output plugin \"%s\" produces binary output, but function \"%s\" expects textual data",
     225              :                             NameStr(MyReplicationSlot->data.plugin),
     226              :                             format_procedure(fcinfo->flinfo->fn_oid))));
     227              : 
     228              :         /*
     229              :          * Wait for specified streaming replication standby servers (if any)
     230              :          * to confirm receipt of WAL up to wait_for_wal_lsn, i.e. end_of_wal or upto_lsn, whichever is lower.
     231              :          */
     232          210 :         if (!XLogRecPtrIsValid(upto_lsn))
     233          210 :             wait_for_wal_lsn = end_of_wal;
     234              :         else
     235            0 :             wait_for_wal_lsn = Min(upto_lsn, end_of_wal);
     236              : 
     237          210 :         WaitForStandbyConfirmation(wait_for_wal_lsn);
     238              : 
     239          210 :         ctx->output_writer_private = p;
     240              : 
     241              :         /*
     242              :          * Decoding of WAL must start at restart_lsn so that the entirety of
     243              :          * xacts that committed after the slot's confirmed_flush can be
     244              :          * accumulated into reorder buffers.
     245              :          */
     246          210 :         XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
     247              : 
     248              :         /* invalidate non-timetravel entries */
     249          210 :         InvalidateSystemCaches();
     250              : 
     251              :         /* Decode until we run out of records, i.e. until the reader has reached end_of_wal */
     252      1494128 :         while (ctx->reader->EndRecPtr < end_of_wal)
     253              :         {
     254              :             XLogRecord *record;
     255      1493918 :             char       *errm = NULL;
     256              : 
     257      1493918 :             record = XLogReadRecord(ctx->reader, &errm);
     258      1493918 :             if (errm)
     259            0 :                 elog(ERROR, "could not find record for logical decoding: %s", errm);
     260              : 
     261              :             /*
     262              :              * The {begin_txn,change,commit_txn}_wrapper callbacks (not defined in
     263              :              * this file) will store the description into our tuplestore; the rows are appended by LogicalOutputWrite.
     264              :              */
     265      1493918 :             if (record != NULL)
     266              :             {
     267      1493918 :                 LogicalDecodingProcessRecord(ctx, ctx->reader);
     268              : 
     269              :                 /*
     270              :                  * We used to have bugs where logical decoding would fail to
     271              :                  * preserve the resource owner.  Verify that that doesn't
     272              :                  * happen anymore.  XXX this could be removed once it's been
     273              :                  * battle-tested.
     274              :                  */
     275              :                 Assert(CurrentResourceOwner == old_resowner);
     276              :             }
     277              : 
     278              :             /* check limits; evaluated once per record, after that record has been processed */
     279      1493918 :             if (XLogRecPtrIsValid(upto_lsn) &&
     280            0 :                 upto_lsn <= ctx->reader->EndRecPtr)
     281            0 :                 break;
     282      1493918 :             if (upto_nchanges != 0 &&
     283            0 :                 upto_nchanges <= p->returned_rows)
     284            0 :                 break;
     285      1493918 :             CHECK_FOR_INTERRUPTS();
     286              :         }
     287              : 
     288              :         /*
     289              :          * Next time, start where we left off. (Hunting things, the family
     290              :          * business..)  Only done for the consuming variants (confirm = true).
     291              :          */
     292          210 :         if (XLogRecPtrIsValid(ctx->reader->EndRecPtr) && confirm)
     293              :         {
     294          189 :             LogicalConfirmReceivedLocation(ctx->reader->EndRecPtr);
     295              : 
     296              :             /*
     297              :              * If only the confirmed_flush_lsn has changed the slot won't get
     298              :              * marked as dirty by the above. Callers on the walsender
     299              :              * interface are expected to keep track of their own progress and
     300              :              * don't need it written out. But SQL-interface users cannot
     301              :              * specify their own start positions and it's harder for them to
     302              :              * keep track of their progress, so we should make more of an
     303              :              * effort to save it for them.
     304              :              *
     305              :              * Dirty the slot so it's written out at the next checkpoint.
     306              :              * We'll still lose its position on crash, as documented, but it's
     307              :              * better than always losing the position even on clean restart.
     308              :              */
     309          189 :             ReplicationSlotMarkDirty();
     310              :         }
     311              : 
     312              :         /* free context, call shutdown callback */
     313          210 :         FreeDecodingContext(ctx);
     314              : 
     315          210 :         ReplicationSlotRelease();
     316          210 :         InvalidateSystemCaches();
     317              :     }
     318            8 :     PG_CATCH();
     319              :     {
     320              :         /* clear all timetravel entries, then propagate the error unchanged */
     321            8 :         InvalidateSystemCaches();
     322              : 
     323            8 :         PG_RE_THROW();
     324              :     }
     325          210 :     PG_END_TRY();
     326              : 
     327          210 :     return (Datum) 0;
     328              : }
     329              : 
     330              : /*
     331              :  * SQL function returning the changestream as text, consuming the data (confirm = true, binary = false).
     332              :  */
     333              : Datum
     334          188 : pg_logical_slot_get_changes(PG_FUNCTION_ARGS)
     335              : {
     336          188 :     return pg_logical_slot_get_changes_guts(fcinfo, true, false);
     337              : }
     338              : 
     339              : /*
     340              :  * SQL function returning the changestream as text, only peeking ahead (confirm = false, binary = false).
     341              :  */
     342              : Datum
     343           16 : pg_logical_slot_peek_changes(PG_FUNCTION_ARGS)
     344              : {
     345           16 :     return pg_logical_slot_get_changes_guts(fcinfo, false, false);
     346              : }
     347              : 
     348              : /*
     349              :  * SQL function returning the changestream in binary, consuming the data (confirm = true, binary = true).
     350              :  */
     351              : Datum
     352            9 : pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS)
     353              : {
     354            9 :     return pg_logical_slot_get_changes_guts(fcinfo, true, true);
     355              : }
     356              : 
     357              : /*
     358              :  * SQL function returning the changestream in binary, only peeking ahead (confirm = false, binary = true).
     359              :  */
     360              : Datum
     361            8 : pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS)
     362              : {
     363            8 :     return pg_logical_slot_get_changes_guts(fcinfo, false, true);
     364              : }
     365              : 
     366              : 
     367              : /*
     368              :  * SQL function for writing logical decoding message into WAL.  Arguments: 0 = transactional flag, 1 = prefix (text), 2 = message body (bytea), 3 = flush flag; returns the LSN handed back by LogLogicalMessage.  NOTE(review): the arguments are read without NULL checks - presumably the SQL-level declaration is STRICT; confirm.
     369              :  */
     370              : Datum
     371          204 : pg_logical_emit_message_bytea(PG_FUNCTION_ARGS)
     372              : {
     373          204 :     bool        transactional = PG_GETARG_BOOL(0);
     374          204 :     char       *prefix = text_to_cstring(PG_GETARG_TEXT_PP(1));
     375          204 :     bytea      *data = PG_GETARG_BYTEA_PP(2);
     376          204 :     bool        flush = PG_GETARG_BOOL(3);
     377              :     XLogRecPtr  lsn;
     378              : 
     379          204 :     lsn = LogLogicalMessage(prefix, VARDATA_ANY(data), VARSIZE_ANY_EXHDR(data),  /* payload pointer and length exclude the varlena header */
     380              :                             transactional, flush);
     381          204 :     PG_RETURN_LSN(lsn);
     382              : }
     383              : 
     384              : Datum
     385          204 : pg_logical_emit_message_text(PG_FUNCTION_ARGS)
     386              : {
     387              :     /* bytea and text are compatible, so the text variant forwards its fcinfo unchanged to the bytea implementation */
     388          204 :     return pg_logical_emit_message_bytea(fcinfo);
     389              : }
        

Generated by: LCOV version 2.0-1