Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * launcher.c
3 : * PostgreSQL logical replication worker launcher process
4 : *
5 : * Copyright (c) 2016-2024, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/launcher.c
9 : *
10 : * NOTES
11 : * This module contains the logical replication worker launcher which
12 : * uses the background worker infrastructure to start the logical
13 : * replication workers for every enabled subscription.
14 : *
15 : *-------------------------------------------------------------------------
16 : */
17 :
18 : #include "postgres.h"
19 :
20 : #include "access/heapam.h"
21 : #include "access/htup.h"
22 : #include "access/htup_details.h"
23 : #include "access/tableam.h"
24 : #include "access/xact.h"
25 : #include "catalog/pg_subscription.h"
26 : #include "catalog/pg_subscription_rel.h"
27 : #include "funcapi.h"
28 : #include "lib/dshash.h"
29 : #include "miscadmin.h"
30 : #include "pgstat.h"
31 : #include "postmaster/bgworker.h"
32 : #include "postmaster/interrupt.h"
33 : #include "replication/logicallauncher.h"
34 : #include "replication/slot.h"
35 : #include "replication/walreceiver.h"
36 : #include "replication/worker_internal.h"
37 : #include "storage/ipc.h"
38 : #include "storage/proc.h"
39 : #include "storage/procarray.h"
40 : #include "tcop/tcopprot.h"
41 : #include "utils/builtins.h"
42 : #include "utils/memutils.h"
43 : #include "utils/pg_lsn.h"
44 : #include "utils/snapmgr.h"
45 :
/* Launcher-wide tunables and this process's worker-slot pointer. */
46 : /* max sleep time between cycles, in milliseconds (180000 ms = 3min) */
47 : #define DEFAULT_NAPTIME_PER_CYCLE 180000L
48 :
49 : /* GUC variables */
/*
 * Number of LogicalRepWorker slots in LogicalRepCtx->workers;
 * ApplyLauncherRegister() registers nothing when this is 0.
 */
50 : int max_logical_replication_workers = 4;
/* Per-subscription tablesync worker cap enforced by logicalrep_worker_launch(). */
51 : int max_sync_workers_per_subscription = 2;
/* Per-subscription parallel apply worker cap enforced by logicalrep_worker_launch(). */
52 : int max_parallel_apply_workers_per_subscription = 2;
53 :
/*
 * This process's slot in LogicalRepCtx->workers; NULL until
 * logicalrep_worker_attach() sets it.
 */
54 : LogicalRepWorker *MyLogicalRepWorker = NULL;
55 :
/*
 * Launcher state kept in the shared memory segment created by
 * ApplyLauncherShmemInit(): the launcher's PID, the handles other processes
 * need to attach to the last-start-times hash table, and one worker slot per
 * max_logical_replication_workers.
 */
56 : typedef struct LogicalRepCtxStruct
57 : {
58 : /* Supervisor process; reset to 0 by logicalrep_launcher_onexit(). */
59 : pid_t launcher_pid;
60 :
61 : /* Hash table holding last start times of subscriptions' apply workers. */
/* Both handles stay *_INVALID until logicalrep_launcher_attach_dshmem() creates the table. */
62 : dsa_handle last_start_dsa;
63 : dshash_table_handle last_start_dsh;
64 :
65 : /* Background workers; array length is max_logical_replication_workers. */
66 : LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
67 : } LogicalRepCtxStruct;
68 :
/* Pointer to the shared struct above; assigned in ApplyLauncherShmemInit(). */
69 : static LogicalRepCtxStruct *LogicalRepCtx;
70 :
71 : /* an entry in the last-start-times shared hash table */
/* subid must stay the first member: dsh_params declares a key of sizeof(Oid) bytes. */
72 : typedef struct LauncherLastStartTimesEntry
73 : {
74 : Oid subid; /* OID of logrep subscription (hash key) */
75 : TimestampTz last_start_time; /* last time its apply worker was started */
76 : } LauncherLastStartTimesEntry;
77 :
78 : /* parameters for the last-start-times shared hash table */
79 : static const dshash_parameters dsh_params = {
80 : sizeof(Oid),
81 : sizeof(LauncherLastStartTimesEntry),
82 : dshash_memcmp,
83 : dshash_memhash,
84 : dshash_memcpy,
85 : LWTRANCHE_LAUNCHER_HASH
86 : };
87 :
/*
 * Backend-local handles for the last-start-times table; both stay NULL until
 * logicalrep_launcher_attach_dshmem() creates or attaches to it.
 */
88 : static dsa_area *last_start_times_dsa = NULL;
89 : static dshash_table *last_start_times = NULL;
90 :
/* NOTE(review): presumably requests a launcher wakeup at commit; the users are outside this chunk - confirm. */
91 : static bool on_commit_launcher_wakeup = false;
92 :
93 :
/* Forward declarations of file-local functions that are used before their definitions. */
94 : static void ApplyLauncherWakeup(void);
95 : static void logicalrep_launcher_onexit(int code, Datum arg);
96 : static void logicalrep_worker_onexit(int code, Datum arg);
97 : static void logicalrep_worker_detach(void);
98 : static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
99 : static int logicalrep_pa_worker_count(Oid subid);
100 : static void logicalrep_launcher_attach_dshmem(void);
101 : static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
102 : static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
103 :
104 :
105 : /*
106 : * Load the list of subscriptions.
107 : *
108 : * Only the fields interesting for worker start/stop functions are filled for
109 : * each subscription.
110 : */
111 : static List *
112 4186 : get_subscription_list(void)
113 : {
114 4186 : List *res = NIL;
115 : Relation rel;
116 : TableScanDesc scan;
117 : HeapTuple tup;
118 : MemoryContext resultcxt;
119 :
120 : /* This is the context that we will allocate our output data in */
121 4186 : resultcxt = CurrentMemoryContext;
122 :
123 : /*
124 : * Start a transaction so we can access pg_database, and get a snapshot.
125 : * We don't have a use for the snapshot itself, but we're interested in
126 : * the secondary effect that it sets RecentGlobalXmin. (This is critical
127 : * for anything that reads heap pages, because HOT may decide to prune
128 : * them even if the process doesn't attempt to modify any tuples.)
129 : *
130 : * FIXME: This comment is inaccurate / the code buggy. A snapshot that is
131 : * not pushed/active does not reliably prevent HOT pruning (->xmin could
132 : * e.g. be cleared when cache invalidations are processed).
133 : */
134 4186 : StartTransactionCommand();
135 4186 : (void) GetTransactionSnapshot();
136 :
137 4186 : rel = table_open(SubscriptionRelationId, AccessShareLock);
138 4186 : scan = table_beginscan_catalog(rel, 0, NULL);
139 :
140 4932 : while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
141 : {
142 746 : Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
143 : Subscription *sub;
144 : MemoryContext oldcxt;
145 :
146 : /*
147 : * Allocate our results in the caller's context, not the
148 : * transaction's. We do this inside the loop, and restore the original
149 : * context at the end, so that leaky things like heap_getnext() are
150 : * not called in a potentially long-lived context.
151 : */
152 746 : oldcxt = MemoryContextSwitchTo(resultcxt);
153 :
154 746 : sub = (Subscription *) palloc0(sizeof(Subscription));
155 746 : sub->oid = subform->oid;
156 746 : sub->dbid = subform->subdbid;
157 746 : sub->owner = subform->subowner;
158 746 : sub->enabled = subform->subenabled;
159 746 : sub->name = pstrdup(NameStr(subform->subname));
160 : /* We don't fill fields we are not interested in. */
161 :
162 746 : res = lappend(res, sub);
163 746 : MemoryContextSwitchTo(oldcxt);
164 : }
165 :
166 4186 : table_endscan(scan);
167 4186 : table_close(rel, AccessShareLock);
168 :
169 4186 : CommitTransactionCommand();
170 :
171 4186 : return res;
172 : }
173 :
174 : /*
175 : * Wait for a background worker to start up and attach to the shmem context.
176 : *
177 : * This is only needed for cleaning up the shared memory in case the worker
178 : * fails to attach.
179 : *
180 : * Returns whether the attach was successful.
181 : */
182 : static bool
183 3402 : WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
184 : uint16 generation,
185 : BackgroundWorkerHandle *handle)
186 : {
187 : BgwHandleStatus status;
188 : int rc;
189 :
190 : for (;;)
191 2798 : {
192 : pid_t pid;
193 :
194 3402 : CHECK_FOR_INTERRUPTS();
195 :
196 3402 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
197 :
198 : /* Worker either died or has started. Return false if died. */
199 3402 : if (!worker->in_use || worker->proc)
200 : {
201 600 : LWLockRelease(LogicalRepWorkerLock);
202 600 : return worker->in_use;
203 : }
204 :
205 2802 : LWLockRelease(LogicalRepWorkerLock);
206 :
207 : /* Check if worker has died before attaching, and clean up after it. */
208 2802 : status = GetBackgroundWorkerPid(handle, &pid);
209 :
210 2802 : if (status == BGWH_STOPPED)
211 : {
212 0 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
213 : /* Ensure that this was indeed the worker we waited for. */
214 0 : if (generation == worker->generation)
215 0 : logicalrep_worker_cleanup(worker);
216 0 : LWLockRelease(LogicalRepWorkerLock);
217 0 : return false;
218 : }
219 :
220 : /*
221 : * We need timeout because we generally don't get notified via latch
222 : * about the worker attach. But we don't expect to have to wait long.
223 : */
224 2802 : rc = WaitLatch(MyLatch,
225 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
226 : 10L, WAIT_EVENT_BGWORKER_STARTUP);
227 :
228 2802 : if (rc & WL_LATCH_SET)
229 : {
230 846 : ResetLatch(MyLatch);
231 846 : CHECK_FOR_INTERRUPTS();
232 : }
233 : }
234 : }
235 :
236 : /*
237 : * Walks the workers array and searches for one that matches given
238 : * subscription id and relid.
239 : *
240 : * We are only interested in the leader apply worker or table sync worker.
241 : */
242 : LogicalRepWorker *
243 3834 : logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
244 : {
245 : int i;
246 3834 : LogicalRepWorker *res = NULL;
247 :
248 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
249 :
250 : /* Search for attached worker for a given subscription id. */
251 11496 : for (i = 0; i < max_logical_replication_workers; i++)
252 : {
253 10100 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
254 :
255 : /* Skip parallel apply workers. */
256 10100 : if (isParallelApplyWorker(w))
257 0 : continue;
258 :
259 10100 : if (w->in_use && w->subid == subid && w->relid == relid &&
260 2438 : (!only_running || w->proc))
261 : {
262 2438 : res = w;
263 2438 : break;
264 : }
265 : }
266 :
267 3834 : return res;
268 : }
269 :
270 : /*
271 : * Similar to logicalrep_worker_find(), but returns a list of all workers for
272 : * the subscription, instead of just one.
273 : */
274 : List *
275 900 : logicalrep_workers_find(Oid subid, bool only_running)
276 : {
277 : int i;
278 900 : List *res = NIL;
279 :
280 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
281 :
282 : /* Search for attached worker for a given subscription id. */
283 4724 : for (i = 0; i < max_logical_replication_workers; i++)
284 : {
285 3824 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
286 :
287 3824 : if (w->in_use && w->subid == subid && (!only_running || w->proc))
288 588 : res = lappend(res, w);
289 : }
290 :
291 900 : return res;
292 : }
293 :
294 : /*
295 : * Start new logical replication background worker, if possible.
296 : *
297 : * Returns true on success, false on failure.
298 : */
299 : bool
300 604 : logicalrep_worker_launch(LogicalRepWorkerType wtype,
301 : Oid dbid, Oid subid, const char *subname, Oid userid,
302 : Oid relid, dsm_handle subworker_dsm)
303 : {
304 : BackgroundWorker bgw;
305 : BackgroundWorkerHandle *bgw_handle;
306 : uint16 generation;
307 : int i;
308 604 : int slot = 0;
309 604 : LogicalRepWorker *worker = NULL;
310 : int nsyncworkers;
311 : int nparallelapplyworkers;
312 : TimestampTz now;
313 604 : bool is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
314 604 : bool is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
315 :
316 : /*----------
317 : * Sanity checks:
318 : * - must be valid worker type
319 : * - tablesync workers are only ones to have relid
320 : * - parallel apply worker is the only kind of subworker
321 : */
322 : Assert(wtype != WORKERTYPE_UNKNOWN);
323 : Assert(is_tablesync_worker == OidIsValid(relid));
324 : Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
325 :
326 604 : ereport(DEBUG1,
327 : (errmsg_internal("starting logical replication worker for subscription \"%s\"",
328 : subname)));
329 :
330 : /* Report this after the initial starting message for consistency. */
331 604 : if (max_replication_slots == 0)
332 0 : ereport(ERROR,
333 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
334 : errmsg("cannot start logical replication workers when max_replication_slots = 0")));
335 :
336 : /*
337 : * We need to do the modification of the shared memory under lock so that
338 : * we have consistent view.
339 : */
340 604 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
341 :
342 604 : retry:
343 : /* Find unused worker slot. */
344 1096 : for (i = 0; i < max_logical_replication_workers; i++)
345 : {
346 1096 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
347 :
348 1096 : if (!w->in_use)
349 : {
350 604 : worker = w;
351 604 : slot = i;
352 604 : break;
353 : }
354 : }
355 :
356 604 : nsyncworkers = logicalrep_sync_worker_count(subid);
357 :
358 604 : now = GetCurrentTimestamp();
359 :
360 : /*
361 : * If we didn't find a free slot, try to do garbage collection. The
362 : * reason we do this is because if some worker failed to start up and its
363 : * parent has crashed while waiting, the in_use state was never cleared.
364 : */
365 604 : if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
366 : {
367 0 : bool did_cleanup = false;
368 :
369 0 : for (i = 0; i < max_logical_replication_workers; i++)
370 : {
371 0 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
372 :
373 : /*
374 : * If the worker was marked in use but didn't manage to attach in
375 : * time, clean it up.
376 : */
377 0 : if (w->in_use && !w->proc &&
378 0 : TimestampDifferenceExceeds(w->launch_time, now,
379 : wal_receiver_timeout))
380 : {
381 0 : elog(WARNING,
382 : "logical replication worker for subscription %u took too long to start; canceled",
383 : w->subid);
384 :
385 0 : logicalrep_worker_cleanup(w);
386 0 : did_cleanup = true;
387 : }
388 : }
389 :
390 0 : if (did_cleanup)
391 0 : goto retry;
392 : }
393 :
394 : /*
395 : * We don't allow to invoke more sync workers once we have reached the
396 : * sync worker limit per subscription. So, just return silently as we
397 : * might get here because of an otherwise harmless race condition.
398 : */
399 604 : if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription)
400 : {
401 0 : LWLockRelease(LogicalRepWorkerLock);
402 0 : return false;
403 : }
404 :
405 604 : nparallelapplyworkers = logicalrep_pa_worker_count(subid);
406 :
407 : /*
408 : * Return false if the number of parallel apply workers reached the limit
409 : * per subscription.
410 : */
411 604 : if (is_parallel_apply_worker &&
412 20 : nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
413 : {
414 0 : LWLockRelease(LogicalRepWorkerLock);
415 0 : return false;
416 : }
417 :
418 : /*
419 : * However if there are no more free worker slots, inform user about it
420 : * before exiting.
421 : */
422 604 : if (worker == NULL)
423 : {
424 0 : LWLockRelease(LogicalRepWorkerLock);
425 0 : ereport(WARNING,
426 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
427 : errmsg("out of logical replication worker slots"),
428 : errhint("You might need to increase %s.", "max_logical_replication_workers")));
429 0 : return false;
430 : }
431 :
432 : /* Prepare the worker slot. */
433 604 : worker->type = wtype;
434 604 : worker->launch_time = now;
435 604 : worker->in_use = true;
436 604 : worker->generation++;
437 604 : worker->proc = NULL;
438 604 : worker->dbid = dbid;
439 604 : worker->userid = userid;
440 604 : worker->subid = subid;
441 604 : worker->relid = relid;
442 604 : worker->relstate = SUBREL_STATE_UNKNOWN;
443 604 : worker->relstate_lsn = InvalidXLogRecPtr;
444 604 : worker->stream_fileset = NULL;
445 604 : worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
446 604 : worker->parallel_apply = is_parallel_apply_worker;
447 604 : worker->last_lsn = InvalidXLogRecPtr;
448 604 : TIMESTAMP_NOBEGIN(worker->last_send_time);
449 604 : TIMESTAMP_NOBEGIN(worker->last_recv_time);
450 604 : worker->reply_lsn = InvalidXLogRecPtr;
451 604 : TIMESTAMP_NOBEGIN(worker->reply_time);
452 :
453 : /* Before releasing lock, remember generation for future identification. */
454 604 : generation = worker->generation;
455 :
456 604 : LWLockRelease(LogicalRepWorkerLock);
457 :
458 : /* Register the new dynamic worker. */
459 604 : memset(&bgw, 0, sizeof(bgw));
460 604 : bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
461 : BGWORKER_BACKEND_DATABASE_CONNECTION;
462 604 : bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
463 604 : snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
464 :
465 604 : switch (worker->type)
466 : {
467 248 : case WORKERTYPE_APPLY:
468 248 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
469 248 : snprintf(bgw.bgw_name, BGW_MAXLEN,
470 : "logical replication apply worker for subscription %u",
471 : subid);
472 248 : snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
473 248 : break;
474 :
475 20 : case WORKERTYPE_PARALLEL_APPLY:
476 20 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
477 20 : snprintf(bgw.bgw_name, BGW_MAXLEN,
478 : "logical replication parallel apply worker for subscription %u",
479 : subid);
480 20 : snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
481 :
482 20 : memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
483 20 : break;
484 :
485 336 : case WORKERTYPE_TABLESYNC:
486 336 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
487 336 : snprintf(bgw.bgw_name, BGW_MAXLEN,
488 : "logical replication tablesync worker for subscription %u sync %u",
489 : subid,
490 : relid);
491 336 : snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
492 336 : break;
493 :
494 0 : case WORKERTYPE_UNKNOWN:
495 : /* Should never happen. */
496 0 : elog(ERROR, "unknown worker type");
497 : }
498 :
499 604 : bgw.bgw_restart_time = BGW_NEVER_RESTART;
500 604 : bgw.bgw_notify_pid = MyProcPid;
501 604 : bgw.bgw_main_arg = Int32GetDatum(slot);
502 :
503 604 : if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
504 : {
505 : /* Failed to start worker, so clean up the worker slot. */
506 0 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
507 : Assert(generation == worker->generation);
508 0 : logicalrep_worker_cleanup(worker);
509 0 : LWLockRelease(LogicalRepWorkerLock);
510 :
511 0 : ereport(WARNING,
512 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
513 : errmsg("out of background worker slots"),
514 : errhint("You might need to increase %s.", "max_worker_processes")));
515 0 : return false;
516 : }
517 :
518 : /* Now wait until it attaches. */
519 604 : return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
520 : }
521 :
522 : /*
523 : * Internal function to stop the worker and wait until it detaches from the
524 : * slot.
525 : */
526 : static void
527 126 : logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
528 : {
529 : uint16 generation;
530 :
531 : Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED));
532 :
533 : /*
534 : * Remember which generation was our worker so we can check if what we see
535 : * is still the same one.
536 : */
537 126 : generation = worker->generation;
538 :
539 : /*
540 : * If we found a worker but it does not have proc set then it is still
541 : * starting up; wait for it to finish starting and then kill it.
542 : */
543 138 : while (worker->in_use && !worker->proc)
544 : {
545 : int rc;
546 :
547 18 : LWLockRelease(LogicalRepWorkerLock);
548 :
549 : /* Wait a bit --- we don't expect to have to wait long. */
550 18 : rc = WaitLatch(MyLatch,
551 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
552 : 10L, WAIT_EVENT_BGWORKER_STARTUP);
553 :
554 18 : if (rc & WL_LATCH_SET)
555 : {
556 0 : ResetLatch(MyLatch);
557 0 : CHECK_FOR_INTERRUPTS();
558 : }
559 :
560 : /* Recheck worker status. */
561 18 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
562 :
563 : /*
564 : * Check whether the worker slot is no longer used, which would mean
565 : * that the worker has exited, or whether the worker generation is
566 : * different, meaning that a different worker has taken the slot.
567 : */
568 18 : if (!worker->in_use || worker->generation != generation)
569 0 : return;
570 :
571 : /* Worker has assigned proc, so it has started. */
572 18 : if (worker->proc)
573 6 : break;
574 : }
575 :
576 : /* Now terminate the worker ... */
577 126 : kill(worker->proc->pid, signo);
578 :
579 : /* ... and wait for it to die. */
580 : for (;;)
581 156 : {
582 : int rc;
583 :
584 : /* is it gone? */
585 282 : if (!worker->proc || worker->generation != generation)
586 : break;
587 :
588 156 : LWLockRelease(LogicalRepWorkerLock);
589 :
590 : /* Wait a bit --- we don't expect to have to wait long. */
591 156 : rc = WaitLatch(MyLatch,
592 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
593 : 10L, WAIT_EVENT_BGWORKER_SHUTDOWN);
594 :
595 156 : if (rc & WL_LATCH_SET)
596 : {
597 34 : ResetLatch(MyLatch);
598 34 : CHECK_FOR_INTERRUPTS();
599 : }
600 :
601 156 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
602 : }
603 : }
604 :
605 : /*
606 : * Stop the logical replication worker for subid/relid, if any.
607 : */
608 : void
609 144 : logicalrep_worker_stop(Oid subid, Oid relid)
610 : {
611 : LogicalRepWorker *worker;
612 :
613 144 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
614 :
615 144 : worker = logicalrep_worker_find(subid, relid, false);
616 :
617 144 : if (worker)
618 : {
619 : Assert(!isParallelApplyWorker(worker));
620 108 : logicalrep_worker_stop_internal(worker, SIGTERM);
621 : }
622 :
623 144 : LWLockRelease(LogicalRepWorkerLock);
624 144 : }
625 :
626 : /*
627 : * Stop the given logical replication parallel apply worker.
628 : *
629 : * Node that the function sends SIGINT instead of SIGTERM to the parallel apply
630 : * worker so that the worker exits cleanly.
631 : */
632 : void
633 10 : logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
634 : {
635 : int slot_no;
636 : uint16 generation;
637 : LogicalRepWorker *worker;
638 :
639 10 : SpinLockAcquire(&winfo->shared->mutex);
640 10 : generation = winfo->shared->logicalrep_worker_generation;
641 10 : slot_no = winfo->shared->logicalrep_worker_slot_no;
642 10 : SpinLockRelease(&winfo->shared->mutex);
643 :
644 : Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);
645 :
646 : /*
647 : * Detach from the error_mq_handle for the parallel apply worker before
648 : * stopping it. This prevents the leader apply worker from trying to
649 : * receive the message from the error queue that might already be detached
650 : * by the parallel apply worker.
651 : */
652 10 : if (winfo->error_mq_handle)
653 : {
654 10 : shm_mq_detach(winfo->error_mq_handle);
655 10 : winfo->error_mq_handle = NULL;
656 : }
657 :
658 10 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
659 :
660 10 : worker = &LogicalRepCtx->workers[slot_no];
661 : Assert(isParallelApplyWorker(worker));
662 :
663 : /*
664 : * Only stop the worker if the generation matches and the worker is alive.
665 : */
666 10 : if (worker->generation == generation && worker->proc)
667 10 : logicalrep_worker_stop_internal(worker, SIGINT);
668 :
669 10 : LWLockRelease(LogicalRepWorkerLock);
670 10 : }
671 :
672 : /*
673 : * Wake up (using latch) any logical replication worker for specified sub/rel.
674 : */
675 : void
676 374 : logicalrep_worker_wakeup(Oid subid, Oid relid)
677 : {
678 : LogicalRepWorker *worker;
679 :
680 374 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
681 :
682 374 : worker = logicalrep_worker_find(subid, relid, true);
683 :
684 374 : if (worker)
685 374 : logicalrep_worker_wakeup_ptr(worker);
686 :
687 374 : LWLockRelease(LogicalRepWorkerLock);
688 374 : }
689 :
690 : /*
691 : * Wake up (using latch) the specified logical replication worker.
692 : *
693 : * Caller must hold lock, else worker->proc could change under us.
694 : */
695 : void
696 1104 : logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
697 : {
698 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
699 :
700 1104 : SetLatch(&worker->proc->procLatch);
701 1104 : }
702 :
703 : /*
704 : * Attach to a slot.
705 : */
706 : void
707 738 : logicalrep_worker_attach(int slot)
708 : {
709 : /* Block concurrent access. */
710 738 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
711 :
712 : Assert(slot >= 0 && slot < max_logical_replication_workers);
713 738 : MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
714 :
715 738 : if (!MyLogicalRepWorker->in_use)
716 : {
717 0 : LWLockRelease(LogicalRepWorkerLock);
718 0 : ereport(ERROR,
719 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
720 : errmsg("logical replication worker slot %d is empty, cannot attach",
721 : slot)));
722 : }
723 :
724 738 : if (MyLogicalRepWorker->proc)
725 : {
726 0 : LWLockRelease(LogicalRepWorkerLock);
727 0 : ereport(ERROR,
728 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
729 : errmsg("logical replication worker slot %d is already used by "
730 : "another worker, cannot attach", slot)));
731 : }
732 :
733 738 : MyLogicalRepWorker->proc = MyProc;
734 738 : before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);
735 :
736 738 : LWLockRelease(LogicalRepWorkerLock);
737 738 : }
738 :
739 : /*
740 : * Stop the parallel apply workers if any, and detach the leader apply worker
741 : * (cleans up the worker info).
742 : */
743 : static void
744 738 : logicalrep_worker_detach(void)
745 : {
746 : /* Stop the parallel apply workers. */
747 738 : if (am_leader_apply_worker())
748 : {
749 : List *workers;
750 : ListCell *lc;
751 :
752 : /*
753 : * Detach from the error_mq_handle for all parallel apply workers
754 : * before terminating them. This prevents the leader apply worker from
755 : * receiving the worker termination message and sending it to logs
756 : * when the same is already done by the parallel worker.
757 : */
758 378 : pa_detach_all_error_mq();
759 :
760 378 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
761 :
762 378 : workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true);
763 766 : foreach(lc, workers)
764 : {
765 388 : LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
766 :
767 388 : if (isParallelApplyWorker(w))
768 8 : logicalrep_worker_stop_internal(w, SIGTERM);
769 : }
770 :
771 378 : LWLockRelease(LogicalRepWorkerLock);
772 : }
773 :
774 : /* Block concurrent access. */
775 738 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
776 :
777 738 : logicalrep_worker_cleanup(MyLogicalRepWorker);
778 :
779 738 : LWLockRelease(LogicalRepWorkerLock);
780 738 : }
781 :
782 : /*
783 : * Clean up worker info.
784 : */
785 : static void
786 738 : logicalrep_worker_cleanup(LogicalRepWorker *worker)
787 : {
788 : Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
789 :
790 738 : worker->type = WORKERTYPE_UNKNOWN;
791 738 : worker->in_use = false;
792 738 : worker->proc = NULL;
793 738 : worker->dbid = InvalidOid;
794 738 : worker->userid = InvalidOid;
795 738 : worker->subid = InvalidOid;
796 738 : worker->relid = InvalidOid;
797 738 : worker->leader_pid = InvalidPid;
798 738 : worker->parallel_apply = false;
799 738 : }
800 :
801 : /*
802 : * Cleanup function for logical replication launcher.
803 : *
804 : * Called on logical replication launcher exit.
805 : */
806 : static void
807 686 : logicalrep_launcher_onexit(int code, Datum arg)
808 : {
809 686 : LogicalRepCtx->launcher_pid = 0;
810 686 : }
811 :
812 : /*
813 : * Cleanup function.
814 : *
815 : * Called on logical replication worker exit.
816 : */
817 : static void
818 738 : logicalrep_worker_onexit(int code, Datum arg)
819 : {
820 : /* Disconnect gracefully from the remote side. */
821 738 : if (LogRepWorkerWalRcvConn)
822 660 : walrcv_disconnect(LogRepWorkerWalRcvConn);
823 :
824 738 : logicalrep_worker_detach();
825 :
826 : /* Cleanup fileset used for streaming transactions. */
827 738 : if (MyLogicalRepWorker->stream_fileset != NULL)
828 28 : FileSetDeleteAll(MyLogicalRepWorker->stream_fileset);
829 :
830 : /*
831 : * Session level locks may be acquired outside of a transaction in
832 : * parallel apply mode and will not be released when the worker
833 : * terminates, so manually release all locks before the worker exits.
834 : *
835 : * The locks will be acquired once the worker is initialized.
836 : */
837 738 : if (!InitializingApplyWorker)
838 728 : LockReleaseAll(DEFAULT_LOCKMETHOD, true);
839 :
840 738 : ApplyLauncherWakeup();
841 738 : }
842 :
843 : /*
844 : * Count the number of registered (not necessarily running) sync workers
845 : * for a subscription.
846 : */
847 : int
848 1668 : logicalrep_sync_worker_count(Oid subid)
849 : {
850 : int i;
851 1668 : int res = 0;
852 :
853 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
854 :
855 : /* Search for attached worker for a given subscription id. */
856 8668 : for (i = 0; i < max_logical_replication_workers; i++)
857 : {
858 7000 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
859 :
860 7000 : if (isTablesyncWorker(w) && w->subid == subid)
861 1664 : res++;
862 : }
863 :
864 1668 : return res;
865 : }
866 :
867 : /*
868 : * Count the number of registered (but not necessarily running) parallel apply
869 : * workers for a subscription.
870 : */
871 : static int
872 604 : logicalrep_pa_worker_count(Oid subid)
873 : {
874 : int i;
875 604 : int res = 0;
876 :
877 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
878 :
879 : /*
880 : * Scan all attached parallel apply workers, only counting those which
881 : * have the given subscription id.
882 : */
883 3208 : for (i = 0; i < max_logical_replication_workers; i++)
884 : {
885 2604 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
886 :
887 2604 : if (isParallelApplyWorker(w) && w->subid == subid)
888 4 : res++;
889 : }
890 :
891 604 : return res;
892 : }
893 :
894 : /*
895 : * ApplyLauncherShmemSize
896 : * Compute space needed for replication launcher shared memory
897 : */
898 : Size
899 6834 : ApplyLauncherShmemSize(void)
900 : {
901 : Size size;
902 :
903 : /*
904 : * Need the fixed struct and the array of LogicalRepWorker.
905 : */
906 6834 : size = sizeof(LogicalRepCtxStruct);
907 6834 : size = MAXALIGN(size);
908 6834 : size = add_size(size, mul_size(max_logical_replication_workers,
909 : sizeof(LogicalRepWorker)));
910 6834 : return size;
911 : }
912 :
913 : /*
914 : * ApplyLauncherRegister
915 : * Register a background worker running the logical replication launcher.
916 : */
917 : void
918 1432 : ApplyLauncherRegister(void)
919 : {
920 : BackgroundWorker bgw;
921 :
922 : /*
923 : * The logical replication launcher is disabled during binary upgrades, to
924 : * prevent logical replication workers from running on the source cluster.
925 : * That could cause replication origins to move forward after having been
926 : * copied to the target cluster, potentially creating conflicts with the
927 : * copied data files.
928 : */
929 1432 : if (max_logical_replication_workers == 0 || IsBinaryUpgrade)
930 48 : return;
931 :
932 1384 : memset(&bgw, 0, sizeof(bgw));
933 1384 : bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
934 : BGWORKER_BACKEND_DATABASE_CONNECTION;
935 1384 : bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
936 1384 : snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
937 1384 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
938 1384 : snprintf(bgw.bgw_name, BGW_MAXLEN,
939 : "logical replication launcher");
940 1384 : snprintf(bgw.bgw_type, BGW_MAXLEN,
941 : "logical replication launcher");
942 1384 : bgw.bgw_restart_time = 5;
943 1384 : bgw.bgw_notify_pid = 0;
944 1384 : bgw.bgw_main_arg = (Datum) 0;
945 :
946 1384 : RegisterBackgroundWorker(&bgw);
947 : }
948 :
949 : /*
950 : * ApplyLauncherShmemInit
951 : * Allocate and initialize replication launcher shared memory
952 : */
953 : void
954 1768 : ApplyLauncherShmemInit(void)
955 : {
956 : bool found;
957 :
958 1768 : LogicalRepCtx = (LogicalRepCtxStruct *)
959 1768 : ShmemInitStruct("Logical Replication Launcher Data",
960 : ApplyLauncherShmemSize(),
961 : &found);
962 :
963 1768 : if (!found)
964 : {
965 : int slot;
966 :
967 1768 : memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
968 :
969 1768 : LogicalRepCtx->last_start_dsa = DSA_HANDLE_INVALID;
970 1768 : LogicalRepCtx->last_start_dsh = DSHASH_HANDLE_INVALID;
971 :
972 : /* Initialize memory and spin locks for each worker slot. */
973 8806 : for (slot = 0; slot < max_logical_replication_workers; slot++)
974 : {
975 7038 : LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
976 :
977 7038 : memset(worker, 0, sizeof(LogicalRepWorker));
978 7038 : SpinLockInit(&worker->relmutex);
979 : }
980 : }
981 1768 : }
982 :
983 : /*
984 : * Initialize or attach to the dynamic shared hash table that stores the
985 : * last-start times, if not already done.
986 : * This must be called before accessing the table.
987 : */
988 : static void
989 798 : logicalrep_launcher_attach_dshmem(void)
990 : {
991 : MemoryContext oldcontext;
992 :
993 : /* Quick exit if we already did this. */
994 798 : if (LogicalRepCtx->last_start_dsh != DSHASH_HANDLE_INVALID &&
995 708 : last_start_times != NULL)
996 512 : return;
997 :
998 : /* Otherwise, use a lock to ensure only one process creates the table. */
999 286 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
1000 :
1001 : /* Be sure any local memory allocated by DSA routines is persistent. */
1002 286 : oldcontext = MemoryContextSwitchTo(TopMemoryContext);
1003 :
1004 286 : if (LogicalRepCtx->last_start_dsh == DSHASH_HANDLE_INVALID)
1005 : {
1006 : /* Initialize dynamic shared hash table for last-start times. */
1007 90 : last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
1008 90 : dsa_pin(last_start_times_dsa);
1009 90 : dsa_pin_mapping(last_start_times_dsa);
1010 90 : last_start_times = dshash_create(last_start_times_dsa, &dsh_params, NULL);
1011 :
1012 : /* Store handles in shared memory for other backends to use. */
1013 90 : LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
1014 90 : LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
1015 : }
1016 196 : else if (!last_start_times)
1017 : {
1018 : /* Attach to existing dynamic shared hash table. */
1019 196 : last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
1020 196 : dsa_pin_mapping(last_start_times_dsa);
1021 196 : last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
1022 196 : LogicalRepCtx->last_start_dsh, 0);
1023 : }
1024 :
1025 286 : MemoryContextSwitchTo(oldcontext);
1026 286 : LWLockRelease(LogicalRepWorkerLock);
1027 : }
1028 :
1029 : /*
1030 : * Set the last-start time for the subscription.
1031 : */
1032 : static void
1033 248 : ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time)
1034 : {
1035 : LauncherLastStartTimesEntry *entry;
1036 : bool found;
1037 :
1038 248 : logicalrep_launcher_attach_dshmem();
1039 :
1040 248 : entry = dshash_find_or_insert(last_start_times, &subid, &found);
1041 248 : entry->last_start_time = start_time;
1042 248 : dshash_release_lock(last_start_times, entry);
1043 248 : }
1044 :
1045 : /*
1046 : * Return the last-start time for the subscription, or 0 if there isn't one.
1047 : */
1048 : static TimestampTz
1049 296 : ApplyLauncherGetWorkerStartTime(Oid subid)
1050 : {
1051 : LauncherLastStartTimesEntry *entry;
1052 : TimestampTz ret;
1053 :
1054 296 : logicalrep_launcher_attach_dshmem();
1055 :
1056 296 : entry = dshash_find(last_start_times, &subid, false);
1057 296 : if (entry == NULL)
1058 216 : return 0;
1059 :
1060 80 : ret = entry->last_start_time;
1061 80 : dshash_release_lock(last_start_times, entry);
1062 :
1063 80 : return ret;
1064 : }
1065 :
1066 : /*
1067 : * Remove the last-start-time entry for the subscription, if one exists.
1068 : *
1069 : * This has two use-cases: to remove the entry related to a subscription
1070 : * that's been deleted or disabled (just to avoid leaking shared memory),
1071 : * and to allow immediate restart of an apply worker that has exited
1072 : * due to subscription parameter changes.
1073 : */
1074 : void
1075 254 : ApplyLauncherForgetWorkerStartTime(Oid subid)
1076 : {
1077 254 : logicalrep_launcher_attach_dshmem();
1078 :
1079 254 : (void) dshash_delete_key(last_start_times, &subid);
1080 254 : }
1081 :
1082 : /*
1083 : * Wakeup the launcher on commit if requested.
1084 : */
1085 : void
1086 570398 : AtEOXact_ApplyLauncher(bool isCommit)
1087 : {
1088 570398 : if (isCommit)
1089 : {
1090 524506 : if (on_commit_launcher_wakeup)
1091 232 : ApplyLauncherWakeup();
1092 : }
1093 :
1094 570398 : on_commit_launcher_wakeup = false;
1095 570398 : }
1096 :
1097 : /*
1098 : * Request wakeup of the launcher on commit of the transaction.
1099 : *
1100 : * This is used to send launcher signal to stop sleeping and process the
1101 : * subscriptions when current transaction commits. Should be used when new
1102 : * tuple was added to the pg_subscription catalog.
1103 : */
1104 : void
1105 232 : ApplyLauncherWakeupAtCommit(void)
1106 : {
1107 232 : if (!on_commit_launcher_wakeup)
1108 232 : on_commit_launcher_wakeup = true;
1109 232 : }
1110 :
1111 : static void
1112 970 : ApplyLauncherWakeup(void)
1113 : {
1114 970 : if (LogicalRepCtx->launcher_pid != 0)
1115 918 : kill(LogicalRepCtx->launcher_pid, SIGUSR1);
1116 970 : }
1117 :
1118 : /*
1119 : * Main loop for the apply launcher process.
1120 : */
1121 : void
1122 686 : ApplyLauncherMain(Datum main_arg)
1123 : {
1124 686 : ereport(DEBUG1,
1125 : (errmsg_internal("logical replication launcher started")));
1126 :
1127 686 : before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
1128 :
1129 : Assert(LogicalRepCtx->launcher_pid == 0);
1130 686 : LogicalRepCtx->launcher_pid = MyProcPid;
1131 :
1132 : /* Establish signal handlers. */
1133 686 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
1134 686 : pqsignal(SIGTERM, die);
1135 686 : BackgroundWorkerUnblockSignals();
1136 :
1137 : /*
1138 : * Establish connection to nailed catalogs (we only ever access
1139 : * pg_subscription).
1140 : */
1141 686 : BackgroundWorkerInitializeConnection(NULL, NULL, 0);
1142 :
1143 : /* Enter main loop */
1144 : for (;;)
1145 3504 : {
1146 : int rc;
1147 : List *sublist;
1148 : ListCell *lc;
1149 : MemoryContext subctx;
1150 : MemoryContext oldctx;
1151 4190 : long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
1152 :
1153 4190 : CHECK_FOR_INTERRUPTS();
1154 :
1155 : /* Use temporary context to avoid leaking memory across cycles. */
1156 4186 : subctx = AllocSetContextCreate(TopMemoryContext,
1157 : "Logical Replication Launcher sublist",
1158 : ALLOCSET_DEFAULT_SIZES);
1159 4186 : oldctx = MemoryContextSwitchTo(subctx);
1160 :
1161 : /* Start any missing workers for enabled subscriptions. */
1162 4186 : sublist = get_subscription_list();
1163 4928 : foreach(lc, sublist)
1164 : {
1165 746 : Subscription *sub = (Subscription *) lfirst(lc);
1166 : LogicalRepWorker *w;
1167 : TimestampTz last_start;
1168 : TimestampTz now;
1169 : long elapsed;
1170 :
1171 746 : if (!sub->enabled)
1172 58 : continue;
1173 :
1174 688 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1175 688 : w = logicalrep_worker_find(sub->oid, InvalidOid, false);
1176 688 : LWLockRelease(LogicalRepWorkerLock);
1177 :
1178 688 : if (w != NULL)
1179 392 : continue; /* worker is running already */
1180 :
1181 : /*
1182 : * If the worker is eligible to start now, launch it. Otherwise,
1183 : * adjust wait_time so that we'll wake up as soon as it can be
1184 : * started.
1185 : *
1186 : * Each subscription's apply worker can only be restarted once per
1187 : * wal_retrieve_retry_interval, so that errors do not cause us to
1188 : * repeatedly restart the worker as fast as possible. In cases
1189 : * where a restart is expected (e.g., subscription parameter
1190 : * changes), another process should remove the last-start entry
1191 : * for the subscription so that the worker can be restarted
1192 : * without waiting for wal_retrieve_retry_interval to elapse.
1193 : */
1194 296 : last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
1195 296 : now = GetCurrentTimestamp();
1196 296 : if (last_start == 0 ||
1197 80 : (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
1198 : {
1199 248 : ApplyLauncherSetWorkerStartTime(sub->oid, now);
1200 248 : logicalrep_worker_launch(WORKERTYPE_APPLY,
1201 248 : sub->dbid, sub->oid, sub->name,
1202 : sub->owner, InvalidOid,
1203 : DSM_HANDLE_INVALID);
1204 : }
1205 : else
1206 : {
1207 48 : wait_time = Min(wait_time,
1208 : wal_retrieve_retry_interval - elapsed);
1209 : }
1210 : }
1211 :
1212 : /* Switch back to original memory context. */
1213 4182 : MemoryContextSwitchTo(oldctx);
1214 : /* Clean the temporary memory. */
1215 4182 : MemoryContextDelete(subctx);
1216 :
1217 : /* Wait for more work. */
1218 4182 : rc = WaitLatch(MyLatch,
1219 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1220 : wait_time,
1221 : WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
1222 :
1223 4176 : if (rc & WL_LATCH_SET)
1224 : {
1225 4156 : ResetLatch(MyLatch);
1226 4156 : CHECK_FOR_INTERRUPTS();
1227 : }
1228 :
1229 3504 : if (ConfigReloadPending)
1230 : {
1231 56 : ConfigReloadPending = false;
1232 56 : ProcessConfigFile(PGC_SIGHUP);
1233 : }
1234 : }
1235 :
1236 : /* Not reachable */
1237 : }
1238 :
1239 : /*
1240 : * Is current process the logical replication launcher?
1241 : */
1242 : bool
1243 722 : IsLogicalLauncher(void)
1244 : {
1245 722 : return LogicalRepCtx->launcher_pid == MyProcPid;
1246 : }
1247 :
1248 : /*
1249 : * Return the pid of the leader apply worker if the given pid is the pid of a
1250 : * parallel apply worker, otherwise, return InvalidPid.
1251 : */
1252 : pid_t
1253 3978 : GetLeaderApplyWorkerPid(pid_t pid)
1254 : {
1255 3978 : int leader_pid = InvalidPid;
1256 : int i;
1257 :
1258 3978 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1259 :
1260 19890 : for (i = 0; i < max_logical_replication_workers; i++)
1261 : {
1262 15912 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
1263 :
1264 15912 : if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
1265 : {
1266 0 : leader_pid = w->leader_pid;
1267 0 : break;
1268 : }
1269 : }
1270 :
1271 3978 : LWLockRelease(LogicalRepWorkerLock);
1272 :
1273 3978 : return leader_pid;
1274 : }
1275 :
1276 : /*
1277 : * Returns state of the subscriptions.
1278 : */
1279 : Datum
1280 2 : pg_stat_get_subscription(PG_FUNCTION_ARGS)
1281 : {
1282 : #define PG_STAT_GET_SUBSCRIPTION_COLS 10
1283 2 : Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
1284 : int i;
1285 2 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1286 :
1287 2 : InitMaterializedSRF(fcinfo, 0);
1288 :
1289 : /* Make sure we get consistent view of the workers. */
1290 2 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1291 :
1292 10 : for (i = 0; i < max_logical_replication_workers; i++)
1293 : {
1294 : /* for each row */
1295 8 : Datum values[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
1296 8 : bool nulls[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
1297 : int worker_pid;
1298 : LogicalRepWorker worker;
1299 :
1300 8 : memcpy(&worker, &LogicalRepCtx->workers[i],
1301 : sizeof(LogicalRepWorker));
1302 8 : if (!worker.proc || !IsBackendPid(worker.proc->pid))
1303 4 : continue;
1304 :
1305 4 : if (OidIsValid(subid) && worker.subid != subid)
1306 0 : continue;
1307 :
1308 4 : worker_pid = worker.proc->pid;
1309 :
1310 4 : values[0] = ObjectIdGetDatum(worker.subid);
1311 4 : if (isTablesyncWorker(&worker))
1312 0 : values[1] = ObjectIdGetDatum(worker.relid);
1313 : else
1314 4 : nulls[1] = true;
1315 4 : values[2] = Int32GetDatum(worker_pid);
1316 :
1317 4 : if (isParallelApplyWorker(&worker))
1318 0 : values[3] = Int32GetDatum(worker.leader_pid);
1319 : else
1320 4 : nulls[3] = true;
1321 :
1322 4 : if (XLogRecPtrIsInvalid(worker.last_lsn))
1323 2 : nulls[4] = true;
1324 : else
1325 2 : values[4] = LSNGetDatum(worker.last_lsn);
1326 4 : if (worker.last_send_time == 0)
1327 0 : nulls[5] = true;
1328 : else
1329 4 : values[5] = TimestampTzGetDatum(worker.last_send_time);
1330 4 : if (worker.last_recv_time == 0)
1331 0 : nulls[6] = true;
1332 : else
1333 4 : values[6] = TimestampTzGetDatum(worker.last_recv_time);
1334 4 : if (XLogRecPtrIsInvalid(worker.reply_lsn))
1335 2 : nulls[7] = true;
1336 : else
1337 2 : values[7] = LSNGetDatum(worker.reply_lsn);
1338 4 : if (worker.reply_time == 0)
1339 0 : nulls[8] = true;
1340 : else
1341 4 : values[8] = TimestampTzGetDatum(worker.reply_time);
1342 :
1343 4 : switch (worker.type)
1344 : {
1345 4 : case WORKERTYPE_APPLY:
1346 4 : values[9] = CStringGetTextDatum("apply");
1347 4 : break;
1348 0 : case WORKERTYPE_PARALLEL_APPLY:
1349 0 : values[9] = CStringGetTextDatum("parallel apply");
1350 0 : break;
1351 0 : case WORKERTYPE_TABLESYNC:
1352 0 : values[9] = CStringGetTextDatum("table synchronization");
1353 0 : break;
1354 0 : case WORKERTYPE_UNKNOWN:
1355 : /* Should never happen. */
1356 0 : elog(ERROR, "unknown worker type");
1357 : }
1358 :
1359 4 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1360 : values, nulls);
1361 :
1362 : /*
1363 : * If only a single subscription was requested, and we found it,
1364 : * break.
1365 : */
1366 4 : if (OidIsValid(subid))
1367 0 : break;
1368 : }
1369 :
1370 2 : LWLockRelease(LogicalRepWorkerLock);
1371 :
1372 2 : return (Datum) 0;
1373 : }
|