LCOV - code coverage report
Current view: top level - src/backend/replication/logical - logicalfuncs.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13beta1 Lines: 102 122 83.6 %
Date: 2020-05-25 06:06:29 Functions: 9 10 90.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * logicalfuncs.c
       4             :  *
       5             :  *     Support functions for using logical decoding and management of
       6             :  *     logical replication slots via SQL.
       7             :  *
       8             :  *
       9             :  * Copyright (c) 2012-2020, PostgreSQL Global Development Group
      10             :  *
      11             :  * IDENTIFICATION
      12             :  *    src/backend/replication/logical/logicalfuncs.c
      13             :  *-------------------------------------------------------------------------
      14             :  */
      15             : 
      16             : #include "postgres.h"
      17             : 
      18             : #include <unistd.h>
      19             : 
      20             : #include "access/xact.h"
      21             : #include "access/xlog_internal.h"
      22             : #include "access/xlogutils.h"
      23             : #include "catalog/pg_type.h"
      24             : #include "fmgr.h"
      25             : #include "funcapi.h"
      26             : #include "mb/pg_wchar.h"
      27             : #include "miscadmin.h"
      28             : #include "nodes/makefuncs.h"
      29             : #include "replication/decode.h"
      30             : #include "replication/logical.h"
      31             : #include "replication/message.h"
      32             : #include "storage/fd.h"
      33             : #include "utils/array.h"
      34             : #include "utils/builtins.h"
      35             : #include "utils/inval.h"
      36             : #include "utils/lsyscache.h"
      37             : #include "utils/memutils.h"
      38             : #include "utils/pg_lsn.h"
      39             : #include "utils/regproc.h"
      40             : #include "utils/resowner.h"
      41             : 
      42             : /* private data for writing out data */
      43             : typedef struct DecodingOutputState
      44             : {
      45             :     Tuplestorestate *tupstore;
      46             :     TupleDesc   tupdesc;
      47             :     bool        binary_output;
      48             :     int64       returned_rows;
      49             : } DecodingOutputState;
      50             : 
      51             : /*
      52             :  * Prepare for an output plugin write.
      53             :  */
      54             : static void
      55      291476 : LogicalOutputPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
      56             :                           bool last_write)
      57             : {
      58      291476 :     resetStringInfo(ctx->out);
      59      291476 : }
      60             : 
      61             : /*
      62             :  * Perform output plugin write into tuplestore.
      63             :  */
      64             : static void
      65      291476 : LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
      66             :                    bool last_write)
      67             : {
      68             :     Datum       values[3];
      69             :     bool        nulls[3];
      70             :     DecodingOutputState *p;
      71             : 
      72             :     /* SQL Datums can only be of a limited length... */
      73      291476 :     if (ctx->out->len > MaxAllocSize - VARHDRSZ)
      74           0 :         elog(ERROR, "too much output for sql interface");
      75             : 
      76      291476 :     p = (DecodingOutputState *) ctx->output_writer_private;
      77             : 
      78      291476 :     memset(nulls, 0, sizeof(nulls));
      79      291476 :     values[0] = LSNGetDatum(lsn);
      80      291476 :     values[1] = TransactionIdGetDatum(xid);
      81             : 
      82             :     /*
      83             :      * Assert ctx->out is in database encoding when we're writing textual
      84             :      * output.
      85             :      */
      86      291476 :     if (!p->binary_output)
      87             :         Assert(pg_verify_mbstr(GetDatabaseEncoding(),
      88             :                                ctx->out->data, ctx->out->len,
      89             :                                false));
      90             : 
      91             :     /* ick, but cstring_to_text_with_len works for bytea perfectly fine */
      92      291476 :     values[2] = PointerGetDatum(cstring_to_text_with_len(ctx->out->data, ctx->out->len));
      93             : 
      94      291476 :     tuplestore_putvalues(p->tupstore, p->tupdesc, values, nulls);
      95      291476 :     p->returned_rows++;
      96      291476 : }
      97             : 
      98             : static void
      99         258 : check_permissions(void)
     100             : {
     101         258 :     if (!superuser() && !has_rolreplication(GetUserId()))
     102           2 :         ereport(ERROR,
     103             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     104             :                  errmsg("must be superuser or replication role to use replication slots")));
     105         256 : }
     106             : 
     107             : /*
     108             :  * Helper function for the various SQL callable logical decoding functions.
     109             :  */
     110             : static Datum
     111         258 : pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool binary)
     112             : {
     113             :     Name        name;
     114             :     XLogRecPtr  upto_lsn;
     115             :     int32       upto_nchanges;
     116         258 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     117             :     MemoryContext per_query_ctx;
     118             :     MemoryContext oldcontext;
     119             :     XLogRecPtr  end_of_wal;
     120             :     LogicalDecodingContext *ctx;
     121         258 :     ResourceOwner old_resowner = CurrentResourceOwner;
     122             :     ArrayType  *arr;
     123             :     Size        ndim;
     124         258 :     List       *options = NIL;
     125             :     DecodingOutputState *p;
     126             : 
     127         258 :     check_permissions();
     128             : 
     129         256 :     CheckLogicalDecodingRequirements();
     130             : 
     131         256 :     if (PG_ARGISNULL(0))
     132           0 :         ereport(ERROR,
     133             :                 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
     134             :                  errmsg("slot name must not be null")));
     135         256 :     name = PG_GETARG_NAME(0);
     136             : 
     137         256 :     if (PG_ARGISNULL(1))
     138         256 :         upto_lsn = InvalidXLogRecPtr;
     139             :     else
     140           0 :         upto_lsn = PG_GETARG_LSN(1);
     141             : 
     142         256 :     if (PG_ARGISNULL(2))
     143         256 :         upto_nchanges = InvalidXLogRecPtr;
     144             :     else
     145           0 :         upto_nchanges = PG_GETARG_INT32(2);
     146             : 
     147         256 :     if (PG_ARGISNULL(3))
     148           0 :         ereport(ERROR,
     149             :                 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
     150             :                  errmsg("options array must not be null")));
     151         256 :     arr = PG_GETARG_ARRAYTYPE_P(3);
     152             : 
     153             :     /* check to see if caller supports us returning a tuplestore */
     154         256 :     if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
     155           0 :         ereport(ERROR,
     156             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     157             :                  errmsg("set-valued function called in context that cannot accept a set")));
     158         256 :     if (!(rsinfo->allowedModes & SFRM_Materialize))
     159           0 :         ereport(ERROR,
     160             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     161             :                  errmsg("materialize mode required, but it is not allowed in this context")));
     162             : 
     163             :     /* state to write output to */
     164         256 :     p = palloc0(sizeof(DecodingOutputState));
     165             : 
     166         256 :     p->binary_output = binary;
     167             : 
     168             :     /* Build a tuple descriptor for our result type */
     169         256 :     if (get_call_result_type(fcinfo, NULL, &p->tupdesc) != TYPEFUNC_COMPOSITE)
     170           0 :         elog(ERROR, "return type must be a row type");
     171             : 
     172         256 :     per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
     173         256 :     oldcontext = MemoryContextSwitchTo(per_query_ctx);
     174             : 
     175             :     /* Deconstruct options array */
     176         256 :     ndim = ARR_NDIM(arr);
     177         256 :     if (ndim > 1)
     178             :     {
     179           0 :         ereport(ERROR,
     180             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     181             :                  errmsg("array must be one-dimensional")));
     182             :     }
     183         256 :     else if (array_contains_nulls(arr))
     184             :     {
     185           0 :         ereport(ERROR,
     186             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     187             :                  errmsg("array must not contain nulls")));
     188             :     }
     189         256 :     else if (ndim == 1)
     190             :     {
     191             :         int         nelems;
     192             :         Datum      *datum_opts;
     193             :         int         i;
     194             : 
     195             :         Assert(ARR_ELEMTYPE(arr) == TEXTOID);
     196             : 
     197         218 :         deconstruct_array(arr, TEXTOID, -1, false, TYPALIGN_INT,
     198             :                           &datum_opts, NULL, &nelems);
     199             : 
     200         218 :         if (nelems % 2 != 0)
     201           0 :             ereport(ERROR,
     202             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     203             :                      errmsg("array must have even number of elements")));
     204             : 
     205         638 :         for (i = 0; i < nelems; i += 2)
     206             :         {
     207         420 :             char       *name = TextDatumGetCString(datum_opts[i]);
     208         420 :             char       *opt = TextDatumGetCString(datum_opts[i + 1]);
     209             : 
     210         420 :             options = lappend(options, makeDefElem(name, (Node *) makeString(opt), -1));
     211             :         }
     212             :     }
     213             : 
     214         256 :     p->tupstore = tuplestore_begin_heap(true, false, work_mem);
     215         256 :     rsinfo->returnMode = SFRM_Materialize;
     216         256 :     rsinfo->setResult = p->tupstore;
     217         256 :     rsinfo->setDesc = p->tupdesc;
     218             : 
     219             :     /*
     220             :      * Compute the current end-of-wal and maintain ThisTimeLineID.
     221             :      * RecoveryInProgress() will update ThisTimeLineID on promotion.
     222             :      */
     223         256 :     if (!RecoveryInProgress())
     224         256 :         end_of_wal = GetFlushRecPtr();
     225             :     else
     226           0 :         end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID);
     227             : 
     228         256 :     (void) ReplicationSlotAcquire(NameStr(*name), SAB_Error);
     229             : 
     230         254 :     PG_TRY();
     231             :     {
     232             :         /* restart at slot's confirmed_flush */
     233         498 :         ctx = CreateDecodingContext(InvalidXLogRecPtr,
     234             :                                     options,
     235             :                                     false,
     236         254 :                                     XL_ROUTINE(.page_read = read_local_xlog_page,
     237             :                                                .segment_open = wal_segment_open,
     238             :                                                .segment_close = wal_segment_close),
     239             :                                     LogicalOutputPrepareWrite,
     240             :                                     LogicalOutputWrite, NULL);
     241             : 
     242             :         /*
     243             :          * After the sanity checks in CreateDecodingContext, make sure the
     244             :          * restart_lsn is valid.  Avoid "cannot get changes" wording in this
     245             :          * errmsg because that'd be confusingly ambiguous about no changes
     246             :          * being available.
     247             :          */
     248         244 :         if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
     249           0 :             ereport(ERROR,
     250             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     251             :                      errmsg("can no longer get changes from replication slot \"%s\"",
     252             :                             NameStr(*name)),
     253             :                      errdetail("This slot has never previously reserved WAL, or has been invalidated.")));
     254             : 
     255         244 :         MemoryContextSwitchTo(oldcontext);
     256             : 
     257             :         /*
     258             :          * Check whether the output plugin writes textual output if that's
     259             :          * what we need.
     260             :          */
     261         244 :         if (!binary &&
     262         240 :             ctx->options.output_type !=OUTPUT_PLUGIN_TEXTUAL_OUTPUT)
     263           2 :             ereport(ERROR,
     264             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     265             :                      errmsg("logical decoding output plugin \"%s\" produces binary output, but function \"%s\" expects textual data",
     266             :                             NameStr(MyReplicationSlot->data.plugin),
     267             :                             format_procedure(fcinfo->flinfo->fn_oid))));
     268             : 
     269         242 :         ctx->output_writer_private = p;
     270             : 
     271             :         /*
     272             :          * Decoding of WAL must start at restart_lsn so that the entirety of
     273             :          * xacts that committed after the slot's confirmed_flush can be
     274             :          * accumulated into reorder buffers.
     275             :          */
     276         242 :         XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
     277             : 
     278             :         /* invalidate non-timetravel entries */
     279         242 :         InvalidateSystemCaches();
     280             : 
     281             :         /* Decode until we run out of records */
     282     2754030 :         while (ctx->reader->EndRecPtr < end_of_wal)
     283             :         {
     284             :             XLogRecord *record;
     285     2753788 :             char       *errm = NULL;
     286             : 
     287     2753788 :             record = XLogReadRecord(ctx->reader, &errm);
     288     2753788 :             if (errm)
     289           0 :                 elog(ERROR, "%s", errm);
     290             : 
     291             :             /*
     292             :              * The {begin_txn,change,commit_txn}_wrapper callbacks above will
     293             :              * store the description into our tuplestore.
     294             :              */
     295     2753788 :             if (record != NULL)
     296     2753788 :                 LogicalDecodingProcessRecord(ctx, ctx->reader);
     297             : 
     298             :             /* check limits */
     299     2753788 :             if (upto_lsn != InvalidXLogRecPtr &&
     300           0 :                 upto_lsn <= ctx->reader->EndRecPtr)
     301           0 :                 break;
     302     2753788 :             if (upto_nchanges != 0 &&
     303           0 :                 upto_nchanges <= p->returned_rows)
     304           0 :                 break;
     305     2753788 :             CHECK_FOR_INTERRUPTS();
     306             :         }
     307             : 
     308             :         tuplestore_donestoring(tupstore);
     309             : 
     310             :         /*
     311             :          * Logical decoding could have clobbered CurrentResourceOwner during
     312             :          * transaction management, so restore the executor's value.  (This is
     313             :          * a kluge, but it's not worth cleaning up right now.)
     314             :          */
     315         242 :         CurrentResourceOwner = old_resowner;
     316             : 
     317             :         /*
     318             :          * Next time, start where we left off. (Hunting things, the family
     319             :          * business..)
     320             :          */
     321         242 :         if (ctx->reader->EndRecPtr != InvalidXLogRecPtr && confirm)
     322             :         {
     323         226 :             LogicalConfirmReceivedLocation(ctx->reader->EndRecPtr);
     324             : 
     325             :             /*
     326             :              * If only the confirmed_flush_lsn has changed the slot won't get
     327             :              * marked as dirty by the above. Callers on the walsender
     328             :              * interface are expected to keep track of their own progress and
     329             :              * don't need it written out. But SQL-interface users cannot
     330             :              * specify their own start positions and it's harder for them to
     331             :              * keep track of their progress, so we should make more of an
     332             :              * effort to save it for them.
     333             :              *
     334             :              * Dirty the slot so it's written out at the next checkpoint.
     335             :              * We'll still lose its position on crash, as documented, but it's
     336             :              * better than always losing the position even on clean restart.
     337             :              */
     338         226 :             ReplicationSlotMarkDirty();
     339             :         }
     340             : 
     341             :         /* free context, call shutdown callback */
     342         242 :         FreeDecodingContext(ctx);
     343             : 
     344         242 :         ReplicationSlotRelease();
     345         242 :         InvalidateSystemCaches();
     346             :     }
     347          12 :     PG_CATCH();
     348             :     {
     349             :         /* clear all timetravel entries */
     350          12 :         InvalidateSystemCaches();
     351             : 
     352          12 :         PG_RE_THROW();
     353             :     }
     354         242 :     PG_END_TRY();
     355             : 
     356         242 :     return (Datum) 0;
     357             : }
     358             : 
     359             : /*
     360             :  * SQL function returning the changestream as text, consuming the data.
     361             :  */
     362             : Datum
     363         234 : pg_logical_slot_get_changes(PG_FUNCTION_ARGS)
     364             : {
     365         234 :     return pg_logical_slot_get_changes_guts(fcinfo, true, false);
     366             : }
     367             : 
     368             : /*
     369             :  * SQL function returning the changestream as text, only peeking ahead.
     370             :  */
     371             : Datum
     372          20 : pg_logical_slot_peek_changes(PG_FUNCTION_ARGS)
     373             : {
     374          20 :     return pg_logical_slot_get_changes_guts(fcinfo, false, false);
     375             : }
     376             : 
     377             : /*
     378             :  * SQL function returning the changestream in binary, consuming the data.
     379             :  */
     380             : Datum
     381           4 : pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS)
     382             : {
     383           4 :     return pg_logical_slot_get_changes_guts(fcinfo, true, true);
     384             : }
     385             : 
     386             : /*
     387             :  * SQL function returning the changestream in binary, only peeking ahead.
     388             :  */
     389             : Datum
     390           0 : pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS)
     391             : {
     392           0 :     return pg_logical_slot_get_changes_guts(fcinfo, false, true);
     393             : }
     394             : 
     395             : 
     396             : /*
     397             :  * SQL function for writing a logical decoding message into WAL.
     398             :  */
     399             : Datum
     400          24 : pg_logical_emit_message_bytea(PG_FUNCTION_ARGS)
     401             : {
     402          24 :     bool        transactional = PG_GETARG_BOOL(0);
     403          24 :     char       *prefix = text_to_cstring(PG_GETARG_TEXT_PP(1));
     404          24 :     bytea      *data = PG_GETARG_BYTEA_PP(2);
     405             :     XLogRecPtr  lsn;
     406             : 
     407          24 :     lsn = LogLogicalMessage(prefix, VARDATA_ANY(data), VARSIZE_ANY_EXHDR(data),
     408             :                             transactional);
     409          24 :     PG_RETURN_LSN(lsn);
     410             : }
     411             : 
     412             : Datum
     413          24 : pg_logical_emit_message_text(PG_FUNCTION_ARGS)
     414             : {
     415             :     /* bytea and text are compatible */
     416          24 :     return pg_logical_emit_message_bytea(fcinfo);
     417             : }

Generated by: LCOV version 1.13