LCOV - code coverage report
Current view: top level - src/backend/replication/logical - logicalfuncs.c (source / functions) Hit Total Coverage
Test: PostgreSQL 15beta1 Lines: 96 111 86.5 %
Date: 2022-05-19 15:10:49 Functions: 9 9 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * logicalfuncs.c
       4             :  *
       5             :  *     Support functions for using logical decoding and management of
       6             :  *     logical replication slots via SQL.
       7             :  *
       8             :  *
       9             :  * Copyright (c) 2012-2022, PostgreSQL Global Development Group
      10             :  *
      11             :  * IDENTIFICATION
      12             :  *    src/backend/replication/logical/logicalfuncs.c
      13             :  *-------------------------------------------------------------------------
      14             :  */
      15             : 
      16             : #include "postgres.h"
      17             : 
      18             : #include <unistd.h>
      19             : 
      20             : #include "access/xact.h"
      21             : #include "access/xlog_internal.h"
      22             : #include "access/xlogrecovery.h"
      23             : #include "access/xlogutils.h"
      24             : #include "catalog/pg_type.h"
      25             : #include "fmgr.h"
      26             : #include "funcapi.h"
      27             : #include "mb/pg_wchar.h"
      28             : #include "miscadmin.h"
      29             : #include "nodes/makefuncs.h"
      30             : #include "replication/decode.h"
      31             : #include "replication/logical.h"
      32             : #include "replication/message.h"
      33             : #include "storage/fd.h"
      34             : #include "utils/array.h"
      35             : #include "utils/builtins.h"
      36             : #include "utils/inval.h"
      37             : #include "utils/lsyscache.h"
      38             : #include "utils/memutils.h"
      39             : #include "utils/pg_lsn.h"
      40             : #include "utils/regproc.h"
      41             : #include "utils/resowner.h"
      42             : 
      43             : /*
                     :  * Private data for writing out data.
                     :  *
                     :  * One instance is allocated per call of pg_logical_slot_get_changes_guts()
                     :  * and reaches LogicalOutputWrite() through ctx->output_writer_private.
                     :  */
      44             : typedef struct DecodingOutputState
      45             : {
      46             :     Tuplestorestate *tupstore;  /* rows are appended here; taken from rsinfo->setResult */
      47             :     TupleDesc   tupdesc;        /* row layout for tupstore; taken from rsinfo->setDesc */
      48             :     bool        binary_output;  /* false: output is asserted to be valid in the database encoding */
      49             :     int64       returned_rows;  /* rows written so far; compared against upto_nchanges */
      50             : } DecodingOutputState;
      51             : 
      52             : /*
      53             :  * Prepare for an output plugin write.
                     :  *
                     :  * Calls resetStringInfo() on ctx->out so that the following write starts
                     :  * from a reset buffer.  The lsn, xid and last_write arguments are not used
                     :  * in this function.  It is handed to CreateDecodingContext() by
                     :  * pg_logical_slot_get_changes_guts().
      54             :  */
      55             : static void
      56      302036 : LogicalOutputPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
      57             :                           bool last_write)
      58             : {
      59      302036 :     resetStringInfo(ctx->out);
      60      302036 : }
      61             : 
      62             : /*
      63             :  * Perform output plugin write into tuplestore.
                     :  *
                     :  * Builds one three-column row -- (lsn, xid, contents of ctx->out) -- and
                     :  * appends it to the tuplestore kept in the DecodingOutputState that
                     :  * ctx->output_writer_private points to, then increments that state's
                     :  * returned_rows counter.
                     :  *
                     :  * Raises ERROR when ctx->out is longer than MaxAllocSize - VARHDRSZ.
                     :  * The last_write argument is not used in this function.
      64             :  */
      65             : static void
      66      302036 : LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
      67             :                    bool last_write)
      68             : {
      69             :     Datum       values[3];
      70             :     bool        nulls[3];
      71             :     DecodingOutputState *p;
      72             : 
      73             :     /* SQL Datums can only be of a limited length... */
      74      302036 :     if (ctx->out->len > MaxAllocSize - VARHDRSZ)
      75           0 :         elog(ERROR, "too much output for sql interface");
      76             : 
      77      302036 :     p = (DecodingOutputState *) ctx->output_writer_private;
      78             : 
      79      302036 :     memset(nulls, 0, sizeof(nulls));    /* all three columns are always non-null */
      80      302036 :     values[0] = LSNGetDatum(lsn);
      81      302036 :     values[1] = TransactionIdGetDatum(xid);
      82             : 
      83             :     /*
      84             :      * Assert ctx->out is in database encoding when we're writing textual
      85             :      * output.
                     :      *
                     :      * This is an Assert() only, so the check is presumably compiled out of
                     :      * non-assert builds; the "if" then guards an empty statement.
      86             :      */
      87      302036 :     if (!p->binary_output)
      88             :         Assert(pg_verify_mbstr(GetDatabaseEncoding(),
      89             :                                ctx->out->data, ctx->out->len,
      90             :                                false));
      91             : 
      92             :     /* ick, but cstring_to_text_with_len works for bytea perfectly fine */
      93      302036 :     values[2] = PointerGetDatum(cstring_to_text_with_len(ctx->out->data, ctx->out->len));
      94             : 
      95      302036 :     tuplestore_putvalues(p->tupstore, p->tupdesc, values, nulls);
      96      302036 :     p->returned_rows++;     /* consulted by the upto_nchanges limit in the caller's loop */
      97      302036 : }
      98             : 
      99             : /*
     100             :  * Helper function for the various SQL callable logical decoding functions.
                     :  *
                     :  * fcinfo carries four SQL arguments:
                     :  *   0  slot name       -- must not be NULL
                     :  *   1  upto_lsn        -- NULL means no LSN limit
                     :  *   2  upto_nchanges   -- NULL means no row limit
                     :  *   3  options array   -- must not be NULL; at most one-dimensional, no
                     :  *                         NULL elements, an even number of text elements
                     :  *                         that are read as (name, value) pairs
                     :  *
                     :  * confirm: when true (and the reader's EndRecPtr is valid), that position
                     :  * is passed to LogicalConfirmReceivedLocation() and the slot is marked
                     :  * dirty once decoding has finished.
                     :  *
                     :  * binary: copied into DecodingOutputState.binary_output.  When false, an
                     :  * ERROR is raised unless the plugin's output_type is
                     :  * OUTPUT_PLUGIN_TEXTUAL_OUTPUT.
                     :  *
                     :  * Decoding starts at the slot's restart_lsn and continues until the
                     :  * end-of-WAL position computed before the slot is acquired, or until one
                     :  * of the two limits is reached.  Rows are delivered through the tuplestore
                     :  * in rsinfo->setResult (read after SetSingleFuncCall()); the function
                     :  * itself always returns (Datum) 0.
     101             :  */
     102             : static Datum
     103         350 : pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool binary)
     104             : {
     105             :     Name        name;
     106             :     XLogRecPtr  upto_lsn;
     107             :     int32       upto_nchanges;
     108         350 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     109             :     MemoryContext per_query_ctx;
     110             :     MemoryContext oldcontext;
     111             :     XLogRecPtr  end_of_wal;
     112             :     LogicalDecodingContext *ctx;
     113         350 :     ResourceOwner old_resowner = CurrentResourceOwner;  /* restored after the decode loop */
     114             :     ArrayType  *arr;
     115             :     Size        ndim;
     116         350 :     List       *options = NIL;
     117             :     DecodingOutputState *p;
     118             : 
     119         350 :     CheckSlotPermissions();
     120             : 
     121         348 :     CheckLogicalDecodingRequirements();
     122             : 
     123         348 :     if (PG_ARGISNULL(0))
     124           0 :         ereport(ERROR,
     125             :                 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
     126             :                  errmsg("slot name must not be null")));
     127         348 :     name = PG_GETARG_NAME(0);
     128             : 
     129         348 :     if (PG_ARGISNULL(1))
     130         348 :         upto_lsn = InvalidXLogRecPtr;
     131             :     else
     132           0 :         upto_lsn = PG_GETARG_LSN(1);
     133             : 
     134         348 :     if (PG_ARGISNULL(2))
     135         348 :         upto_nchanges = InvalidXLogRecPtr;  /* NOTE(review): LSN constant assigned to an int32 row count; the loop below tests this variable against 0 */
     136             :     else
     137           0 :         upto_nchanges = PG_GETARG_INT32(2);
     138             : 
     139         348 :     if (PG_ARGISNULL(3))
     140           0 :         ereport(ERROR,
     141             :                 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
     142             :                  errmsg("options array must not be null")));
     143         348 :     arr = PG_GETARG_ARRAYTYPE_P(3);
     144             : 
     145             :     /* state to write output to */
     146         348 :     p = palloc0(sizeof(DecodingOutputState));
     147             : 
     148         348 :     p->binary_output = binary;
     149             : 
     150         348 :     per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
     151         348 :     oldcontext = MemoryContextSwitchTo(per_query_ctx);  /* switched back inside PG_TRY, after CreateDecodingContext() */
     152             : 
     153             :     /* Deconstruct options array */
     154         348 :     ndim = ARR_NDIM(arr);
     155         348 :     if (ndim > 1)
     156             :     {
     157           0 :         ereport(ERROR,
     158             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     159             :                  errmsg("array must be one-dimensional")));
     160             :     }
     161         348 :     else if (array_contains_nulls(arr))
     162             :     {
     163           0 :         ereport(ERROR,
     164             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     165             :                  errmsg("array must not contain nulls")));
     166             :     }
     167         348 :     else if (ndim == 1)
     168             :     {
     169             :         int         nelems;
     170             :         Datum      *datum_opts;
     171             :         int         i;
     172             : 
     173             :         Assert(ARR_ELEMTYPE(arr) == TEXTOID);
     174             : 
     175         310 :         deconstruct_array(arr, TEXTOID, -1, false, TYPALIGN_INT,
     176             :                           &datum_opts, NULL, &nelems);
     177             : 
     178         310 :         if (nelems % 2 != 0)
     179           0 :             ereport(ERROR,
     180             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     181             :                      errmsg("array must have even number of elements")));
     182             : 
     183         930 :         for (i = 0; i < nelems; i += 2)    /* elements are consumed as (name, value) pairs */
     184             :         {
     185         620 :             char       *name = TextDatumGetCString(datum_opts[i]); /* NOTE(review): shadows the outer "Name name" */
     186         620 :             char       *opt = TextDatumGetCString(datum_opts[i + 1]);
     187             : 
     188         620 :             options = lappend(options, makeDefElem(name, (Node *) makeString(opt), -1));
     189             :         }
     190             :     }
     191             : 
     192         348 :     SetSingleFuncCall(fcinfo, 0);
     193         348 :     p->tupstore = rsinfo->setResult;
     194         348 :     p->tupdesc = rsinfo->setDesc;
     195             : 
     196             :     /*
     197             :      * Compute the current end-of-wal.
                     :      *
                     :      * The decode loop below stops once the reader's EndRecPtr reaches this
                     :      * position; it is not recomputed while decoding.
     198             :      */
     199         348 :     if (!RecoveryInProgress())
     200         348 :         end_of_wal = GetFlushRecPtr(NULL);
     201             :     else
     202           0 :         end_of_wal = GetXLogReplayRecPtr(NULL);
     203             : 
     204         348 :     ReplicationSlotAcquire(NameStr(*name), true);
     205             : 
     206         346 :     PG_TRY();
     207             :     {
     208             :         /* restart at slot's confirmed_flush */
     209         682 :         ctx = CreateDecodingContext(InvalidXLogRecPtr,
     210             :                                     options,
     211             :                                     false,
     212         346 :                                     XL_ROUTINE(.page_read = read_local_xlog_page,
     213             :                                                .segment_open = wal_segment_open,
     214             :                                                .segment_close = wal_segment_close),
     215             :                                     LogicalOutputPrepareWrite,
     216             :                                     LogicalOutputWrite, NULL);
     217             : 
     218             :         /*
     219             :          * After the sanity checks in CreateDecodingContext, make sure the
     220             :          * restart_lsn is valid.  Avoid "cannot get changes" wording in this
     221             :          * errmsg because that'd be confusingly ambiguous about no changes
     222             :          * being available.
     223             :          */
     224         336 :         if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
     225           0 :             ereport(ERROR,
     226             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     227             :                      errmsg("can no longer get changes from replication slot \"%s\"",
     228             :                             NameStr(*name)),
     229             :                      errdetail("This slot has never previously reserved WAL, or it has been invalidated.")));
     230             : 
     231         336 :         MemoryContextSwitchTo(oldcontext);
     232             : 
     233             :         /*
     234             :          * Check whether the output plugin writes textual output if that's
     235             :          * what we need.
     236             :          */
     237         336 :         if (!binary &&
     238         322 :             ctx->options.output_type !=OUTPUT_PLUGIN_TEXTUAL_OUTPUT)
     239           2 :             ereport(ERROR,
     240             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     241             :                      errmsg("logical decoding output plugin \"%s\" produces binary output, but function \"%s\" expects textual data",
     242             :                             NameStr(MyReplicationSlot->data.plugin),
     243             :                             format_procedure(fcinfo->flinfo->fn_oid))));
     244             : 
     245         334 :         ctx->output_writer_private = p;    /* read back by LogicalOutputWrite() */
     246             : 
     247             :         /*
     248             :          * Decoding of WAL must start at restart_lsn so that the entirety of
     249             :          * xacts that committed after the slot's confirmed_flush can be
     250             :          * accumulated into reorder buffers.
     251             :          */
     252         334 :         XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
     253             : 
     254             :         /* invalidate non-timetravel entries */
     255         334 :         InvalidateSystemCaches();
     256             : 
     257             :         /* Decode until we run out of records */
     258     2957922 :         while (ctx->reader->EndRecPtr < end_of_wal)
     259             :         {
     260             :             XLogRecord *record;
     261     2957588 :             char       *errm = NULL;
     262             : 
     263     2957588 :             record = XLogReadRecord(ctx->reader, &errm);
     264     2957588 :             if (errm)
     265           0 :                 elog(ERROR, "could not find record for logical decoding: %s", errm);
     266             : 
     267             :             /*
     268             :              * The {begin_txn,change,commit_txn}_wrapper callbacks above will
     269             :              * store the description into our tuplestore.
                     :              *
                     :              * NOTE(review): no such wrappers are defined in this file; here the
                     :              * rows are appended by LogicalOutputWrite(), which was registered
                     :              * with CreateDecodingContext() above.
     270             :              */
     271     2957588 :             if (record != NULL)
     272     2957588 :                 LogicalDecodingProcessRecord(ctx, ctx->reader);
     273             : 
     274             :             /*
                     :              * check limits
                     :              *
                     :              * Both limits are tested only after a record has been processed.
                     :              * An upto_lsn of InvalidXLogRecPtr or an upto_nchanges of 0 disables
                     :              * the respective limit.  Presumably one record can produce several
                     :              * rows, in which case more than upto_nchanges rows may be returned
                     :              * -- TODO confirm.
                     :              */
     275     2957588 :             if (upto_lsn != InvalidXLogRecPtr &&
     276           0 :                 upto_lsn <= ctx->reader->EndRecPtr)
     277           0 :                 break;
     278     2957588 :             if (upto_nchanges != 0 &&
     279           0 :                 upto_nchanges <= p->returned_rows)
     280           0 :                 break;
     281     2957588 :             CHECK_FOR_INTERRUPTS();
     282             :         }
     283             : 
     284             :         /*
     285             :          * Logical decoding could have clobbered CurrentResourceOwner during
     286             :          * transaction management, so restore the executor's value.  (This is
     287             :          * a kluge, but it's not worth cleaning up right now.)
     288             :          */
     289         334 :         CurrentResourceOwner = old_resowner;
     290             : 
     291             :         /*
     292             :          * Next time, start where we left off. (Hunting things, the family
     293             :          * business..)
     294             :          */
     295         334 :         if (ctx->reader->EndRecPtr != InvalidXLogRecPtr && confirm)
     296             :         {
     297         310 :             LogicalConfirmReceivedLocation(ctx->reader->EndRecPtr);
     298             : 
     299             :             /*
     300             :              * If only the confirmed_flush_lsn has changed the slot won't get
     301             :              * marked as dirty by the above. Callers on the walsender
     302             :              * interface are expected to keep track of their own progress and
     303             :              * don't need it written out. But SQL-interface users cannot
     304             :              * specify their own start positions and it's harder for them to
     305             :              * keep track of their progress, so we should make more of an
     306             :              * effort to save it for them.
     307             :              *
     308             :              * Dirty the slot so it's written out at the next checkpoint.
     309             :              * We'll still lose its position on crash, as documented, but it's
     310             :              * better than always losing the position even on clean restart.
     311             :              */
     312         310 :             ReplicationSlotMarkDirty();
     313             :         }
     314             : 
     315             :         /* free context, call shutdown callback */
     316         334 :         FreeDecodingContext(ctx);
     317             : 
     318         334 :         ReplicationSlotRelease();
     319         334 :         InvalidateSystemCaches();
     320             :     }
     321          12 :     PG_CATCH();
     322             :     {
     323             :         /*
                     :          * clear all timetravel entries
                     :          *
                     :          * NOTE(review): the context and the slot are not released on this
                     :          * path; presumably error cleanup elsewhere takes care of them --
                     :          * confirm.
                     :          */
     324          12 :         InvalidateSystemCaches();
     325             : 
     326          12 :         PG_RE_THROW();
     327             :     }
     328         334 :     PG_END_TRY();
     329             : 
     330         334 :     return (Datum) 0;   /* the rows themselves travel via rsinfo->setResult */
     331             : }
     332             : 
     333             : /*
     334             :  * SQL function returning the changestream as text, consuming the data.
                     :  *
                     :  * Thin wrapper: forwards fcinfo to pg_logical_slot_get_changes_guts() with
                     :  * confirm = true and binary = false, and returns its result.
     335             :  */
     336             : Datum
     337         314 : pg_logical_slot_get_changes(PG_FUNCTION_ARGS)
     338             : {
     339         314 :     return pg_logical_slot_get_changes_guts(fcinfo, true, false);
     340             : }
     341             : 
     342             : /*
     343             :  * SQL function returning the changestream as text, only peeking ahead.
                     :  *
                     :  * Thin wrapper: forwards fcinfo to pg_logical_slot_get_changes_guts() with
                     :  * confirm = false and binary = false, so the received location is not
                     :  * confirmed after decoding.
     344             :  */
     345             : Datum
     346          22 : pg_logical_slot_peek_changes(PG_FUNCTION_ARGS)
     347             : {
     348          22 :     return pg_logical_slot_get_changes_guts(fcinfo, false, false);
     349             : }
     350             : 
     351             : /*
     352             :  * SQL function returning the changestream in binary, consuming the data.
                     :  *
                     :  * Thin wrapper: forwards fcinfo to pg_logical_slot_get_changes_guts() with
                     :  * confirm = true and binary = true, which skips the textual-output check.
     353             :  */
     354             : Datum
     355           8 : pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS)
     356             : {
     357           8 :     return pg_logical_slot_get_changes_guts(fcinfo, true, true);
     358             : }
     359             : 
     360             : /*
     361             :  * SQL function returning the changestream in binary, only peeking ahead.
                     :  *
                     :  * Thin wrapper: forwards fcinfo to pg_logical_slot_get_changes_guts() with
                     :  * confirm = false and binary = true, so the received location is not
                     :  * confirmed and the textual-output check is skipped.
     362             :  */
     363             : Datum
     364           6 : pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS)
     365             : {
     366           6 :     return pg_logical_slot_get_changes_guts(fcinfo, false, true);
     367             : }
     368             : 
     369             : 
     370             : /*
     371             :  * SQL function for writing a logical decoding message into WAL.
                     :  *
                     :  * SQL arguments: (0) transactional flag, (1) prefix as text, (2) payload
                     :  * as bytea.  The prefix is converted to a C string; the payload is handed
                     :  * to LogLogicalMessage() as a data pointer plus its length without the
                     :  * varlena header.  Returns the LSN that LogLogicalMessage() reports.
                     :  *
                     :  * NOTE(review): there are no PG_ARGISNULL() checks here -- presumably the
                     :  * SQL-level declaration is STRICT; confirm against the catalog entry.
     372             :  */
     373             : Datum
     374          44 : pg_logical_emit_message_bytea(PG_FUNCTION_ARGS)
     375             : {
     376          44 :     bool        transactional = PG_GETARG_BOOL(0);
     377          44 :     char       *prefix = text_to_cstring(PG_GETARG_TEXT_PP(1));
     378          44 :     bytea      *data = PG_GETARG_BYTEA_PP(2);
     379             :     XLogRecPtr  lsn;
     380             : 
     381          44 :     lsn = LogLogicalMessage(prefix, VARDATA_ANY(data), VARSIZE_ANY_EXHDR(data),
     382             :                             transactional);
     383          44 :     PG_RETURN_LSN(lsn);
     384             : }
     385             : 
     386             : Datum
     387          44 : pg_logical_emit_message_text(PG_FUNCTION_ARGS)
     388             : {
     389             :     /*
                     :      * SQL function for writing a logical decoding message with a text
                     :      * payload.  bytea and text are compatible, so this simply forwards the
                     :      * same fcinfo to pg_logical_emit_message_bytea() and returns its result.
                     :      */
     390          44 :     return pg_logical_emit_message_bytea(fcinfo);
     391             : }

Generated by: LCOV version 1.14