LCOV - code coverage report
Current view: top level - src/backend/replication/logical - logicalfuncs.c (source / functions) Hit Total Coverage
Test: PostgreSQL 15devel Lines: 100 118 84.7 %
Date: 2021-12-09 04:09:06 Functions: 9 9 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * logicalfuncs.c
       4             :  *
       5             :  *     Support functions for using logical decoding and management of
       6             :  *     logical replication slots via SQL.
       7             :  *
       8             :  *
       9             :  * Copyright (c) 2012-2021, PostgreSQL Global Development Group
      10             :  *
      11             :  * IDENTIFICATION
      12             :  *    src/backend/replication/logical/logicalfuncs.c
      13             :  *-------------------------------------------------------------------------
      14             :  */
      15             : 
      16             : #include "postgres.h"
      17             : 
      18             : #include <unistd.h>
      19             : 
      20             : #include "access/xact.h"
      21             : #include "access/xlog_internal.h"
      22             : #include "access/xlogutils.h"
      23             : #include "catalog/pg_type.h"
      24             : #include "fmgr.h"
      25             : #include "funcapi.h"
      26             : #include "mb/pg_wchar.h"
      27             : #include "miscadmin.h"
      28             : #include "nodes/makefuncs.h"
      29             : #include "replication/decode.h"
      30             : #include "replication/logical.h"
      31             : #include "replication/message.h"
      32             : #include "storage/fd.h"
      33             : #include "utils/array.h"
      34             : #include "utils/builtins.h"
      35             : #include "utils/inval.h"
      36             : #include "utils/lsyscache.h"
      37             : #include "utils/memutils.h"
      38             : #include "utils/pg_lsn.h"
      39             : #include "utils/regproc.h"
      40             : #include "utils/resowner.h"
      41             : 
      42             : /* private data for writing out data */
      43             : typedef struct DecodingOutputState
      44             : {
      45             :     Tuplestorestate *tupstore;
      46             :     TupleDesc   tupdesc;
      47             :     bool        binary_output;
      48             :     int64       returned_rows;
      49             : } DecodingOutputState;
      50             : 
      51             : /*
      52             :  * Prepare for an output plugin write.
      53             :  */
      54             : static void
      55      301960 : LogicalOutputPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
      56             :                           bool last_write)
      57             : {
      58      301960 :     resetStringInfo(ctx->out);
      59      301960 : }
      60             : 
      61             : /*
      62             :  * Perform output plugin write into tuplestore.
      63             :  */
      64             : static void
      65      301960 : LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
      66             :                    bool last_write)
      67             : {
      68             :     Datum       values[3];
      69             :     bool        nulls[3];
      70             :     DecodingOutputState *p;
      71             : 
      72             :     /* SQL Datums can only be of a limited length... */
      73      301960 :     if (ctx->out->len > MaxAllocSize - VARHDRSZ)
      74           0 :         elog(ERROR, "too much output for sql interface");
      75             : 
      76      301960 :     p = (DecodingOutputState *) ctx->output_writer_private;
      77             : 
      78      301960 :     memset(nulls, 0, sizeof(nulls));
      79      301960 :     values[0] = LSNGetDatum(lsn);
      80      301960 :     values[1] = TransactionIdGetDatum(xid);
      81             : 
      82             :     /*
      83             :      * Assert ctx->out is in database encoding when we're writing textual
      84             :      * output.
      85             :      */
      86      301960 :     if (!p->binary_output)
      87             :         Assert(pg_verify_mbstr(GetDatabaseEncoding(),
      88             :                                ctx->out->data, ctx->out->len,
      89             :                                false));
      90             : 
      91             :     /* ick, but cstring_to_text_with_len works for bytea perfectly fine */
      92      301960 :     values[2] = PointerGetDatum(cstring_to_text_with_len(ctx->out->data, ctx->out->len));
      93             : 
      94      301960 :     tuplestore_putvalues(p->tupstore, p->tupdesc, values, nulls);
      95      301960 :     p->returned_rows++;
      96      301960 : }
      97             : 
      98             : /*
      99             :  * Helper function for the various SQL callable logical decoding functions.
     100             :  */
     101             : static Datum
     102         342 : pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool binary)
     103             : {
     104             :     Name        name;
     105             :     XLogRecPtr  upto_lsn;
     106             :     int32       upto_nchanges;
     107         342 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     108             :     MemoryContext per_query_ctx;
     109             :     MemoryContext oldcontext;
     110             :     XLogRecPtr  end_of_wal;
     111             :     LogicalDecodingContext *ctx;
     112         342 :     ResourceOwner old_resowner = CurrentResourceOwner;
     113             :     ArrayType  *arr;
     114             :     Size        ndim;
     115         342 :     List       *options = NIL;
     116             :     DecodingOutputState *p;
     117             : 
     118         342 :     CheckSlotPermissions();
     119             : 
     120         340 :     CheckLogicalDecodingRequirements();
     121             : 
     122         340 :     if (PG_ARGISNULL(0))
     123           0 :         ereport(ERROR,
     124             :                 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
     125             :                  errmsg("slot name must not be null")));
     126         340 :     name = PG_GETARG_NAME(0);
     127             : 
     128         340 :     if (PG_ARGISNULL(1))
     129         340 :         upto_lsn = InvalidXLogRecPtr;
     130             :     else
     131           0 :         upto_lsn = PG_GETARG_LSN(1);
     132             : 
     133         340 :     if (PG_ARGISNULL(2))
     134         340 :         upto_nchanges = InvalidXLogRecPtr;
     135             :     else
     136           0 :         upto_nchanges = PG_GETARG_INT32(2);
     137             : 
     138         340 :     if (PG_ARGISNULL(3))
     139           0 :         ereport(ERROR,
     140             :                 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
     141             :                  errmsg("options array must not be null")));
     142         340 :     arr = PG_GETARG_ARRAYTYPE_P(3);
     143             : 
     144             :     /* check to see if caller supports us returning a tuplestore */
     145         340 :     if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
     146           0 :         ereport(ERROR,
     147             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     148             :                  errmsg("set-valued function called in context that cannot accept a set")));
     149         340 :     if (!(rsinfo->allowedModes & SFRM_Materialize))
     150           0 :         ereport(ERROR,
     151             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     152             :                  errmsg("materialize mode required, but it is not allowed in this context")));
     153             : 
     154             :     /* state to write output to */
     155         340 :     p = palloc0(sizeof(DecodingOutputState));
     156             : 
     157         340 :     p->binary_output = binary;
     158             : 
     159             :     /* Build a tuple descriptor for our result type */
     160         340 :     if (get_call_result_type(fcinfo, NULL, &p->tupdesc) != TYPEFUNC_COMPOSITE)
     161           0 :         elog(ERROR, "return type must be a row type");
     162             : 
     163         340 :     per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
     164         340 :     oldcontext = MemoryContextSwitchTo(per_query_ctx);
     165             : 
     166             :     /* Deconstruct options array */
     167         340 :     ndim = ARR_NDIM(arr);
     168         340 :     if (ndim > 1)
     169             :     {
     170           0 :         ereport(ERROR,
     171             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     172             :                  errmsg("array must be one-dimensional")));
     173             :     }
     174         340 :     else if (array_contains_nulls(arr))
     175             :     {
     176           0 :         ereport(ERROR,
     177             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     178             :                  errmsg("array must not contain nulls")));
     179             :     }
     180         340 :     else if (ndim == 1)
     181             :     {
     182             :         int         nelems;
     183             :         Datum      *datum_opts;
     184             :         int         i;
     185             : 
     186             :         Assert(ARR_ELEMTYPE(arr) == TEXTOID);
     187             : 
     188         302 :         deconstruct_array(arr, TEXTOID, -1, false, TYPALIGN_INT,
     189             :                           &datum_opts, NULL, &nelems);
     190             : 
     191         302 :         if (nelems % 2 != 0)
     192           0 :             ereport(ERROR,
     193             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     194             :                      errmsg("array must have even number of elements")));
     195             : 
     196         910 :         for (i = 0; i < nelems; i += 2)
     197             :         {
     198         608 :             char       *name = TextDatumGetCString(datum_opts[i]);
     199         608 :             char       *opt = TextDatumGetCString(datum_opts[i + 1]);
     200             : 
     201         608 :             options = lappend(options, makeDefElem(name, (Node *) makeString(opt), -1));
     202             :         }
     203             :     }
     204             : 
     205         340 :     p->tupstore = tuplestore_begin_heap(true, false, work_mem);
     206         340 :     rsinfo->returnMode = SFRM_Materialize;
     207         340 :     rsinfo->setResult = p->tupstore;
     208         340 :     rsinfo->setDesc = p->tupdesc;
     209             : 
     210             :     /*
     211             :      * Compute the current end-of-wal.
     212             :      */
     213         340 :     if (!RecoveryInProgress())
     214         340 :         end_of_wal = GetFlushRecPtr(NULL);
     215             :     else
     216           0 :         end_of_wal = GetXLogReplayRecPtr(NULL);
     217             : 
     218         340 :     ReplicationSlotAcquire(NameStr(*name), true);
     219             : 
     220         338 :     PG_TRY();
     221             :     {
     222             :         /* restart at slot's confirmed_flush */
     223         666 :         ctx = CreateDecodingContext(InvalidXLogRecPtr,
     224             :                                     options,
     225             :                                     false,
     226         338 :                                     XL_ROUTINE(.page_read = read_local_xlog_page,
     227             :                                                .segment_open = wal_segment_open,
     228             :                                                .segment_close = wal_segment_close),
     229             :                                     LogicalOutputPrepareWrite,
     230             :                                     LogicalOutputWrite, NULL);
     231             : 
     232             :         /*
     233             :          * After the sanity checks in CreateDecodingContext, make sure the
     234             :          * restart_lsn is valid.  Avoid "cannot get changes" wording in this
     235             :          * errmsg because that'd be confusingly ambiguous about no changes
     236             :          * being available.
     237             :          */
     238         328 :         if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
     239           0 :             ereport(ERROR,
     240             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     241             :                      errmsg("can no longer get changes from replication slot \"%s\"",
     242             :                             NameStr(*name)),
     243             :                      errdetail("This slot has never previously reserved WAL, or it has been invalidated.")));
     244             : 
     245         328 :         MemoryContextSwitchTo(oldcontext);
     246             : 
     247             :         /*
     248             :          * Check whether the output plugin writes textual output if that's
     249             :          * what we need.
     250             :          */
     251         328 :         if (!binary &&
     252         314 :             ctx->options.output_type !=OUTPUT_PLUGIN_TEXTUAL_OUTPUT)
     253           2 :             ereport(ERROR,
     254             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     255             :                      errmsg("logical decoding output plugin \"%s\" produces binary output, but function \"%s\" expects textual data",
     256             :                             NameStr(MyReplicationSlot->data.plugin),
     257             :                             format_procedure(fcinfo->flinfo->fn_oid))));
     258             : 
     259         326 :         ctx->output_writer_private = p;
     260             : 
     261             :         /*
     262             :          * Decoding of WAL must start at restart_lsn so that the entirety of
     263             :          * xacts that committed after the slot's confirmed_flush can be
     264             :          * accumulated into reorder buffers.
     265             :          */
     266         326 :         XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
     267             : 
     268             :         /* invalidate non-timetravel entries */
     269         326 :         InvalidateSystemCaches();
     270             : 
     271             :         /* Decode until we run out of records */
     272     3272182 :         while (ctx->reader->EndRecPtr < end_of_wal)
     273             :         {
     274             :             XLogRecord *record;
     275     3271856 :             char       *errm = NULL;
     276             : 
     277     3271856 :             record = XLogReadRecord(ctx->reader, &errm);
     278     3271856 :             if (errm)
     279           0 :                 elog(ERROR, "could not find record for logical decoding: %s", errm);
     280             : 
     281             :             /*
     282             :              * The {begin_txn,change,commit_txn}_wrapper callbacks will store
     283             :              * the description into our tuplestore via LogicalOutputWrite().
     284             :              */
     285     3271856 :             if (record != NULL)
     286     3271856 :                 LogicalDecodingProcessRecord(ctx, ctx->reader);
     287             : 
     288             :             /* check limits */
     289     3271856 :             if (upto_lsn != InvalidXLogRecPtr &&
     290           0 :                 upto_lsn <= ctx->reader->EndRecPtr)
     291           0 :                 break;
     292     3271856 :             if (upto_nchanges != 0 &&
     293           0 :                 upto_nchanges <= p->returned_rows)
     294           0 :                 break;
     295     3271856 :             CHECK_FOR_INTERRUPTS();
     296             :         }
     297             : 
     298             :         tuplestore_donestoring(tupstore);
     299             : 
     300             :         /*
     301             :          * Logical decoding could have clobbered CurrentResourceOwner during
     302             :          * transaction management, so restore the executor's value.  (This is
     303             :          * a kluge, but it's not worth cleaning up right now.)
     304             :          */
     305         326 :         CurrentResourceOwner = old_resowner;
     306             : 
     307             :         /*
     308             :          * Next time, start where we left off. (Hunting things, the family
     309             :          * business..)
     310             :          */
     311         326 :         if (ctx->reader->EndRecPtr != InvalidXLogRecPtr && confirm)
     312             :         {
     313         302 :             LogicalConfirmReceivedLocation(ctx->reader->EndRecPtr);
     314             : 
     315             :             /*
     316             :              * If only the confirmed_flush_lsn has changed the slot won't get
     317             :              * marked as dirty by the above. Callers on the walsender
     318             :              * interface are expected to keep track of their own progress and
     319             :              * don't need it written out. But SQL-interface users cannot
     320             :              * specify their own start positions and it's harder for them to
     321             :              * keep track of their progress, so we should make more of an
     322             :              * effort to save it for them.
     323             :              *
     324             :              * Dirty the slot so it's written out at the next checkpoint.
     325             :              * We'll still lose its position on crash, as documented, but it's
     326             :              * better than always losing the position even on clean restart.
     327             :              */
     328         302 :             ReplicationSlotMarkDirty();
     329             :         }
     330             : 
     331             :         /* free context, call shutdown callback */
     332         326 :         FreeDecodingContext(ctx);
     333             : 
     334         326 :         ReplicationSlotRelease();
     335         326 :         InvalidateSystemCaches();
     336             :     }
     337          12 :     PG_CATCH();
     338             :     {
     339             :         /* clear all timetravel entries */
     340          12 :         InvalidateSystemCaches();
     341             : 
     342          12 :         PG_RE_THROW();
     343             :     }
     344         326 :     PG_END_TRY();
     345             : 
     346         326 :     return (Datum) 0;
     347             : }
     348             : 
     349             : /*
     350             :  * SQL function returning the changestream as text, consuming the data.
     351             :  */
     352             : Datum
     353         306 : pg_logical_slot_get_changes(PG_FUNCTION_ARGS)
     354             : {
     355         306 :     return pg_logical_slot_get_changes_guts(fcinfo, true, false);
     356             : }
     357             : 
     358             : /*
     359             :  * SQL function returning the changestream as text, only peeking ahead.
     360             :  */
     361             : Datum
     362          22 : pg_logical_slot_peek_changes(PG_FUNCTION_ARGS)
     363             : {
     364          22 :     return pg_logical_slot_get_changes_guts(fcinfo, false, false);
     365             : }
     366             : 
     367             : /*
     368             :  * SQL function returning the changestream in binary, consuming the data.
     369             :  */
     370             : Datum
     371           8 : pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS)
     372             : {
     373           8 :     return pg_logical_slot_get_changes_guts(fcinfo, true, true);
     374             : }
     375             : 
     376             : /*
     377             :  * SQL function returning the changestream in binary, only peeking ahead.
     378             :  */
     379             : Datum
     380           6 : pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS)
     381             : {
     382           6 :     return pg_logical_slot_get_changes_guts(fcinfo, false, true);
     383             : }
     384             : 
     385             : 
     386             : /*
     387             :  * SQL function for writing a logical decoding message into WAL.
     388             :  */
     389             : Datum
     390          44 : pg_logical_emit_message_bytea(PG_FUNCTION_ARGS)
     391             : {
     392          44 :     bool        transactional = PG_GETARG_BOOL(0);
     393          44 :     char       *prefix = text_to_cstring(PG_GETARG_TEXT_PP(1));
     394          44 :     bytea      *data = PG_GETARG_BYTEA_PP(2);
     395             :     XLogRecPtr  lsn;
     396             : 
     397          44 :     lsn = LogLogicalMessage(prefix, VARDATA_ANY(data), VARSIZE_ANY_EXHDR(data),
     398             :                             transactional);
     399          44 :     PG_RETURN_LSN(lsn);
     400             : }
     401             : 
     402             : Datum
     403          44 : pg_logical_emit_message_text(PG_FUNCTION_ARGS)
     404             : {
     405             :     /* bytea and text are compatible */
     406          44 :     return pg_logical_emit_message_bytea(fcinfo);
     407             : }

Generated by: LCOV version 1.14