Line data Source code
1 : /* -------------------------------------------------------------------------
2 : *
3 : * pgstat_subscription.c
4 : * Implementation of subscription statistics.
5 : *
6 : * This file contains the implementation of subscription statistics. It is kept
7 : * separate from pgstat.c to enforce the line between the statistics access /
8 : * storage implementation and the details about individual types of
9 : * statistics.
10 : *
11 : * Copyright (c) 2001-2026, PostgreSQL Global Development Group
12 : *
13 : * IDENTIFICATION
14 : * src/backend/utils/activity/pgstat_subscription.c
15 : * -------------------------------------------------------------------------
16 : */
17 :
18 : #include "postgres.h"
19 :
20 : #include "replication/worker_internal.h"
21 : #include "utils/pgstat_internal.h"
22 :
23 :
24 : /*
25 : * Report a subscription error.
26 : */
27 : void
28 105 : pgstat_report_subscription_error(Oid subid)
29 : {
30 : PgStat_EntryRef *entry_ref;
31 : PgStat_BackendSubEntry *pending;
32 105 : LogicalRepWorkerType wtype = get_logical_worker_type();
33 :
34 105 : entry_ref = pgstat_prep_pending_entry(PGSTAT_KIND_SUBSCRIPTION,
35 : InvalidOid, subid, NULL);
36 105 : pending = entry_ref->pending;
37 :
38 105 : switch (wtype)
39 : {
40 88 : case WORKERTYPE_APPLY:
41 88 : pending->apply_error_count++;
42 88 : break;
43 :
44 4 : case WORKERTYPE_SEQUENCESYNC:
45 4 : pending->sync_seq_error_count++;
46 4 : break;
47 :
48 13 : case WORKERTYPE_TABLESYNC:
49 13 : pending->sync_table_error_count++;
50 13 : break;
51 :
52 0 : default:
53 : /* Should never happen. */
54 : Assert(0);
55 0 : break;
56 : }
57 105 : }
58 :
59 : /*
60 : * Report a subscription conflict.
61 : */
62 : void
63 56 : pgstat_report_subscription_conflict(Oid subid, ConflictType type)
64 : {
65 : PgStat_EntryRef *entry_ref;
66 : PgStat_BackendSubEntry *pending;
67 :
68 56 : entry_ref = pgstat_prep_pending_entry(PGSTAT_KIND_SUBSCRIPTION,
69 : InvalidOid, subid, NULL);
70 56 : pending = entry_ref->pending;
71 56 : pending->conflict_count[type]++;
72 56 : }
73 :
74 : /*
75 : * Report creating the subscription.
76 : */
77 : void
78 175 : pgstat_create_subscription(Oid subid)
79 : {
80 : /* Ensures that stats are dropped if transaction rolls back */
81 175 : pgstat_create_transactional(PGSTAT_KIND_SUBSCRIPTION,
82 : InvalidOid, subid);
83 :
84 : /* Create and initialize the subscription stats entry */
85 175 : pgstat_get_entry_ref(PGSTAT_KIND_SUBSCRIPTION, InvalidOid, subid,
86 : true, NULL);
87 175 : pgstat_reset_entry(PGSTAT_KIND_SUBSCRIPTION, InvalidOid, subid, 0);
88 175 : }
89 :
90 : /*
91 : * Report dropping the subscription.
92 : *
93 : * Ensures that stats are dropped if transaction commits.
94 : */
95 : void
96 128 : pgstat_drop_subscription(Oid subid)
97 : {
98 128 : pgstat_drop_transactional(PGSTAT_KIND_SUBSCRIPTION,
99 : InvalidOid, subid);
100 128 : }
101 :
102 : /*
103 : * Support function for the SQL-callable pgstat* functions. Returns
104 : * the collected statistics for one subscription or NULL.
105 : */
106 : PgStat_StatSubEntry *
107 35 : pgstat_fetch_stat_subscription(Oid subid)
108 : {
109 35 : return (PgStat_StatSubEntry *)
110 35 : pgstat_fetch_entry(PGSTAT_KIND_SUBSCRIPTION, InvalidOid, subid);
111 : }
112 :
113 : /*
114 : * Flush out pending stats for the entry
115 : *
116 : * If nowait is true and the lock could not be immediately acquired, returns
117 : * false without flushing the entry. Otherwise returns true.
118 : */
119 : bool
120 122 : pgstat_subscription_flush_cb(PgStat_EntryRef *entry_ref, bool nowait)
121 : {
122 : PgStat_BackendSubEntry *localent;
123 : PgStatShared_Subscription *shsubent;
124 :
125 122 : localent = (PgStat_BackendSubEntry *) entry_ref->pending;
126 122 : shsubent = (PgStatShared_Subscription *) entry_ref->shared_stats;
127 :
128 : /* localent always has non-zero content */
129 :
130 122 : if (!pgstat_lock_entry(entry_ref, nowait))
131 0 : return false;
132 :
133 : #define SUB_ACC(fld) shsubent->stats.fld += localent->fld
134 122 : SUB_ACC(apply_error_count);
135 122 : SUB_ACC(sync_seq_error_count);
136 122 : SUB_ACC(sync_table_error_count);
137 1098 : for (int i = 0; i < CONFLICT_NUM_TYPES; i++)
138 976 : SUB_ACC(conflict_count[i]);
139 : #undef SUB_ACC
140 :
141 122 : pgstat_unlock_entry(entry_ref);
142 122 : return true;
143 : }
144 :
145 : void
146 187 : pgstat_subscription_reset_timestamp_cb(PgStatShared_Common *header, TimestampTz ts)
147 : {
148 187 : ((PgStatShared_Subscription *) header)->stats.stat_reset_timestamp = ts;
149 187 : }
|