Line data Source code
1 : /* -------------------------------------------------------------------------
2 : *
3 : * pgstat_replslot.c
4 : * Implementation of replication slot statistics.
5 : *
6 : * This file contains the implementation of replication slot statistics. It is kept
7 : * separate from pgstat.c to enforce the line between the statistics access /
8 : * storage implementation and the details about individual types of
9 : * statistics.
10 : *
11 : * Replication slot stats work a bit differently than other variable-numbered
12 : * stats. Slots do not have oids (so they can be created on physical
13 : * replicas). Use the slot index as object id while running. However, the slot
14 : * index can change when restarting. That is addressed by using the name when
15 : * (de-)serializing. After a restart it is possible for slots to have been
16 : * dropped while shut down, which is addressed by not restoring stats for
17 : * slots that cannot be found by name when starting up.
18 : *
19 : * Copyright (c) 2001-2025, PostgreSQL Global Development Group
20 : *
21 : * IDENTIFICATION
22 : * src/backend/utils/activity/pgstat_replslot.c
23 : * -------------------------------------------------------------------------
24 : */
25 :
26 : #include "postgres.h"
27 :
28 : #include "replication/slot.h"
29 : #include "utils/pgstat_internal.h"
30 :
31 :
/*
 * Forward declaration of the file-local lookup helper defined later in this
 * file: maps a slot name to its slot index, returning -1 when no slot with
 * that name is found. "need_lock" is handed through unchanged to
 * SearchNamedReplicationSlot().
 */
32 : static int get_replslot_index(const char *name, bool need_lock);
33 :
34 :
35 : /*
36 : * Reset counters for a single replication slot.
37 : *
38 : * Permission checking for this function is managed through the normal
39 : * GRANT system.
40 : */
41 : void
42 8 : pgstat_reset_replslot(const char *name)
43 : {
44 : ReplicationSlot *slot;
45 :
46 : Assert(name != NULL);
47 :
48 8 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
49 :
50 : /* Check if the slot exits with the given name. */
51 8 : slot = SearchNamedReplicationSlot(name, false);
52 :
53 8 : if (!slot)
54 2 : ereport(ERROR,
55 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
56 : errmsg("replication slot \"%s\" does not exist",
57 : name)));
58 :
59 : /*
60 : * Reset stats if it is a logical slot. Nothing to do for physical slots
61 : * as we collect stats only for logical slots.
62 : */
63 6 : if (SlotIsLogical(slot))
64 6 : pgstat_reset(PGSTAT_KIND_REPLSLOT, InvalidOid,
65 6 : ReplicationSlotIndex(slot));
66 :
67 6 : LWLockRelease(ReplicationSlotControlLock);
68 6 : }
69 :
70 : /*
71 : * Report replication slot statistics.
72 : *
73 : * We can rely on the stats for the slot to exist and to belong to this
74 : * slot. We can only get here if pgstat_create_replslot() or
75 : * pgstat_acquire_replslot() have already been called.
76 : */
77 : void
78 12294 : pgstat_report_replslot(ReplicationSlot *slot, const PgStat_StatReplSlotEntry *repSlotStat)
79 : {
80 : PgStat_EntryRef *entry_ref;
81 : PgStatShared_ReplSlot *shstatent;
82 : PgStat_StatReplSlotEntry *statent;
83 :
84 12294 : entry_ref = pgstat_get_entry_ref_locked(PGSTAT_KIND_REPLSLOT, InvalidOid,
85 12294 : ReplicationSlotIndex(slot), false);
86 12294 : shstatent = (PgStatShared_ReplSlot *) entry_ref->shared_stats;
87 12294 : statent = &shstatent->stats;
88 :
89 : /* Update the replication slot statistics */
90 : #define REPLSLOT_ACC(fld) statent->fld += repSlotStat->fld
91 12294 : REPLSLOT_ACC(spill_txns);
92 12294 : REPLSLOT_ACC(spill_count);
93 12294 : REPLSLOT_ACC(spill_bytes);
94 12294 : REPLSLOT_ACC(stream_txns);
95 12294 : REPLSLOT_ACC(stream_count);
96 12294 : REPLSLOT_ACC(stream_bytes);
97 12294 : REPLSLOT_ACC(mem_exceeded_count);
98 12294 : REPLSLOT_ACC(total_txns);
99 12294 : REPLSLOT_ACC(total_bytes);
100 : #undef REPLSLOT_ACC
101 :
102 12294 : pgstat_unlock_entry(entry_ref);
103 12294 : }
104 :
105 : /*
106 : * Report replication slot sync skip statistics.
107 : *
108 : * Similar to pgstat_report_replslot(), we can rely on the stats for the
109 : * slot to exist and to belong to this slot.
110 : */
111 : void
112 2 : pgstat_report_replslotsync(ReplicationSlot *slot)
113 : {
114 : PgStat_EntryRef *entry_ref;
115 : PgStatShared_ReplSlot *shstatent;
116 : PgStat_StatReplSlotEntry *statent;
117 :
118 : /* Slot sync stats are valid only for synced logical slots on standby. */
119 : Assert(slot->data.synced);
120 : Assert(RecoveryInProgress());
121 :
122 2 : entry_ref = pgstat_get_entry_ref_locked(PGSTAT_KIND_REPLSLOT, InvalidOid,
123 2 : ReplicationSlotIndex(slot), false);
124 : Assert(entry_ref != NULL);
125 :
126 2 : shstatent = (PgStatShared_ReplSlot *) entry_ref->shared_stats;
127 2 : statent = &shstatent->stats;
128 :
129 2 : statent->slotsync_skip_count += 1;
130 2 : statent->slotsync_last_skip = GetCurrentTimestamp();
131 :
132 2 : pgstat_unlock_entry(entry_ref);
133 2 : }
134 :
135 : /*
136 : * Report replication slot creation.
137 : *
138 : * NB: This gets called with ReplicationSlotAllocationLock already held, be
139 : * careful about calling back into slot.c.
140 : */
141 : void
142 940 : pgstat_create_replslot(ReplicationSlot *slot)
143 : {
144 : PgStat_EntryRef *entry_ref;
145 : PgStatShared_ReplSlot *shstatent;
146 :
147 : Assert(LWLockHeldByMeInMode(ReplicationSlotAllocationLock, LW_EXCLUSIVE));
148 :
149 940 : entry_ref = pgstat_get_entry_ref_locked(PGSTAT_KIND_REPLSLOT, InvalidOid,
150 940 : ReplicationSlotIndex(slot), false);
151 940 : shstatent = (PgStatShared_ReplSlot *) entry_ref->shared_stats;
152 :
153 : /*
154 : * NB: need to accept that there might be stats from an older slot, e.g.
155 : * if we previously crashed after dropping a slot.
156 : */
157 940 : memset(&shstatent->stats, 0, sizeof(shstatent->stats));
158 :
159 940 : pgstat_unlock_entry(entry_ref);
160 940 : }
161 :
162 : /*
163 : * Report replication slot has been acquired.
164 : *
165 : * This guarantees that a stats entry exists during later
166 : * pgstat_report_replslot() or pgstat_report_replslotsync() calls.
167 : *
168 : * If we previously crashed, no stats data exists. But if we did not crash,
169 : * the stats do belong to this slot:
170 : * - the stats cannot belong to a dropped slot, pgstat_drop_replslot() would
171 : * have been called
172 : * - if the slot was removed while shut down,
173 : * pgstat_replslot_from_serialized_name_cb() returning false would have
174 : * caused the stats to be dropped
175 : */
176 : void
177 2110 : pgstat_acquire_replslot(ReplicationSlot *slot)
178 : {
179 2110 : pgstat_get_entry_ref(PGSTAT_KIND_REPLSLOT, InvalidOid,
180 2110 : ReplicationSlotIndex(slot), true, NULL);
181 2110 : }
182 :
183 : /*
184 : * Report replication slot drop.
185 : */
186 : void
187 814 : pgstat_drop_replslot(ReplicationSlot *slot)
188 : {
189 : Assert(LWLockHeldByMeInMode(ReplicationSlotAllocationLock, LW_EXCLUSIVE));
190 :
191 814 : if (!pgstat_drop_entry(PGSTAT_KIND_REPLSLOT, InvalidOid,
192 814 : ReplicationSlotIndex(slot)))
193 96 : pgstat_request_entry_refs_gc();
194 814 : }
195 :
196 : /*
197 : * Support function for the SQL-callable pgstat* functions. Returns
198 : * a pointer to the replication slot statistics struct.
199 : */
200 : PgStat_StatReplSlotEntry *
201 104 : pgstat_fetch_replslot(NameData slotname)
202 : {
203 : int idx;
204 104 : PgStat_StatReplSlotEntry *slotentry = NULL;
205 :
206 104 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
207 :
208 104 : idx = get_replslot_index(NameStr(slotname), false);
209 :
210 104 : if (idx != -1)
211 100 : slotentry = (PgStat_StatReplSlotEntry *) pgstat_fetch_entry(PGSTAT_KIND_REPLSLOT,
212 : InvalidOid, idx);
213 :
214 104 : LWLockRelease(ReplicationSlotControlLock);
215 :
216 104 : return slotentry;
217 : }
218 :
219 : void
220 190 : pgstat_replslot_to_serialized_name_cb(const PgStat_HashKey *key, const PgStatShared_Common *header, NameData *name)
221 : {
222 : /*
223 : * This is only called late during shutdown. The set of existing slots
224 : * isn't allowed to change at this point, we can assume that a slot exists
225 : * at the offset.
226 : */
227 190 : if (!ReplicationSlotName(key->objid, name))
228 0 : elog(ERROR, "could not find name for replication slot index %" PRIu64,
229 : key->objid);
230 190 : }
231 :
232 : bool
233 134 : pgstat_replslot_from_serialized_name_cb(const NameData *name, PgStat_HashKey *key)
234 : {
235 134 : int idx = get_replslot_index(NameStr(*name), true);
236 :
237 : /* slot might have been deleted */
238 134 : if (idx == -1)
239 2 : return false;
240 :
241 132 : key->kind = PGSTAT_KIND_REPLSLOT;
242 132 : key->dboid = InvalidOid;
243 132 : key->objid = idx;
244 :
245 132 : return true;
246 : }
247 :
248 : void
249 16 : pgstat_replslot_reset_timestamp_cb(PgStatShared_Common *header, TimestampTz ts)
250 : {
251 16 : ((PgStatShared_ReplSlot *) header)->stats.stat_reset_timestamp = ts;
252 16 : }
253 :
254 : static int
255 238 : get_replslot_index(const char *name, bool need_lock)
256 : {
257 : ReplicationSlot *slot;
258 :
259 : Assert(name != NULL);
260 :
261 238 : slot = SearchNamedReplicationSlot(name, need_lock);
262 :
263 238 : if (!slot)
264 6 : return -1;
265 :
266 232 : return ReplicationSlotIndex(slot);
267 : }
|