Line data Source code
1 : /* ------------------------------------------------------------------------- 2 : * 3 : * pgstat_subscription.c 4 : * Implementation of subscription statistics. 5 : * 6 : * This file contains the implementation of subscription statistics. It is kept 7 : * separate from pgstat.c to enforce the line between the statistics access / 8 : * storage implementation and the details about individual types of 9 : * statistics. 10 : * 11 : * Copyright (c) 2001-2025, PostgreSQL Global Development Group 12 : * 13 : * IDENTIFICATION 14 : * src/backend/utils/activity/pgstat_subscription.c 15 : * ------------------------------------------------------------------------- 16 : */ 17 : 18 : #include "postgres.h" 19 : 20 : #include "replication/worker_internal.h" 21 : #include "utils/pgstat_internal.h" 22 : 23 : 24 : /* 25 : * Report a subscription error. 26 : */ 27 : void 28 174 : pgstat_report_subscription_error(Oid subid, LogicalRepWorkerType wtype) 29 : { 30 : PgStat_EntryRef *entry_ref; 31 : PgStat_BackendSubEntry *pending; 32 : 33 174 : entry_ref = pgstat_prep_pending_entry(PGSTAT_KIND_SUBSCRIPTION, 34 : InvalidOid, subid, NULL); 35 174 : pending = entry_ref->pending; 36 : 37 174 : switch (wtype) 38 : { 39 142 : case WORKERTYPE_APPLY: 40 142 : pending->apply_error_count++; 41 142 : break; 42 : 43 8 : case WORKERTYPE_SEQUENCESYNC: 44 8 : pending->seq_sync_error_count++; 45 8 : break; 46 : 47 24 : case WORKERTYPE_TABLESYNC: 48 24 : pending->sync_error_count++; 49 24 : break; 50 : 51 0 : default: 52 : /* Should never happen. */ 53 : Assert(0); 54 0 : break; 55 : } 56 174 : } 57 : 58 : /* 59 : * Report a subscription conflict. 60 : */ 61 : void 62 106 : pgstat_report_subscription_conflict(Oid subid, ConflictType type) 63 : { 64 : PgStat_EntryRef *entry_ref; 65 : PgStat_BackendSubEntry *pending; 66 : 67 106 : entry_ref = pgstat_prep_pending_entry(PGSTAT_KIND_SUBSCRIPTION, 68 : InvalidOid, subid, NULL); 69 106 : pending = entry_ref->pending; 70 106 : pending->conflict_count[type]++; 71 106 : } 72 : 73 : /* 74 : * Report creating the subscription. 75 : */ 76 : void 77 330 : pgstat_create_subscription(Oid subid) 78 : { 79 : /* Ensures that stats are dropped if transaction rolls back */ 80 330 : pgstat_create_transactional(PGSTAT_KIND_SUBSCRIPTION, 81 : InvalidOid, subid); 82 : 83 : /* Create and initialize the subscription stats entry */ 84 330 : pgstat_get_entry_ref(PGSTAT_KIND_SUBSCRIPTION, InvalidOid, subid, 85 : true, NULL); 86 330 : pgstat_reset_entry(PGSTAT_KIND_SUBSCRIPTION, InvalidOid, subid, 0); 87 330 : } 88 : 89 : /* 90 : * Report dropping the subscription. 91 : * 92 : * Ensures that stats are dropped if transaction commits. 93 : */ 94 : void 95 230 : pgstat_drop_subscription(Oid subid) 96 : { 97 230 : pgstat_drop_transactional(PGSTAT_KIND_SUBSCRIPTION, 98 : InvalidOid, subid); 99 230 : } 100 : 101 : /* 102 : * Support function for the SQL-callable pgstat* functions. Returns 103 : * the collected statistics for one subscription or NULL. 104 : */ 105 : PgStat_StatSubEntry * 106 78 : pgstat_fetch_stat_subscription(Oid subid) 107 : { 108 78 : return (PgStat_StatSubEntry *) 109 78 : pgstat_fetch_entry(PGSTAT_KIND_SUBSCRIPTION, InvalidOid, subid); 110 : } 111 : 112 : /* 113 : * Flush out pending stats for the entry 114 : * 115 : * If nowait is true and the lock could not be immediately acquired, returns 116 : * false without flushing the entry. Otherwise returns true. 117 : */ 118 : bool 119 204 : pgstat_subscription_flush_cb(PgStat_EntryRef *entry_ref, bool nowait) 120 : { 121 : PgStat_BackendSubEntry *localent; 122 : PgStatShared_Subscription *shsubent; 123 : 124 204 : localent = (PgStat_BackendSubEntry *) entry_ref->pending; 125 204 : shsubent = (PgStatShared_Subscription *) entry_ref->shared_stats; 126 : 127 : /* localent always has non-zero content */ 128 : 129 204 : if (!pgstat_lock_entry(entry_ref, nowait)) 130 0 : return false; 131 : 132 : #define SUB_ACC(fld) shsubent->stats.fld += localent->fld 133 204 : SUB_ACC(apply_error_count); 134 204 : SUB_ACC(seq_sync_error_count); 135 204 : SUB_ACC(sync_error_count); 136 1836 : for (int i = 0; i < CONFLICT_NUM_TYPES; i++) 137 1632 : SUB_ACC(conflict_count[i]); 138 : #undef SUB_ACC 139 : 140 204 : pgstat_unlock_entry(entry_ref); 141 204 : return true; 142 : } 143 : 144 : void 145 354 : pgstat_subscription_reset_timestamp_cb(PgStatShared_Common *header, TimestampTz ts) 146 : { 147 354 : ((PgStatShared_Subscription *) header)->stats.stat_reset_timestamp = ts; 148 354 : }