Line data Source code
1 : /* ----------
2 : * backend_progress.c
3 : *
4 : * Command progress reporting infrastructure.
5 : *
6 : * Copyright (c) 2001-2026, PostgreSQL Global Development Group
7 : *
8 : * src/backend/utils/activity/backend_progress.c
9 : * ----------
10 : */
11 : #include "postgres.h"
12 :
13 : #include "access/parallel.h"
14 : #include "libpq/pqformat.h"
15 : #include "utils/backend_progress.h"
16 : #include "utils/backend_status.h"
17 :
18 :
19 : /*-----------
20 : * pgstat_progress_start_command() -
21 : *
22 : * Set st_progress_command (and st_progress_command_target) in own backend
23 : * entry. Also, zero-initialize st_progress_param array.
24 : *-----------
25 : */
26 : void
27 147495 : pgstat_progress_start_command(ProgressCommandType cmdtype, Oid relid)
28 : {
29 147495 : volatile PgBackendStatus *beentry = MyBEEntry;
30 :
31 147495 : if (!beentry || !pgstat_track_activities)
32 0 : return;
33 :
34 147495 : PGSTAT_BEGIN_WRITE_ACTIVITY(beentry);
35 147495 : beentry->st_progress_command = cmdtype;
36 147495 : beentry->st_progress_command_target = relid;
37 3097395 : MemSet(&beentry->st_progress_param, 0, sizeof(beentry->st_progress_param));
38 147495 : PGSTAT_END_WRITE_ACTIVITY(beentry);
39 : }
40 :
41 : /*-----------
42 : * pgstat_progress_update_param() -
43 : *
44 : * Update index'th member in st_progress_param[] of own backend entry.
45 : *-----------
46 : */
47 : void
48 12609407 : pgstat_progress_update_param(int index, int64 val)
49 : {
50 12609407 : volatile PgBackendStatus *beentry = MyBEEntry;
51 :
52 : Assert(index >= 0 && index < PGSTAT_NUM_PROGRESS_PARAM);
53 :
54 12609407 : if (!beentry || !pgstat_track_activities)
55 0 : return;
56 :
57 12609407 : PGSTAT_BEGIN_WRITE_ACTIVITY(beentry);
58 12609407 : beentry->st_progress_param[index] = val;
59 12609407 : PGSTAT_END_WRITE_ACTIVITY(beentry);
60 : }
61 :
62 : /*-----------
63 : * pgstat_progress_incr_param() -
64 : *
65 : * Increment index'th member in st_progress_param[] of own backend entry.
66 : *-----------
67 : */
68 : void
69 1685 : pgstat_progress_incr_param(int index, int64 incr)
70 : {
71 1685 : volatile PgBackendStatus *beentry = MyBEEntry;
72 :
73 : Assert(index >= 0 && index < PGSTAT_NUM_PROGRESS_PARAM);
74 :
75 1685 : if (!beentry || !pgstat_track_activities)
76 0 : return;
77 :
78 1685 : PGSTAT_BEGIN_WRITE_ACTIVITY(beentry);
79 1685 : beentry->st_progress_param[index] += incr;
80 1685 : PGSTAT_END_WRITE_ACTIVITY(beentry);
81 : }
82 :
83 : /*-----------
84 : * pgstat_progress_parallel_incr_param() -
85 : *
86 : * A variant of pgstat_progress_incr_param to allow a worker to poke at
87 : * a leader to do an incremental progress update.
88 : *-----------
89 : */
90 : void
91 90 : pgstat_progress_parallel_incr_param(int index, int64 incr)
92 : {
93 : /*
94 : * Parallel workers notify a leader through a PqMsg_Progress message to
95 : * update progress, passing the progress index and incremented value.
96 : * Leaders can just call pgstat_progress_incr_param directly.
97 : */
98 90 : if (IsParallelWorker())
99 : {
100 : static StringInfoData progress_message;
101 :
102 1 : initStringInfo(&progress_message);
103 :
104 1 : pq_beginmessage(&progress_message, PqMsg_Progress);
105 1 : pq_sendint32(&progress_message, index);
106 1 : pq_sendint64(&progress_message, incr);
107 1 : pq_endmessage(&progress_message);
108 : }
109 : else
110 89 : pgstat_progress_incr_param(index, incr);
111 90 : }
112 :
113 : /*-----------
114 : * pgstat_progress_update_multi_param() -
115 : *
116 : * Update multiple members in st_progress_param[] of own backend entry.
117 : * This is atomic; readers won't see intermediate states.
118 : *-----------
119 : */
120 : void
121 872140 : pgstat_progress_update_multi_param(int nparam, const int *index,
122 : const int64 *val)
123 : {
124 872140 : volatile PgBackendStatus *beentry = MyBEEntry;
125 : int i;
126 :
127 872140 : if (!beentry || !pgstat_track_activities || nparam == 0)
128 0 : return;
129 :
130 872140 : PGSTAT_BEGIN_WRITE_ACTIVITY(beentry);
131 :
132 2522695 : for (i = 0; i < nparam; ++i)
133 : {
134 : Assert(index[i] >= 0 && index[i] < PGSTAT_NUM_PROGRESS_PARAM);
135 :
136 1650555 : beentry->st_progress_param[index[i]] = val[i];
137 : }
138 :
139 872140 : PGSTAT_END_WRITE_ACTIVITY(beentry);
140 : }
141 :
142 : /*-----------
143 : * pgstat_progress_end_command() -
144 : *
145 : * Reset st_progress_command (and st_progress_command_target) in own backend
146 : * entry. This signals the end of the command.
147 : *-----------
148 : */
149 : void
150 177308 : pgstat_progress_end_command(void)
151 : {
152 177308 : volatile PgBackendStatus *beentry = MyBEEntry;
153 :
154 177308 : if (!beentry || !pgstat_track_activities)
155 0 : return;
156 :
157 177308 : if (beentry->st_progress_command == PROGRESS_COMMAND_INVALID)
158 30596 : return;
159 :
160 146712 : PGSTAT_BEGIN_WRITE_ACTIVITY(beentry);
161 146712 : beentry->st_progress_command = PROGRESS_COMMAND_INVALID;
162 146712 : beentry->st_progress_command_target = InvalidOid;
163 146712 : PGSTAT_END_WRITE_ACTIVITY(beentry);
164 : }
|