Line data Source code
1 : /* ------------------------------------------------------------------------- 2 : * 3 : * pgstat_subscription.c 4 : * Implementation of subscription statistics. 5 : * 6 : * This file contains the implementation of subscription statistics. It is kept 7 : * separate from pgstat.c to enforce the line between the statistics access / 8 : * storage implementation and the details about individual types of 9 : * statistics. 10 : * 11 : * Copyright (c) 2001-2025, PostgreSQL Global Development Group 12 : * 13 : * IDENTIFICATION 14 : * src/backend/utils/activity/pgstat_subscription.c 15 : * ------------------------------------------------------------------------- 16 : */ 17 : 18 : #include "postgres.h" 19 : 20 : #include "replication/worker_internal.h" 21 : #include "utils/pgstat_internal.h" 22 : 23 : 24 : /* 25 : * Report a subscription error. 26 : */ 27 : void 28 196 : pgstat_report_subscription_error(Oid subid, LogicalRepWorkerType wtype) 29 : { 30 : PgStat_EntryRef *entry_ref; 31 : PgStat_BackendSubEntry *pending; 32 : 33 196 : entry_ref = pgstat_prep_pending_entry(PGSTAT_KIND_SUBSCRIPTION, 34 : InvalidOid, subid, NULL); 35 196 : pending = entry_ref->pending; 36 : 37 196 : switch (wtype) 38 : { 39 162 : case WORKERTYPE_APPLY: 40 162 : pending->apply_error_count++; 41 162 : break; 42 : 43 8 : case WORKERTYPE_SEQUENCESYNC: 44 8 : pending->sync_seq_error_count++; 45 8 : break; 46 : 47 26 : case WORKERTYPE_TABLESYNC: 48 26 : pending->sync_table_error_count++; 49 26 : break; 50 : 51 0 : default: 52 : /* Should never happen. */ 53 : Assert(0); 54 0 : break; 55 : } 56 196 : } 57 : 58 : /* 59 : * Report a subscription conflict. 60 : */ 61 : void 62 114 : pgstat_report_subscription_conflict(Oid subid, ConflictType type) 63 : { 64 : PgStat_EntryRef *entry_ref; 65 : PgStat_BackendSubEntry *pending; 66 : 67 114 : entry_ref = pgstat_prep_pending_entry(PGSTAT_KIND_SUBSCRIPTION, 68 : InvalidOid, subid, NULL); 69 114 : pending = entry_ref->pending; 70 114 : pending->conflict_count[type]++; 71 114 : } 72 : 73 : /* 74 : * Report creating the subscription. 75 : */ 76 : void 77 332 : pgstat_create_subscription(Oid subid) 78 : { 79 : /* Ensures that stats are dropped if transaction rolls back */ 80 332 : pgstat_create_transactional(PGSTAT_KIND_SUBSCRIPTION, 81 : InvalidOid, subid); 82 : 83 : /* Create and initialize the subscription stats entry */ 84 332 : pgstat_get_entry_ref(PGSTAT_KIND_SUBSCRIPTION, InvalidOid, subid, 85 : true, NULL); 86 332 : pgstat_reset_entry(PGSTAT_KIND_SUBSCRIPTION, InvalidOid, subid, 0); 87 332 : } 88 : 89 : /* 90 : * Report dropping the subscription. 91 : * 92 : * Ensures that stats are dropped if transaction commits. 93 : */ 94 : void 95 234 : pgstat_drop_subscription(Oid subid) 96 : { 97 234 : pgstat_drop_transactional(PGSTAT_KIND_SUBSCRIPTION, 98 : InvalidOid, subid); 99 234 : } 100 : 101 : /* 102 : * Support function for the SQL-callable pgstat* functions. Returns 103 : * the collected statistics for one subscription or NULL. 104 : */ 105 : PgStat_StatSubEntry * 106 72 : pgstat_fetch_stat_subscription(Oid subid) 107 : { 108 72 : return (PgStat_StatSubEntry *) 109 72 : pgstat_fetch_entry(PGSTAT_KIND_SUBSCRIPTION, InvalidOid, subid); 110 : } 111 : 112 : /* 113 : * Flush out pending stats for the entry 114 : * 115 : * If nowait is true and the lock could not be immediately acquired, returns 116 : * false without flushing the entry. Otherwise returns true. 117 : */ 118 : bool 119 226 : pgstat_subscription_flush_cb(PgStat_EntryRef *entry_ref, bool nowait) 120 : { 121 : PgStat_BackendSubEntry *localent; 122 : PgStatShared_Subscription *shsubent; 123 : 124 226 : localent = (PgStat_BackendSubEntry *) entry_ref->pending; 125 226 : shsubent = (PgStatShared_Subscription *) entry_ref->shared_stats; 126 : 127 : /* localent always has non-zero content */ 128 : 129 226 : if (!pgstat_lock_entry(entry_ref, nowait)) 130 0 : return false; 131 : 132 : #define SUB_ACC(fld) shsubent->stats.fld += localent->fld 133 226 : SUB_ACC(apply_error_count); 134 226 : SUB_ACC(sync_seq_error_count); 135 226 : SUB_ACC(sync_table_error_count); 136 2034 : for (int i = 0; i < CONFLICT_NUM_TYPES; i++) 137 1808 : SUB_ACC(conflict_count[i]); 138 : #undef SUB_ACC 139 : 140 226 : pgstat_unlock_entry(entry_ref); 141 226 : return true; 142 : } 143 : 144 : void 145 356 : pgstat_subscription_reset_timestamp_cb(PgStatShared_Common *header, TimestampTz ts) 146 : { 147 356 : ((PgStatShared_Subscription *) header)->stats.stat_reset_timestamp = ts; 148 356 : }