|           Line data    Source code 
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * logicalfuncs.c
       4             :  *
       5             :  *     Support functions for using logical decoding and management of
       6             :  *     logical replication slots via SQL.
       7             :  *
       8             :  *
       9             :  * Copyright (c) 2012-2025, PostgreSQL Global Development Group
      10             :  *
      11             :  * IDENTIFICATION
      12             :  *    src/backend/replication/logical/logicalfuncs.c
      13             :  *-------------------------------------------------------------------------
      14             :  */
      15             : 
      16             : #include "postgres.h"
      17             : 
      18             : #include <unistd.h>
      19             : 
      20             : #include "access/xlogrecovery.h"
      21             : #include "access/xlogutils.h"
      22             : #include "catalog/pg_type.h"
      23             : #include "fmgr.h"
      24             : #include "funcapi.h"
      25             : #include "mb/pg_wchar.h"
      26             : #include "miscadmin.h"
      27             : #include "nodes/makefuncs.h"
      28             : #include "replication/decode.h"
      29             : #include "replication/logical.h"
      30             : #include "replication/message.h"
      31             : #include "utils/array.h"
      32             : #include "utils/builtins.h"
      33             : #include "utils/inval.h"
      34             : #include "utils/memutils.h"
      35             : #include "utils/pg_lsn.h"
      36             : #include "utils/regproc.h"
      37             : #include "utils/resowner.h"
      38             : 
/*
 * Private data for writing out data.
 *
 * One instance is allocated per SQL call and reaches the output-writer
 * callback through ctx->output_writer_private.
 */
typedef struct DecodingOutputState
{
    Tuplestorestate *tupstore;  /* tuplestore that receives the result rows */
    TupleDesc   tupdesc;        /* row descriptor used when storing a row */
    bool        binary_output;  /* true when called via a *_binary_* function */
    int64       returned_rows;  /* rows put into tupstore so far */
} DecodingOutputState;
      47             : 
      48             : /*
      49             :  * Prepare for an output plugin write.
      50             :  */
      51             : static void
      52      302428 : LogicalOutputPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
      53             :                           bool last_write)
      54             : {
      55      302428 :     resetStringInfo(ctx->out);
      56      302428 : }
      57             : 
      58             : /*
      59             :  * Perform output plugin write into tuplestore.
      60             :  */
      61             : static void
      62      302428 : LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
      63             :                    bool last_write)
      64             : {
      65             :     Datum       values[3];
      66             :     bool        nulls[3];
      67             :     DecodingOutputState *p;
      68             : 
      69             :     /* SQL Datums can only be of a limited length... */
      70      302428 :     if (ctx->out->len > MaxAllocSize - VARHDRSZ)
      71           0 :         elog(ERROR, "too much output for sql interface");
      72             : 
      73      302428 :     p = (DecodingOutputState *) ctx->output_writer_private;
      74             : 
      75      302428 :     memset(nulls, 0, sizeof(nulls));
      76      302428 :     values[0] = LSNGetDatum(lsn);
      77      302428 :     values[1] = TransactionIdGetDatum(xid);
      78             : 
      79             :     /*
      80             :      * Assert ctx->out is in database encoding when we're writing textual
      81             :      * output.
      82             :      */
      83      302428 :     if (!p->binary_output)
      84             :         Assert(pg_verify_mbstr(GetDatabaseEncoding(),
      85             :                                ctx->out->data, ctx->out->len,
      86             :                                false));
      87             : 
      88             :     /* ick, but cstring_to_text_with_len works for bytea perfectly fine */
      89      302428 :     values[2] = PointerGetDatum(cstring_to_text_with_len(ctx->out->data, ctx->out->len));
      90             : 
      91      302428 :     tuplestore_putvalues(p->tupstore, p->tupdesc, values, nulls);
      92      302428 :     p->returned_rows++;
      93      302428 : }
      94             : 
      95             : /*
      96             :  * Helper function for the various SQL callable logical decoding functions.
      97             :  */
      98             : static Datum
      99         432 : pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool binary)
     100             : {
     101             :     Name        name;
     102             :     XLogRecPtr  upto_lsn;
     103             :     int32       upto_nchanges;
     104         432 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     105             :     MemoryContext per_query_ctx;
     106             :     MemoryContext oldcontext;
     107             :     XLogRecPtr  end_of_wal;
     108             :     XLogRecPtr  wait_for_wal_lsn;
     109             :     LogicalDecodingContext *ctx;
     110         432 :     ResourceOwner old_resowner PG_USED_FOR_ASSERTS_ONLY = CurrentResourceOwner;
     111             :     ArrayType  *arr;
     112             :     Size        ndim;
     113         432 :     List       *options = NIL;
     114             :     DecodingOutputState *p;
     115             : 
     116         432 :     CheckSlotPermissions();
     117             : 
     118         430 :     CheckLogicalDecodingRequirements();
     119             : 
     120         430 :     if (PG_ARGISNULL(0))
     121           0 :         ereport(ERROR,
     122             :                 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
     123             :                  errmsg("slot name must not be null")));
     124         430 :     name = PG_GETARG_NAME(0);
     125             : 
     126         430 :     if (PG_ARGISNULL(1))
     127         430 :         upto_lsn = InvalidXLogRecPtr;
     128             :     else
     129           0 :         upto_lsn = PG_GETARG_LSN(1);
     130             : 
     131         430 :     if (PG_ARGISNULL(2))
     132         430 :         upto_nchanges = InvalidXLogRecPtr;
     133             :     else
     134           0 :         upto_nchanges = PG_GETARG_INT32(2);
     135             : 
     136         430 :     if (PG_ARGISNULL(3))
     137           0 :         ereport(ERROR,
     138             :                 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
     139             :                  errmsg("options array must not be null")));
     140         430 :     arr = PG_GETARG_ARRAYTYPE_P(3);
     141             : 
     142             :     /* state to write output to */
     143         430 :     p = palloc0(sizeof(DecodingOutputState));
     144             : 
     145         430 :     p->binary_output = binary;
     146             : 
     147         430 :     per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
     148         430 :     oldcontext = MemoryContextSwitchTo(per_query_ctx);
     149             : 
     150             :     /* Deconstruct options array */
     151         430 :     ndim = ARR_NDIM(arr);
     152         430 :     if (ndim > 1)
     153             :     {
     154           0 :         ereport(ERROR,
     155             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     156             :                  errmsg("array must be one-dimensional")));
     157             :     }
     158         430 :     else if (array_contains_nulls(arr))
     159             :     {
     160           0 :         ereport(ERROR,
     161             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     162             :                  errmsg("array must not contain nulls")));
     163             :     }
     164         430 :     else if (ndim == 1)
     165             :     {
     166             :         int         nelems;
     167             :         Datum      *datum_opts;
     168             :         int         i;
     169             : 
     170             :         Assert(ARR_ELEMTYPE(arr) == TEXTOID);
     171             : 
     172         372 :         deconstruct_array_builtin(arr, TEXTOID, &datum_opts, NULL, &nelems);
     173             : 
     174         372 :         if (nelems % 2 != 0)
     175           0 :             ereport(ERROR,
     176             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     177             :                      errmsg("array must have even number of elements")));
     178             : 
     179        1128 :         for (i = 0; i < nelems; i += 2)
     180             :         {
     181         756 :             char       *optname = TextDatumGetCString(datum_opts[i]);
     182         756 :             char       *opt = TextDatumGetCString(datum_opts[i + 1]);
     183             : 
     184         756 :             options = lappend(options, makeDefElem(optname, (Node *) makeString(opt), -1));
     185             :         }
     186             :     }
     187             : 
     188         430 :     InitMaterializedSRF(fcinfo, 0);
     189         430 :     p->tupstore = rsinfo->setResult;
     190         430 :     p->tupdesc = rsinfo->setDesc;
     191             : 
     192             :     /*
     193             :      * Compute the current end-of-wal.
     194             :      */
     195         430 :     if (!RecoveryInProgress())
     196         418 :         end_of_wal = GetFlushRecPtr(NULL);
     197             :     else
     198          12 :         end_of_wal = GetXLogReplayRecPtr(NULL);
     199             : 
     200         430 :     ReplicationSlotAcquire(NameStr(*name), true, true);
     201             : 
     202         428 :     PG_TRY();
     203             :     {
     204             :         /* restart at slot's confirmed_flush */
     205         842 :         ctx = CreateDecodingContext(InvalidXLogRecPtr,
     206             :                                     options,
     207             :                                     false,
     208         428 :                                     XL_ROUTINE(.page_read = read_local_xlog_page,
     209             :                                                .segment_open = wal_segment_open,
     210             :                                                .segment_close = wal_segment_close),
     211             :                                     LogicalOutputPrepareWrite,
     212             :                                     LogicalOutputWrite, NULL);
     213             : 
     214         414 :         MemoryContextSwitchTo(oldcontext);
     215             : 
     216             :         /*
     217             :          * Check whether the output plugin writes textual output if that's
     218             :          * what we need.
     219             :          */
     220         414 :         if (!binary &&
     221         386 :             ctx->options.output_type !=OUTPUT_PLUGIN_TEXTUAL_OUTPUT)
     222           2 :             ereport(ERROR,
     223             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     224             :                      errmsg("logical decoding output plugin \"%s\" produces binary output, but function \"%s\" expects textual data",
     225             :                             NameStr(MyReplicationSlot->data.plugin),
     226             :                             format_procedure(fcinfo->flinfo->fn_oid))));
     227             : 
     228             :         /*
     229             :          * Wait for specified streaming replication standby servers (if any)
     230             :          * to confirm receipt of WAL up to wait_for_wal_lsn.
     231             :          */
     232         412 :         if (XLogRecPtrIsInvalid(upto_lsn))
     233         412 :             wait_for_wal_lsn = end_of_wal;
     234             :         else
     235           0 :             wait_for_wal_lsn = Min(upto_lsn, end_of_wal);
     236             : 
     237         412 :         WaitForStandbyConfirmation(wait_for_wal_lsn);
     238             : 
     239         412 :         ctx->output_writer_private = p;
     240             : 
     241             :         /*
     242             :          * Decoding of WAL must start at restart_lsn so that the entirety of
     243             :          * xacts that committed after the slot's confirmed_flush can be
     244             :          * accumulated into reorder buffers.
     245             :          */
     246         412 :         XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
     247             : 
     248             :         /* invalidate non-timetravel entries */
     249         412 :         InvalidateSystemCaches();
     250             : 
     251             :         /* Decode until we run out of records */
     252     3308570 :         while (ctx->reader->EndRecPtr < end_of_wal)
     253             :         {
     254             :             XLogRecord *record;
     255     3308158 :             char       *errm = NULL;
     256             : 
     257     3308158 :             record = XLogReadRecord(ctx->reader, &errm);
     258     3308158 :             if (errm)
     259           0 :                 elog(ERROR, "could not find record for logical decoding: %s", errm);
     260             : 
     261             :             /*
     262             :              * The {begin_txn,change,commit_txn}_wrapper callbacks above will
     263             :              * store the description into our tuplestore.
     264             :              */
     265     3308158 :             if (record != NULL)
     266             :             {
     267     3308158 :                 LogicalDecodingProcessRecord(ctx, ctx->reader);
     268             : 
     269             :                 /*
     270             :                  * We used to have bugs where logical decoding would fail to
     271             :                  * preserve the resource owner.  Verify that that doesn't
     272             :                  * happen anymore.  XXX this could be removed once it's been
     273             :                  * battle-tested.
     274             :                  */
     275             :                 Assert(CurrentResourceOwner == old_resowner);
     276             :             }
     277             : 
     278             :             /* check limits */
     279     3308158 :             if (upto_lsn != InvalidXLogRecPtr &&
     280           0 :                 upto_lsn <= ctx->reader->EndRecPtr)
     281           0 :                 break;
     282     3308158 :             if (upto_nchanges != 0 &&
     283           0 :                 upto_nchanges <= p->returned_rows)
     284           0 :                 break;
     285     3308158 :             CHECK_FOR_INTERRUPTS();
     286             :         }
     287             : 
     288             :         /*
     289             :          * Next time, start where we left off. (Hunting things, the family
     290             :          * business..)
     291             :          */
     292         412 :         if (ctx->reader->EndRecPtr != InvalidXLogRecPtr && confirm)
     293             :         {
     294         370 :             LogicalConfirmReceivedLocation(ctx->reader->EndRecPtr);
     295             : 
     296             :             /*
     297             :              * If only the confirmed_flush_lsn has changed the slot won't get
     298             :              * marked as dirty by the above. Callers on the walsender
     299             :              * interface are expected to keep track of their own progress and
     300             :              * don't need it written out. But SQL-interface users cannot
     301             :              * specify their own start positions and it's harder for them to
     302             :              * keep track of their progress, so we should make more of an
     303             :              * effort to save it for them.
     304             :              *
     305             :              * Dirty the slot so it's written out at the next checkpoint.
     306             :              * We'll still lose its position on crash, as documented, but it's
     307             :              * better than always losing the position even on clean restart.
     308             :              */
     309         370 :             ReplicationSlotMarkDirty();
     310             :         }
     311             : 
     312             :         /* free context, call shutdown callback */
     313         412 :         FreeDecodingContext(ctx);
     314             : 
     315         412 :         ReplicationSlotRelease();
     316         412 :         InvalidateSystemCaches();
     317             :     }
     318          16 :     PG_CATCH();
     319             :     {
     320             :         /* clear all timetravel entries */
     321          16 :         InvalidateSystemCaches();
     322             : 
     323          16 :         PG_RE_THROW();
     324             :     }
     325         412 :     PG_END_TRY();
     326             : 
     327         412 :     return (Datum) 0;
     328             : }
     329             : 
     330             : /*
     331             :  * SQL function returning the changestream as text, consuming the data.
     332             :  */
     333             : Datum
     334         372 : pg_logical_slot_get_changes(PG_FUNCTION_ARGS)
     335             : {
     336         372 :     return pg_logical_slot_get_changes_guts(fcinfo, true, false);
     337             : }
     338             : 
     339             : /*
     340             :  * SQL function returning the changestream as text, only peeking ahead.
     341             :  */
     342             : Datum
     343          32 : pg_logical_slot_peek_changes(PG_FUNCTION_ARGS)
     344             : {
     345          32 :     return pg_logical_slot_get_changes_guts(fcinfo, false, false);
     346             : }
     347             : 
     348             : /*
     349             :  * SQL function returning the changestream in binary, consuming the data.
     350             :  */
     351             : Datum
     352          12 : pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS)
     353             : {
     354          12 :     return pg_logical_slot_get_changes_guts(fcinfo, true, true);
     355             : }
     356             : 
     357             : /*
     358             :  * SQL function returning the changestream in binary, only peeking ahead.
     359             :  */
     360             : Datum
     361          16 : pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS)
     362             : {
     363          16 :     return pg_logical_slot_get_changes_guts(fcinfo, false, true);
     364             : }
     365             : 
     366             : 
     367             : /*
     368             :  * SQL function for writing logical decoding message into WAL.
     369             :  */
     370             : Datum
     371         390 : pg_logical_emit_message_bytea(PG_FUNCTION_ARGS)
     372             : {
     373         390 :     bool        transactional = PG_GETARG_BOOL(0);
     374         390 :     char       *prefix = text_to_cstring(PG_GETARG_TEXT_PP(1));
     375         390 :     bytea      *data = PG_GETARG_BYTEA_PP(2);
     376         390 :     bool        flush = PG_GETARG_BOOL(3);
     377             :     XLogRecPtr  lsn;
     378             : 
     379         390 :     lsn = LogLogicalMessage(prefix, VARDATA_ANY(data), VARSIZE_ANY_EXHDR(data),
     380             :                             transactional, flush);
     381         390 :     PG_RETURN_LSN(lsn);
     382             : }
     383             : 
     384             : Datum
     385         390 : pg_logical_emit_message_text(PG_FUNCTION_ARGS)
     386             : {
     387             :     /* bytea and text are compatible */
     388         390 :     return pg_logical_emit_message_bytea(fcinfo);
     389             : }
 |