LCOV - code coverage report
Current view: top level - src/backend/replication/logical - logicalfuncs.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13devel Lines: 104 123 84.6 %
Date: 2019-11-21 14:06:36 Functions: 10 11 90.9 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * logicalfuncs.c
       4             :  *
       5             :  *     Support functions for using logical decoding and management of
       6             :  *     logical replication slots via SQL.
       7             :  *
       8             :  *
       9             :  * Copyright (c) 2012-2019, PostgreSQL Global Development Group
      10             :  *
      11             :  * IDENTIFICATION
      12             :  *    src/backend/replication/logicalfuncs.c
      13             :  *-------------------------------------------------------------------------
      14             :  */
      15             : 
      16             : #include "postgres.h"
      17             : 
      18             : #include <unistd.h>
      19             : 
      20             : #include "access/xact.h"
      21             : #include "access/xlog_internal.h"
      22             : #include "access/xlogutils.h"
      23             : #include "catalog/pg_type.h"
      24             : #include "fmgr.h"
      25             : #include "funcapi.h"
      26             : #include "mb/pg_wchar.h"
      27             : #include "miscadmin.h"
      28             : #include "nodes/makefuncs.h"
      29             : #include "replication/decode.h"
      30             : #include "replication/logical.h"
      31             : #include "replication/logicalfuncs.h"
      32             : #include "replication/message.h"
      33             : #include "storage/fd.h"
      34             : #include "utils/array.h"
      35             : #include "utils/builtins.h"
      36             : #include "utils/inval.h"
      37             : #include "utils/lsyscache.h"
      38             : #include "utils/memutils.h"
      39             : #include "utils/pg_lsn.h"
      40             : #include "utils/regproc.h"
      41             : #include "utils/resowner.h"
      42             : 
      43             : /* private date for writing out data */
      44             : typedef struct DecodingOutputState
      45             : {
      46             :     Tuplestorestate *tupstore;
      47             :     TupleDesc   tupdesc;
      48             :     bool        binary_output;
      49             :     int64       returned_rows;
      50             : } DecodingOutputState;
      51             : 
      52             : /*
      53             :  * Prepare for an output plugin write.
      54             :  */
      55             : static void
      56      291208 : LogicalOutputPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
      57             :                           bool last_write)
      58             : {
      59      291208 :     resetStringInfo(ctx->out);
      60      291208 : }
      61             : 
      62             : /*
      63             :  * Perform output plugin write into tuplestore.
      64             :  */
      65             : static void
      66      291208 : LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
      67             :                    bool last_write)
      68             : {
      69             :     Datum       values[3];
      70             :     bool        nulls[3];
      71             :     DecodingOutputState *p;
      72             : 
      73             :     /* SQL Datums can only be of a limited length... */
      74      291208 :     if (ctx->out->len > MaxAllocSize - VARHDRSZ)
      75           0 :         elog(ERROR, "too much output for sql interface");
      76             : 
      77      291208 :     p = (DecodingOutputState *) ctx->output_writer_private;
      78             : 
      79      291208 :     memset(nulls, 0, sizeof(nulls));
      80      291208 :     values[0] = LSNGetDatum(lsn);
      81      291208 :     values[1] = TransactionIdGetDatum(xid);
      82             : 
      83             :     /*
      84             :      * Assert ctx->out is in database encoding when we're writing textual
      85             :      * output.
      86             :      */
      87      291208 :     if (!p->binary_output)
      88             :         Assert(pg_verify_mbstr(GetDatabaseEncoding(),
      89             :                                ctx->out->data, ctx->out->len,
      90             :                                false));
      91             : 
      92             :     /* ick, but cstring_to_text_with_len works for bytea perfectly fine */
      93      291208 :     values[2] = PointerGetDatum(
      94             :                                 cstring_to_text_with_len(ctx->out->data, ctx->out->len));
      95             : 
      96      291208 :     tuplestore_putvalues(p->tupstore, p->tupdesc, values, nulls);
      97      291208 :     p->returned_rows++;
      98      291208 : }
      99             : 
     100             : static void
     101         252 : check_permissions(void)
     102             : {
     103         252 :     if (!superuser() && !has_rolreplication(GetUserId()))
     104           2 :         ereport(ERROR,
     105             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     106             :                  (errmsg("must be superuser or replication role to use replication slots"))));
     107         250 : }
     108             : 
     109             : int
     110       27328 : logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
     111             :                              int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
     112             : {
     113       27328 :     return read_local_xlog_page(state, targetPagePtr, reqLen,
     114             :                                 targetRecPtr, cur_page);
     115             : }
     116             : 
     117             : /*
     118             :  * Helper function for the various SQL callable logical decoding functions.
     119             :  */
     120             : static Datum
     121         252 : pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool binary)
     122             : {
     123             :     Name        name;
     124             :     XLogRecPtr  upto_lsn;
     125             :     int32       upto_nchanges;
     126         252 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     127             :     MemoryContext per_query_ctx;
     128             :     MemoryContext oldcontext;
     129             :     XLogRecPtr  end_of_wal;
     130             :     XLogRecPtr  startptr;
     131             :     LogicalDecodingContext *ctx;
     132         252 :     ResourceOwner old_resowner = CurrentResourceOwner;
     133             :     ArrayType  *arr;
     134             :     Size        ndim;
     135         252 :     List       *options = NIL;
     136             :     DecodingOutputState *p;
     137             : 
     138         252 :     check_permissions();
     139             : 
     140         250 :     CheckLogicalDecodingRequirements();
     141             : 
     142         250 :     if (PG_ARGISNULL(0))
     143           0 :         ereport(ERROR,
     144             :                 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
     145             :                  errmsg("slot name must not be null")));
     146         250 :     name = PG_GETARG_NAME(0);
     147             : 
     148         250 :     if (PG_ARGISNULL(1))
     149         250 :         upto_lsn = InvalidXLogRecPtr;
     150             :     else
     151           0 :         upto_lsn = PG_GETARG_LSN(1);
     152             : 
     153         250 :     if (PG_ARGISNULL(2))
     154         250 :         upto_nchanges = InvalidXLogRecPtr;
     155             :     else
     156           0 :         upto_nchanges = PG_GETARG_INT32(2);
     157             : 
     158         250 :     if (PG_ARGISNULL(3))
     159           0 :         ereport(ERROR,
     160             :                 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
     161             :                  errmsg("options array must not be null")));
     162         250 :     arr = PG_GETARG_ARRAYTYPE_P(3);
     163             : 
     164             :     /* check to see if caller supports us returning a tuplestore */
     165         250 :     if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
     166           0 :         ereport(ERROR,
     167             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     168             :                  errmsg("set-valued function called in context that cannot accept a set")));
     169         250 :     if (!(rsinfo->allowedModes & SFRM_Materialize))
     170           0 :         ereport(ERROR,
     171             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     172             :                  errmsg("materialize mode required, but it is not allowed in this context")));
     173             : 
     174             :     /* state to write output to */
     175         250 :     p = palloc0(sizeof(DecodingOutputState));
     176             : 
     177         250 :     p->binary_output = binary;
     178             : 
     179             :     /* Build a tuple descriptor for our result type */
     180         250 :     if (get_call_result_type(fcinfo, NULL, &p->tupdesc) != TYPEFUNC_COMPOSITE)
     181           0 :         elog(ERROR, "return type must be a row type");
     182             : 
     183         250 :     per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
     184         250 :     oldcontext = MemoryContextSwitchTo(per_query_ctx);
     185             : 
     186             :     /* Deconstruct options array */
     187         250 :     ndim = ARR_NDIM(arr);
     188         250 :     if (ndim > 1)
     189             :     {
     190           0 :         ereport(ERROR,
     191             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     192             :                  errmsg("array must be one-dimensional")));
     193             :     }
     194         250 :     else if (array_contains_nulls(arr))
     195             :     {
     196           0 :         ereport(ERROR,
     197             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     198             :                  errmsg("array must not contain nulls")));
     199             :     }
     200         250 :     else if (ndim == 1)
     201             :     {
     202             :         int         nelems;
     203             :         Datum      *datum_opts;
     204             :         int         i;
     205             : 
     206             :         Assert(ARR_ELEMTYPE(arr) == TEXTOID);
     207             : 
     208         212 :         deconstruct_array(arr, TEXTOID, -1, false, 'i',
     209             :                           &datum_opts, NULL, &nelems);
     210             : 
     211         212 :         if (nelems % 2 != 0)
     212           0 :             ereport(ERROR,
     213             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     214             :                      errmsg("array must have even number of elements")));
     215             : 
     216         620 :         for (i = 0; i < nelems; i += 2)
     217             :         {
     218         408 :             char       *name = TextDatumGetCString(datum_opts[i]);
     219         408 :             char       *opt = TextDatumGetCString(datum_opts[i + 1]);
     220             : 
     221         408 :             options = lappend(options, makeDefElem(name, (Node *) makeString(opt), -1));
     222             :         }
     223             :     }
     224             : 
     225         250 :     p->tupstore = tuplestore_begin_heap(true, false, work_mem);
     226         250 :     rsinfo->returnMode = SFRM_Materialize;
     227         250 :     rsinfo->setResult = p->tupstore;
     228         250 :     rsinfo->setDesc = p->tupdesc;
     229             : 
     230             :     /*
     231             :      * Compute the current end-of-wal and maintain ThisTimeLineID.
     232             :      * RecoveryInProgress() will update ThisTimeLineID on promotion.
     233             :      */
     234         250 :     if (!RecoveryInProgress())
     235         250 :         end_of_wal = GetFlushRecPtr();
     236             :     else
     237           0 :         end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID);
     238             : 
     239         250 :     ReplicationSlotAcquire(NameStr(*name), true);
     240             : 
     241         248 :     PG_TRY();
     242             :     {
     243             :         /* restart at slot's confirmed_flush */
     244         248 :         ctx = CreateDecodingContext(InvalidXLogRecPtr,
     245             :                                     options,
     246             :                                     false,
     247             :                                     logical_read_local_xlog_page,
     248             :                                     LogicalOutputPrepareWrite,
     249             :                                     LogicalOutputWrite, NULL);
     250             : 
     251         238 :         MemoryContextSwitchTo(oldcontext);
     252             : 
     253             :         /*
     254             :          * Check whether the output plugin writes textual output if that's
     255             :          * what we need.
     256             :          */
     257         472 :         if (!binary &&
     258         234 :             ctx->options.output_type !=OUTPUT_PLUGIN_TEXTUAL_OUTPUT)
     259           2 :             ereport(ERROR,
     260             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     261             :                      errmsg("logical decoding output plugin \"%s\" produces binary output, but function \"%s\" expects textual data",
     262             :                             NameStr(MyReplicationSlot->data.plugin),
     263             :                             format_procedure(fcinfo->flinfo->fn_oid))));
     264             : 
     265         236 :         ctx->output_writer_private = p;
     266             : 
     267             :         /*
     268             :          * Decoding of WAL must start at restart_lsn so that the entirety of
     269             :          * xacts that committed after the slot's confirmed_flush can be
     270             :          * accumulated into reorder buffers.
     271             :          */
     272         236 :         startptr = MyReplicationSlot->data.restart_lsn;
     273             : 
     274             :         /* invalidate non-timetravel entries */
     275         236 :         InvalidateSystemCaches();
     276             : 
     277             :         /* Decode until we run out of records */
     278     5139620 :         while ((startptr != InvalidXLogRecPtr && startptr < end_of_wal) ||
     279     5139148 :                (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < end_of_wal))
     280             :         {
     281             :             XLogRecord *record;
     282     2569574 :             char       *errm = NULL;
     283             : 
     284     2569574 :             record = XLogReadRecord(ctx->reader, startptr, &errm);
     285     2569574 :             if (errm)
     286           0 :                 elog(ERROR, "%s", errm);
     287             : 
     288             :             /*
     289             :              * Now that we've set up the xlog reader state, subsequent calls
     290             :              * pass InvalidXLogRecPtr to say "continue from last record"
     291             :              */
     292     2569574 :             startptr = InvalidXLogRecPtr;
     293             : 
     294             :             /*
     295             :              * The {begin_txn,change,commit_txn}_wrapper callbacks above will
     296             :              * store the description into our tuplestore.
     297             :              */
     298     2569574 :             if (record != NULL)
     299     2569574 :                 LogicalDecodingProcessRecord(ctx, ctx->reader);
     300             : 
     301             :             /* check limits */
     302     2569574 :             if (upto_lsn != InvalidXLogRecPtr &&
     303           0 :                 upto_lsn <= ctx->reader->EndRecPtr)
     304           0 :                 break;
     305     2569574 :             if (upto_nchanges != 0 &&
     306           0 :                 upto_nchanges <= p->returned_rows)
     307           0 :                 break;
     308     2569574 :             CHECK_FOR_INTERRUPTS();
     309             :         }
     310             : 
     311             :         tuplestore_donestoring(tupstore);
     312             : 
     313             :         /*
     314             :          * Logical decoding could have clobbered CurrentResourceOwner during
     315             :          * transaction management, so restore the executor's value.  (This is
     316             :          * a kluge, but it's not worth cleaning up right now.)
     317             :          */
     318         236 :         CurrentResourceOwner = old_resowner;
     319             : 
     320             :         /*
     321             :          * Next time, start where we left off. (Hunting things, the family
     322             :          * business..)
     323             :          */
     324         236 :         if (ctx->reader->EndRecPtr != InvalidXLogRecPtr && confirm)
     325             :         {
     326         220 :             LogicalConfirmReceivedLocation(ctx->reader->EndRecPtr);
     327             : 
     328             :             /*
     329             :              * If only the confirmed_flush_lsn has changed the slot won't get
     330             :              * marked as dirty by the above. Callers on the walsender
     331             :              * interface are expected to keep track of their own progress and
     332             :              * don't need it written out. But SQL-interface users cannot
     333             :              * specify their own start positions and it's harder for them to
     334             :              * keep track of their progress, so we should make more of an
     335             :              * effort to save it for them.
     336             :              *
     337             :              * Dirty the slot so it's written out at the next checkpoint.
     338             :              * We'll still lose its position on crash, as documented, but it's
     339             :              * better than always losing the position even on clean restart.
     340             :              */
     341         220 :             ReplicationSlotMarkDirty();
     342             :         }
     343             : 
     344             :         /* free context, call shutdown callback */
     345         236 :         FreeDecodingContext(ctx);
     346             : 
     347         236 :         ReplicationSlotRelease();
     348         236 :         InvalidateSystemCaches();
     349             :     }
     350          12 :     PG_CATCH();
     351             :     {
     352             :         /* clear all timetravel entries */
     353          12 :         InvalidateSystemCaches();
     354             : 
     355          12 :         PG_RE_THROW();
     356             :     }
     357         236 :     PG_END_TRY();
     358             : 
     359         236 :     return (Datum) 0;
     360             : }
     361             : 
     362             : /*
     363             :  * SQL function returning the changestream as text, consuming the data.
     364             :  */
     365             : Datum
     366         228 : pg_logical_slot_get_changes(PG_FUNCTION_ARGS)
     367             : {
     368         228 :     return pg_logical_slot_get_changes_guts(fcinfo, true, false);
     369             : }
     370             : 
     371             : /*
     372             :  * SQL function returning the changestream as text, only peeking ahead.
     373             :  */
     374             : Datum
     375          20 : pg_logical_slot_peek_changes(PG_FUNCTION_ARGS)
     376             : {
     377          20 :     return pg_logical_slot_get_changes_guts(fcinfo, false, false);
     378             : }
     379             : 
     380             : /*
     381             :  * SQL function returning the changestream in binary, consuming the data.
     382             :  */
     383             : Datum
     384           4 : pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS)
     385             : {
     386           4 :     return pg_logical_slot_get_changes_guts(fcinfo, true, true);
     387             : }
     388             : 
     389             : /*
     390             :  * SQL function returning the changestream in binary, only peeking ahead.
     391             :  */
     392             : Datum
     393           0 : pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS)
     394             : {
     395           0 :     return pg_logical_slot_get_changes_guts(fcinfo, false, true);
     396             : }
     397             : 
     398             : 
     399             : /*
     400             :  * SQL function for writing logical decoding message into WAL.
     401             :  */
     402             : Datum
     403          24 : pg_logical_emit_message_bytea(PG_FUNCTION_ARGS)
     404             : {
     405          24 :     bool        transactional = PG_GETARG_BOOL(0);
     406          24 :     char       *prefix = text_to_cstring(PG_GETARG_TEXT_PP(1));
     407          24 :     bytea      *data = PG_GETARG_BYTEA_PP(2);
     408             :     XLogRecPtr  lsn;
     409             : 
     410          24 :     lsn = LogLogicalMessage(prefix, VARDATA_ANY(data), VARSIZE_ANY_EXHDR(data),
     411             :                             transactional);
     412          24 :     PG_RETURN_LSN(lsn);
     413             : }
     414             : 
     415             : Datum
     416          24 : pg_logical_emit_message_text(PG_FUNCTION_ARGS)
     417             : {
     418             :     /* bytea and text are compatible */
     419          24 :     return pg_logical_emit_message_bytea(fcinfo);
     420             : }

Generated by: LCOV version 1.13