Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * launcher.c
3 : * PostgreSQL logical replication worker launcher process
4 : *
5 : * Copyright (c) 2016-2024, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/launcher.c
9 : *
10 : * NOTES
11 : * This module contains the logical replication worker launcher which
12 : * uses the background worker infrastructure to start the logical
13 : * replication workers for every enabled subscription.
14 : *
15 : *-------------------------------------------------------------------------
16 : */
17 :
18 : #include "postgres.h"
19 :
20 : #include "access/heapam.h"
21 : #include "access/htup.h"
22 : #include "access/htup_details.h"
23 : #include "access/tableam.h"
24 : #include "access/xact.h"
25 : #include "catalog/pg_subscription.h"
26 : #include "catalog/pg_subscription_rel.h"
27 : #include "funcapi.h"
28 : #include "lib/dshash.h"
29 : #include "miscadmin.h"
30 : #include "pgstat.h"
31 : #include "postmaster/bgworker.h"
32 : #include "postmaster/interrupt.h"
33 : #include "replication/logicallauncher.h"
34 : #include "replication/slot.h"
35 : #include "replication/walreceiver.h"
36 : #include "replication/worker_internal.h"
37 : #include "storage/ipc.h"
38 : #include "storage/proc.h"
39 : #include "storage/procarray.h"
40 : #include "tcop/tcopprot.h"
41 : #include "utils/builtins.h"
42 : #include "utils/memutils.h"
43 : #include "utils/pg_lsn.h"
44 : #include "utils/snapmgr.h"
45 :
46 : /* max sleep time between cycles (3min) */
47 : #define DEFAULT_NAPTIME_PER_CYCLE 180000L
48 :
49 : /* GUC variables */
50 : int max_logical_replication_workers = 4;
51 : int max_sync_workers_per_subscription = 2;
52 : int max_parallel_apply_workers_per_subscription = 2;
53 :
54 : LogicalRepWorker *MyLogicalRepWorker = NULL;
55 :
56 : typedef struct LogicalRepCtxStruct
57 : {
58 : /* Supervisor process. */
59 : pid_t launcher_pid;
60 :
61 : /* Hash table holding last start times of subscriptions' apply workers. */
62 : dsa_handle last_start_dsa;
63 : dshash_table_handle last_start_dsh;
64 :
65 : /* Background workers. */
66 : LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
67 : } LogicalRepCtxStruct;
68 :
69 : static LogicalRepCtxStruct *LogicalRepCtx;
70 :
71 : /* an entry in the last-start-times shared hash table */
72 : typedef struct LauncherLastStartTimesEntry
73 : {
74 : Oid subid; /* OID of logrep subscription (hash key) */
75 : TimestampTz last_start_time; /* last time its apply worker was started */
76 : } LauncherLastStartTimesEntry;
77 :
78 : /* parameters for the last-start-times shared hash table */
79 : static const dshash_parameters dsh_params = {
80 : sizeof(Oid),
81 : sizeof(LauncherLastStartTimesEntry),
82 : dshash_memcmp,
83 : dshash_memhash,
84 : dshash_memcpy,
85 : LWTRANCHE_LAUNCHER_HASH
86 : };
87 :
88 : static dsa_area *last_start_times_dsa = NULL;
89 : static dshash_table *last_start_times = NULL;
90 :
91 : static bool on_commit_launcher_wakeup = false;
92 :
93 :
94 : static void ApplyLauncherWakeup(void);
95 : static void logicalrep_launcher_onexit(int code, Datum arg);
96 : static void logicalrep_worker_onexit(int code, Datum arg);
97 : static void logicalrep_worker_detach(void);
98 : static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
99 : static int logicalrep_pa_worker_count(Oid subid);
100 : static void logicalrep_launcher_attach_dshmem(void);
101 : static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
102 : static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
103 :
104 :
105 : /*
106 : * Load the list of subscriptions.
107 : *
108 : * Only the fields interesting for worker start/stop functions are filled for
109 : * each subscription.
110 : */
111 : static List *
112 4422 : get_subscription_list(void)
113 : {
114 4422 : List *res = NIL;
115 : Relation rel;
116 : TableScanDesc scan;
117 : HeapTuple tup;
118 : MemoryContext resultcxt;
119 :
120 : /* This is the context that we will allocate our output data in */
121 4422 : resultcxt = CurrentMemoryContext;
122 :
123 : /*
124 : * Start a transaction so we can access pg_database, and get a snapshot.
125 : * We don't have a use for the snapshot itself, but we're interested in
126 : * the secondary effect that it sets RecentGlobalXmin. (This is critical
127 : * for anything that reads heap pages, because HOT may decide to prune
128 : * them even if the process doesn't attempt to modify any tuples.)
129 : *
130 : * FIXME: This comment is inaccurate / the code buggy. A snapshot that is
131 : * not pushed/active does not reliably prevent HOT pruning (->xmin could
132 : * e.g. be cleared when cache invalidations are processed).
133 : */
134 4422 : StartTransactionCommand();
135 4422 : (void) GetTransactionSnapshot();
136 :
137 4422 : rel = table_open(SubscriptionRelationId, AccessShareLock);
138 4422 : scan = table_beginscan_catalog(rel, 0, NULL);
139 :
140 5214 : while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
141 : {
142 792 : Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
143 : Subscription *sub;
144 : MemoryContext oldcxt;
145 :
146 : /*
147 : * Allocate our results in the caller's context, not the
148 : * transaction's. We do this inside the loop, and restore the original
149 : * context at the end, so that leaky things like heap_getnext() are
150 : * not called in a potentially long-lived context.
151 : */
152 792 : oldcxt = MemoryContextSwitchTo(resultcxt);
153 :
154 792 : sub = (Subscription *) palloc0(sizeof(Subscription));
155 792 : sub->oid = subform->oid;
156 792 : sub->dbid = subform->subdbid;
157 792 : sub->owner = subform->subowner;
158 792 : sub->enabled = subform->subenabled;
159 792 : sub->name = pstrdup(NameStr(subform->subname));
160 : /* We don't fill fields we are not interested in. */
161 :
162 792 : res = lappend(res, sub);
163 792 : MemoryContextSwitchTo(oldcxt);
164 : }
165 :
166 4422 : table_endscan(scan);
167 4422 : table_close(rel, AccessShareLock);
168 :
169 4422 : CommitTransactionCommand();
170 :
171 4422 : return res;
172 : }
173 :
174 : /*
175 : * Wait for a background worker to start up and attach to the shmem context.
176 : *
177 : * This is only needed for cleaning up the shared memory in case the worker
178 : * fails to attach.
179 : *
180 : * Returns whether the attach was successful.
181 : */
182 : static bool
183 3370 : WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
184 : uint16 generation,
185 : BackgroundWorkerHandle *handle)
186 : {
187 : BgwHandleStatus status;
188 : int rc;
189 :
190 : for (;;)
191 2740 : {
192 : pid_t pid;
193 :
194 3370 : CHECK_FOR_INTERRUPTS();
195 :
196 3370 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
197 :
198 : /* Worker either died or has started. Return false if died. */
199 3370 : if (!worker->in_use || worker->proc)
200 : {
201 626 : LWLockRelease(LogicalRepWorkerLock);
202 626 : return worker->in_use;
203 : }
204 :
205 2744 : LWLockRelease(LogicalRepWorkerLock);
206 :
207 : /* Check if worker has died before attaching, and clean up after it. */
208 2744 : status = GetBackgroundWorkerPid(handle, &pid);
209 :
210 2744 : if (status == BGWH_STOPPED)
211 : {
212 0 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
213 : /* Ensure that this was indeed the worker we waited for. */
214 0 : if (generation == worker->generation)
215 0 : logicalrep_worker_cleanup(worker);
216 0 : LWLockRelease(LogicalRepWorkerLock);
217 0 : return false;
218 : }
219 :
220 : /*
221 : * We need timeout because we generally don't get notified via latch
222 : * about the worker attach. But we don't expect to have to wait long.
223 : */
224 2744 : rc = WaitLatch(MyLatch,
225 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
226 : 10L, WAIT_EVENT_BGWORKER_STARTUP);
227 :
228 2744 : if (rc & WL_LATCH_SET)
229 : {
230 820 : ResetLatch(MyLatch);
231 820 : CHECK_FOR_INTERRUPTS();
232 : }
233 : }
234 : }
235 :
236 : /*
237 : * Walks the workers array and searches for one that matches given
238 : * subscription id and relid.
239 : *
240 : * We are only interested in the leader apply worker or table sync worker.
241 : */
242 : LogicalRepWorker *
243 4198 : logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
244 : {
245 : int i;
246 4198 : LogicalRepWorker *res = NULL;
247 :
248 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
249 :
250 : /* Search for attached worker for a given subscription id. */
251 12760 : for (i = 0; i < max_logical_replication_workers; i++)
252 : {
253 11190 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
254 :
255 : /* Skip parallel apply workers. */
256 11190 : if (isParallelApplyWorker(w))
257 0 : continue;
258 :
259 11190 : if (w->in_use && w->subid == subid && w->relid == relid &&
260 2628 : (!only_running || w->proc))
261 : {
262 2628 : res = w;
263 2628 : break;
264 : }
265 : }
266 :
267 4198 : return res;
268 : }
269 :
270 : /*
271 : * Similar to logicalrep_worker_find(), but returns a list of all workers for
272 : * the subscription, instead of just one.
273 : */
274 : List *
275 950 : logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
276 : {
277 : int i;
278 950 : List *res = NIL;
279 :
280 950 : if (acquire_lock)
281 190 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
282 :
283 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
284 :
285 : /* Search for attached worker for a given subscription id. */
286 4938 : for (i = 0; i < max_logical_replication_workers; i++)
287 : {
288 3988 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
289 :
290 3988 : if (w->in_use && w->subid == subid && (!only_running || w->proc))
291 622 : res = lappend(res, w);
292 : }
293 :
294 950 : if (acquire_lock)
295 190 : LWLockRelease(LogicalRepWorkerLock);
296 :
297 950 : return res;
298 : }
299 :
300 : /*
301 : * Start new logical replication background worker, if possible.
302 : *
303 : * Returns true on success, false on failure.
304 : */
305 : bool
306 630 : logicalrep_worker_launch(LogicalRepWorkerType wtype,
307 : Oid dbid, Oid subid, const char *subname, Oid userid,
308 : Oid relid, dsm_handle subworker_dsm)
309 : {
310 : BackgroundWorker bgw;
311 : BackgroundWorkerHandle *bgw_handle;
312 : uint16 generation;
313 : int i;
314 630 : int slot = 0;
315 630 : LogicalRepWorker *worker = NULL;
316 : int nsyncworkers;
317 : int nparallelapplyworkers;
318 : TimestampTz now;
319 630 : bool is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
320 630 : bool is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
321 :
322 : /*----------
323 : * Sanity checks:
324 : * - must be valid worker type
325 : * - tablesync workers are only ones to have relid
326 : * - parallel apply worker is the only kind of subworker
327 : */
328 : Assert(wtype != WORKERTYPE_UNKNOWN);
329 : Assert(is_tablesync_worker == OidIsValid(relid));
330 : Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
331 :
332 630 : ereport(DEBUG1,
333 : (errmsg_internal("starting logical replication worker for subscription \"%s\"",
334 : subname)));
335 :
336 : /* Report this after the initial starting message for consistency. */
337 630 : if (max_replication_slots == 0)
338 0 : ereport(ERROR,
339 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
340 : errmsg("cannot start logical replication workers when \"max_replication_slots\"=0")));
341 :
342 : /*
343 : * We need to do the modification of the shared memory under lock so that
344 : * we have consistent view.
345 : */
346 630 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
347 :
348 630 : retry:
349 : /* Find unused worker slot. */
350 1130 : for (i = 0; i < max_logical_replication_workers; i++)
351 : {
352 1130 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
353 :
354 1130 : if (!w->in_use)
355 : {
356 630 : worker = w;
357 630 : slot = i;
358 630 : break;
359 : }
360 : }
361 :
362 630 : nsyncworkers = logicalrep_sync_worker_count(subid);
363 :
364 630 : now = GetCurrentTimestamp();
365 :
366 : /*
367 : * If we didn't find a free slot, try to do garbage collection. The
368 : * reason we do this is because if some worker failed to start up and its
369 : * parent has crashed while waiting, the in_use state was never cleared.
370 : */
371 630 : if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
372 : {
373 0 : bool did_cleanup = false;
374 :
375 0 : for (i = 0; i < max_logical_replication_workers; i++)
376 : {
377 0 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
378 :
379 : /*
380 : * If the worker was marked in use but didn't manage to attach in
381 : * time, clean it up.
382 : */
383 0 : if (w->in_use && !w->proc &&
384 0 : TimestampDifferenceExceeds(w->launch_time, now,
385 : wal_receiver_timeout))
386 : {
387 0 : elog(WARNING,
388 : "logical replication worker for subscription %u took too long to start; canceled",
389 : w->subid);
390 :
391 0 : logicalrep_worker_cleanup(w);
392 0 : did_cleanup = true;
393 : }
394 : }
395 :
396 0 : if (did_cleanup)
397 0 : goto retry;
398 : }
399 :
400 : /*
401 : * We don't allow to invoke more sync workers once we have reached the
402 : * sync worker limit per subscription. So, just return silently as we
403 : * might get here because of an otherwise harmless race condition.
404 : */
405 630 : if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription)
406 : {
407 0 : LWLockRelease(LogicalRepWorkerLock);
408 0 : return false;
409 : }
410 :
411 630 : nparallelapplyworkers = logicalrep_pa_worker_count(subid);
412 :
413 : /*
414 : * Return false if the number of parallel apply workers reached the limit
415 : * per subscription.
416 : */
417 630 : if (is_parallel_apply_worker &&
418 20 : nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
419 : {
420 0 : LWLockRelease(LogicalRepWorkerLock);
421 0 : return false;
422 : }
423 :
424 : /*
425 : * However if there are no more free worker slots, inform user about it
426 : * before exiting.
427 : */
428 630 : if (worker == NULL)
429 : {
430 0 : LWLockRelease(LogicalRepWorkerLock);
431 0 : ereport(WARNING,
432 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
433 : errmsg("out of logical replication worker slots"),
434 : errhint("You might need to increase \"%s\".", "max_logical_replication_workers")));
435 0 : return false;
436 : }
437 :
438 : /* Prepare the worker slot. */
439 630 : worker->type = wtype;
440 630 : worker->launch_time = now;
441 630 : worker->in_use = true;
442 630 : worker->generation++;
443 630 : worker->proc = NULL;
444 630 : worker->dbid = dbid;
445 630 : worker->userid = userid;
446 630 : worker->subid = subid;
447 630 : worker->relid = relid;
448 630 : worker->relstate = SUBREL_STATE_UNKNOWN;
449 630 : worker->relstate_lsn = InvalidXLogRecPtr;
450 630 : worker->stream_fileset = NULL;
451 630 : worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
452 630 : worker->parallel_apply = is_parallel_apply_worker;
453 630 : worker->last_lsn = InvalidXLogRecPtr;
454 630 : TIMESTAMP_NOBEGIN(worker->last_send_time);
455 630 : TIMESTAMP_NOBEGIN(worker->last_recv_time);
456 630 : worker->reply_lsn = InvalidXLogRecPtr;
457 630 : TIMESTAMP_NOBEGIN(worker->reply_time);
458 :
459 : /* Before releasing lock, remember generation for future identification. */
460 630 : generation = worker->generation;
461 :
462 630 : LWLockRelease(LogicalRepWorkerLock);
463 :
464 : /* Register the new dynamic worker. */
465 630 : memset(&bgw, 0, sizeof(bgw));
466 630 : bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
467 : BGWORKER_BACKEND_DATABASE_CONNECTION;
468 630 : bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
469 630 : snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
470 :
471 630 : switch (worker->type)
472 : {
473 268 : case WORKERTYPE_APPLY:
474 268 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
475 268 : snprintf(bgw.bgw_name, BGW_MAXLEN,
476 : "logical replication apply worker for subscription %u",
477 : subid);
478 268 : snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
479 268 : break;
480 :
481 20 : case WORKERTYPE_PARALLEL_APPLY:
482 20 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
483 20 : snprintf(bgw.bgw_name, BGW_MAXLEN,
484 : "logical replication parallel apply worker for subscription %u",
485 : subid);
486 20 : snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
487 :
488 20 : memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
489 20 : break;
490 :
491 342 : case WORKERTYPE_TABLESYNC:
492 342 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
493 342 : snprintf(bgw.bgw_name, BGW_MAXLEN,
494 : "logical replication tablesync worker for subscription %u sync %u",
495 : subid,
496 : relid);
497 342 : snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
498 342 : break;
499 :
500 0 : case WORKERTYPE_UNKNOWN:
501 : /* Should never happen. */
502 0 : elog(ERROR, "unknown worker type");
503 : }
504 :
505 630 : bgw.bgw_restart_time = BGW_NEVER_RESTART;
506 630 : bgw.bgw_notify_pid = MyProcPid;
507 630 : bgw.bgw_main_arg = Int32GetDatum(slot);
508 :
509 630 : if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
510 : {
511 : /* Failed to start worker, so clean up the worker slot. */
512 0 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
513 : Assert(generation == worker->generation);
514 0 : logicalrep_worker_cleanup(worker);
515 0 : LWLockRelease(LogicalRepWorkerLock);
516 :
517 0 : ereport(WARNING,
518 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
519 : errmsg("out of background worker slots"),
520 : errhint("You might need to increase \"%s\".", "max_worker_processes")));
521 0 : return false;
522 : }
523 :
524 : /* Now wait until it attaches. */
525 630 : return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
526 : }
527 :
528 : /*
529 : * Internal function to stop the worker and wait until it detaches from the
530 : * slot.
531 : */
532 : static void
533 134 : logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
534 : {
535 : uint16 generation;
536 :
537 : Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED));
538 :
539 : /*
540 : * Remember which generation was our worker so we can check if what we see
541 : * is still the same one.
542 : */
543 134 : generation = worker->generation;
544 :
545 : /*
546 : * If we found a worker but it does not have proc set then it is still
547 : * starting up; wait for it to finish starting and then kill it.
548 : */
549 164 : while (worker->in_use && !worker->proc)
550 : {
551 : int rc;
552 :
553 40 : LWLockRelease(LogicalRepWorkerLock);
554 :
555 : /* Wait a bit --- we don't expect to have to wait long. */
556 40 : rc = WaitLatch(MyLatch,
557 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
558 : 10L, WAIT_EVENT_BGWORKER_STARTUP);
559 :
560 40 : if (rc & WL_LATCH_SET)
561 : {
562 0 : ResetLatch(MyLatch);
563 0 : CHECK_FOR_INTERRUPTS();
564 : }
565 :
566 : /* Recheck worker status. */
567 40 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
568 :
569 : /*
570 : * Check whether the worker slot is no longer used, which would mean
571 : * that the worker has exited, or whether the worker generation is
572 : * different, meaning that a different worker has taken the slot.
573 : */
574 40 : if (!worker->in_use || worker->generation != generation)
575 0 : return;
576 :
577 : /* Worker has assigned proc, so it has started. */
578 40 : if (worker->proc)
579 10 : break;
580 : }
581 :
582 : /* Now terminate the worker ... */
583 134 : kill(worker->proc->pid, signo);
584 :
585 : /* ... and wait for it to die. */
586 : for (;;)
587 174 : {
588 : int rc;
589 :
590 : /* is it gone? */
591 308 : if (!worker->proc || worker->generation != generation)
592 : break;
593 :
594 174 : LWLockRelease(LogicalRepWorkerLock);
595 :
596 : /* Wait a bit --- we don't expect to have to wait long. */
597 174 : rc = WaitLatch(MyLatch,
598 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
599 : 10L, WAIT_EVENT_BGWORKER_SHUTDOWN);
600 :
601 174 : if (rc & WL_LATCH_SET)
602 : {
603 42 : ResetLatch(MyLatch);
604 42 : CHECK_FOR_INTERRUPTS();
605 : }
606 :
607 174 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
608 : }
609 : }
610 :
611 : /*
612 : * Stop the logical replication worker for subid/relid, if any.
613 : */
614 : void
615 154 : logicalrep_worker_stop(Oid subid, Oid relid)
616 : {
617 : LogicalRepWorker *worker;
618 :
619 154 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
620 :
621 154 : worker = logicalrep_worker_find(subid, relid, false);
622 :
623 154 : if (worker)
624 : {
625 : Assert(!isParallelApplyWorker(worker));
626 118 : logicalrep_worker_stop_internal(worker, SIGTERM);
627 : }
628 :
629 154 : LWLockRelease(LogicalRepWorkerLock);
630 154 : }
631 :
632 : /*
633 : * Stop the given logical replication parallel apply worker.
634 : *
635 : * Node that the function sends SIGINT instead of SIGTERM to the parallel apply
636 : * worker so that the worker exits cleanly.
637 : */
638 : void
639 10 : logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
640 : {
641 : int slot_no;
642 : uint16 generation;
643 : LogicalRepWorker *worker;
644 :
645 10 : SpinLockAcquire(&winfo->shared->mutex);
646 10 : generation = winfo->shared->logicalrep_worker_generation;
647 10 : slot_no = winfo->shared->logicalrep_worker_slot_no;
648 10 : SpinLockRelease(&winfo->shared->mutex);
649 :
650 : Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);
651 :
652 : /*
653 : * Detach from the error_mq_handle for the parallel apply worker before
654 : * stopping it. This prevents the leader apply worker from trying to
655 : * receive the message from the error queue that might already be detached
656 : * by the parallel apply worker.
657 : */
658 10 : if (winfo->error_mq_handle)
659 : {
660 10 : shm_mq_detach(winfo->error_mq_handle);
661 10 : winfo->error_mq_handle = NULL;
662 : }
663 :
664 10 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
665 :
666 10 : worker = &LogicalRepCtx->workers[slot_no];
667 : Assert(isParallelApplyWorker(worker));
668 :
669 : /*
670 : * Only stop the worker if the generation matches and the worker is alive.
671 : */
672 10 : if (worker->generation == generation && worker->proc)
673 10 : logicalrep_worker_stop_internal(worker, SIGINT);
674 :
675 10 : LWLockRelease(LogicalRepWorkerLock);
676 10 : }
677 :
678 : /*
679 : * Wake up (using latch) any logical replication worker for specified sub/rel.
680 : */
681 : void
682 382 : logicalrep_worker_wakeup(Oid subid, Oid relid)
683 : {
684 : LogicalRepWorker *worker;
685 :
686 382 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
687 :
688 382 : worker = logicalrep_worker_find(subid, relid, true);
689 :
690 382 : if (worker)
691 382 : logicalrep_worker_wakeup_ptr(worker);
692 :
693 382 : LWLockRelease(LogicalRepWorkerLock);
694 382 : }
695 :
696 : /*
697 : * Wake up (using latch) the specified logical replication worker.
698 : *
699 : * Caller must hold lock, else worker->proc could change under us.
700 : */
701 : void
702 1136 : logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
703 : {
704 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
705 :
706 1136 : SetLatch(&worker->proc->procLatch);
707 1136 : }
708 :
709 : /*
710 : * Attach to a slot.
711 : */
712 : void
713 764 : logicalrep_worker_attach(int slot)
714 : {
715 : /* Block concurrent access. */
716 764 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
717 :
718 : Assert(slot >= 0 && slot < max_logical_replication_workers);
719 764 : MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
720 :
721 764 : if (!MyLogicalRepWorker->in_use)
722 : {
723 0 : LWLockRelease(LogicalRepWorkerLock);
724 0 : ereport(ERROR,
725 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
726 : errmsg("logical replication worker slot %d is empty, cannot attach",
727 : slot)));
728 : }
729 :
730 764 : if (MyLogicalRepWorker->proc)
731 : {
732 0 : LWLockRelease(LogicalRepWorkerLock);
733 0 : ereport(ERROR,
734 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
735 : errmsg("logical replication worker slot %d is already used by "
736 : "another worker, cannot attach", slot)));
737 : }
738 :
739 764 : MyLogicalRepWorker->proc = MyProc;
740 764 : before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);
741 :
742 764 : LWLockRelease(LogicalRepWorkerLock);
743 764 : }
744 :
745 : /*
746 : * Stop the parallel apply workers if any, and detach the leader apply worker
747 : * (cleans up the worker info).
748 : */
749 : static void
750 764 : logicalrep_worker_detach(void)
751 : {
752 : /* Stop the parallel apply workers. */
753 764 : if (am_leader_apply_worker())
754 : {
755 : List *workers;
756 : ListCell *lc;
757 :
758 : /*
759 : * Detach from the error_mq_handle for all parallel apply workers
760 : * before terminating them. This prevents the leader apply worker from
761 : * receiving the worker termination message and sending it to logs
762 : * when the same is already done by the parallel worker.
763 : */
764 398 : pa_detach_all_error_mq();
765 :
766 398 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
767 :
768 398 : workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true, false);
769 802 : foreach(lc, workers)
770 : {
771 404 : LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
772 :
773 404 : if (isParallelApplyWorker(w))
774 6 : logicalrep_worker_stop_internal(w, SIGTERM);
775 : }
776 :
777 398 : LWLockRelease(LogicalRepWorkerLock);
778 : }
779 :
780 : /* Block concurrent access. */
781 764 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
782 :
783 764 : logicalrep_worker_cleanup(MyLogicalRepWorker);
784 :
785 764 : LWLockRelease(LogicalRepWorkerLock);
786 764 : }
787 :
788 : /*
789 : * Clean up worker info.
790 : */
791 : static void
792 764 : logicalrep_worker_cleanup(LogicalRepWorker *worker)
793 : {
794 : Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
795 :
796 764 : worker->type = WORKERTYPE_UNKNOWN;
797 764 : worker->in_use = false;
798 764 : worker->proc = NULL;
799 764 : worker->dbid = InvalidOid;
800 764 : worker->userid = InvalidOid;
801 764 : worker->subid = InvalidOid;
802 764 : worker->relid = InvalidOid;
803 764 : worker->leader_pid = InvalidPid;
804 764 : worker->parallel_apply = false;
805 764 : }
806 :
807 : /*
808 : * Cleanup function for logical replication launcher.
809 : *
810 : * Called on logical replication launcher exit.
811 : */
812 : static void
813 740 : logicalrep_launcher_onexit(int code, Datum arg)
814 : {
815 740 : LogicalRepCtx->launcher_pid = 0;
816 740 : }
817 :
818 : /*
819 : * Cleanup function.
820 : *
821 : * Called on logical replication worker exit.
822 : */
823 : static void
824 764 : logicalrep_worker_onexit(int code, Datum arg)
825 : {
826 : /* Disconnect gracefully from the remote side. */
827 764 : if (LogRepWorkerWalRcvConn)
828 686 : walrcv_disconnect(LogRepWorkerWalRcvConn);
829 :
830 764 : logicalrep_worker_detach();
831 :
832 : /* Cleanup fileset used for streaming transactions. */
833 764 : if (MyLogicalRepWorker->stream_fileset != NULL)
834 28 : FileSetDeleteAll(MyLogicalRepWorker->stream_fileset);
835 :
836 : /*
837 : * Session level locks may be acquired outside of a transaction in
838 : * parallel apply mode and will not be released when the worker
839 : * terminates, so manually release all locks before the worker exits.
840 : *
841 : * The locks will be acquired once the worker is initialized.
842 : */
843 764 : if (!InitializingApplyWorker)
844 754 : LockReleaseAll(DEFAULT_LOCKMETHOD, true);
845 :
846 764 : ApplyLauncherWakeup();
847 764 : }
848 :
849 : /*
850 : * Count the number of registered (not necessarily running) sync workers
851 : * for a subscription.
852 : */
853 : int
854 1852 : logicalrep_sync_worker_count(Oid subid)
855 : {
856 : int i;
857 1852 : int res = 0;
858 :
859 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
860 :
861 : /* Search for attached worker for a given subscription id. */
862 9596 : for (i = 0; i < max_logical_replication_workers; i++)
863 : {
864 7744 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
865 :
866 7744 : if (isTablesyncWorker(w) && w->subid == subid)
867 1968 : res++;
868 : }
869 :
870 1852 : return res;
871 : }
872 :
873 : /*
874 : * Count the number of registered (but not necessarily running) parallel apply
875 : * workers for a subscription.
876 : */
877 : static int
878 630 : logicalrep_pa_worker_count(Oid subid)
879 : {
880 : int i;
881 630 : int res = 0;
882 :
883 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
884 :
885 : /*
886 : * Scan all attached parallel apply workers, only counting those which
887 : * have the given subscription id.
888 : */
889 3338 : for (i = 0; i < max_logical_replication_workers; i++)
890 : {
891 2708 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
892 :
893 2708 : if (isParallelApplyWorker(w) && w->subid == subid)
894 4 : res++;
895 : }
896 :
897 630 : return res;
898 : }
899 :
900 : /*
901 : * ApplyLauncherShmemSize
902 : * Compute space needed for replication launcher shared memory
903 : */
904 : Size
905 7338 : ApplyLauncherShmemSize(void)
906 : {
907 : Size size;
908 :
909 : /*
910 : * Need the fixed struct and the array of LogicalRepWorker.
911 : */
912 7338 : size = sizeof(LogicalRepCtxStruct);
913 7338 : size = MAXALIGN(size);
914 7338 : size = add_size(size, mul_size(max_logical_replication_workers,
915 : sizeof(LogicalRepWorker)));
916 7338 : return size;
917 : }
918 :
919 : /*
920 : * ApplyLauncherRegister
921 : * Register a background worker running the logical replication launcher.
922 : */
923 : void
924 1528 : ApplyLauncherRegister(void)
925 : {
926 : BackgroundWorker bgw;
927 :
928 : /*
929 : * The logical replication launcher is disabled during binary upgrades, to
930 : * prevent logical replication workers from running on the source cluster.
931 : * That could cause replication origins to move forward after having been
932 : * copied to the target cluster, potentially creating conflicts with the
933 : * copied data files.
934 : */
935 1528 : if (max_logical_replication_workers == 0 || IsBinaryUpgrade)
936 54 : return;
937 :
938 1474 : memset(&bgw, 0, sizeof(bgw));
939 1474 : bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
940 : BGWORKER_BACKEND_DATABASE_CONNECTION;
941 1474 : bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
942 1474 : snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
943 1474 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
944 1474 : snprintf(bgw.bgw_name, BGW_MAXLEN,
945 : "logical replication launcher");
946 1474 : snprintf(bgw.bgw_type, BGW_MAXLEN,
947 : "logical replication launcher");
948 1474 : bgw.bgw_restart_time = 5;
949 1474 : bgw.bgw_notify_pid = 0;
950 1474 : bgw.bgw_main_arg = (Datum) 0;
951 :
952 1474 : RegisterBackgroundWorker(&bgw);
953 : }
954 :
955 : /*
956 : * ApplyLauncherShmemInit
957 : * Allocate and initialize replication launcher shared memory
958 : */
959 : void
960 1902 : ApplyLauncherShmemInit(void)
961 : {
962 : bool found;
963 :
964 1902 : LogicalRepCtx = (LogicalRepCtxStruct *)
965 1902 : ShmemInitStruct("Logical Replication Launcher Data",
966 : ApplyLauncherShmemSize(),
967 : &found);
968 :
969 1902 : if (!found)
970 : {
971 : int slot;
972 :
973 1902 : memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
974 :
975 1902 : LogicalRepCtx->last_start_dsa = DSA_HANDLE_INVALID;
976 1902 : LogicalRepCtx->last_start_dsh = DSHASH_HANDLE_INVALID;
977 :
978 : /* Initialize memory and spin locks for each worker slot. */
979 9452 : for (slot = 0; slot < max_logical_replication_workers; slot++)
980 : {
981 7550 : LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
982 :
983 7550 : memset(worker, 0, sizeof(LogicalRepWorker));
984 7550 : SpinLockInit(&worker->relmutex);
985 : }
986 : }
987 1902 : }
988 :
989 : /*
990 : * Initialize or attach to the dynamic shared hash table that stores the
991 : * last-start times, if not already done.
992 : * This must be called before accessing the table.
993 : */
994 : static void
995 852 : logicalrep_launcher_attach_dshmem(void)
996 : {
997 : MemoryContext oldcontext;
998 :
999 : /* Quick exit if we already did this. */
1000 852 : if (LogicalRepCtx->last_start_dsh != DSHASH_HANDLE_INVALID &&
1001 754 : last_start_times != NULL)
1002 542 : return;
1003 :
1004 : /* Otherwise, use a lock to ensure only one process creates the table. */
1005 310 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
1006 :
1007 : /* Be sure any local memory allocated by DSA routines is persistent. */
1008 310 : oldcontext = MemoryContextSwitchTo(TopMemoryContext);
1009 :
1010 310 : if (LogicalRepCtx->last_start_dsh == DSHASH_HANDLE_INVALID)
1011 : {
1012 : /* Initialize dynamic shared hash table for last-start times. */
1013 98 : last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
1014 98 : dsa_pin(last_start_times_dsa);
1015 98 : dsa_pin_mapping(last_start_times_dsa);
1016 98 : last_start_times = dshash_create(last_start_times_dsa, &dsh_params, NULL);
1017 :
1018 : /* Store handles in shared memory for other backends to use. */
1019 98 : LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
1020 98 : LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
1021 : }
1022 212 : else if (!last_start_times)
1023 : {
1024 : /* Attach to existing dynamic shared hash table. */
1025 212 : last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
1026 212 : dsa_pin_mapping(last_start_times_dsa);
1027 212 : last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
1028 212 : LogicalRepCtx->last_start_dsh, 0);
1029 : }
1030 :
1031 310 : MemoryContextSwitchTo(oldcontext);
1032 310 : LWLockRelease(LogicalRepWorkerLock);
1033 : }
1034 :
1035 : /*
1036 : * Set the last-start time for the subscription.
1037 : */
1038 : static void
1039 268 : ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time)
1040 : {
1041 : LauncherLastStartTimesEntry *entry;
1042 : bool found;
1043 :
1044 268 : logicalrep_launcher_attach_dshmem();
1045 :
1046 268 : entry = dshash_find_or_insert(last_start_times, &subid, &found);
1047 268 : entry->last_start_time = start_time;
1048 268 : dshash_release_lock(last_start_times, entry);
1049 268 : }
1050 :
1051 : /*
1052 : * Return the last-start time for the subscription, or 0 if there isn't one.
1053 : */
1054 : static TimestampTz
1055 312 : ApplyLauncherGetWorkerStartTime(Oid subid)
1056 : {
1057 : LauncherLastStartTimesEntry *entry;
1058 : TimestampTz ret;
1059 :
1060 312 : logicalrep_launcher_attach_dshmem();
1061 :
1062 312 : entry = dshash_find(last_start_times, &subid, false);
1063 312 : if (entry == NULL)
1064 230 : return 0;
1065 :
1066 82 : ret = entry->last_start_time;
1067 82 : dshash_release_lock(last_start_times, entry);
1068 :
1069 82 : return ret;
1070 : }
1071 :
1072 : /*
1073 : * Remove the last-start-time entry for the subscription, if one exists.
1074 : *
1075 : * This has two use-cases: to remove the entry related to a subscription
1076 : * that's been deleted or disabled (just to avoid leaking shared memory),
1077 : * and to allow immediate restart of an apply worker that has exited
1078 : * due to subscription parameter changes.
1079 : */
1080 : void
1081 272 : ApplyLauncherForgetWorkerStartTime(Oid subid)
1082 : {
1083 272 : logicalrep_launcher_attach_dshmem();
1084 :
1085 272 : (void) dshash_delete_key(last_start_times, &subid);
1086 272 : }
1087 :
1088 : /*
1089 : * Wakeup the launcher on commit if requested.
1090 : */
1091 : void
1092 748464 : AtEOXact_ApplyLauncher(bool isCommit)
1093 : {
1094 748464 : if (isCommit)
1095 : {
1096 701178 : if (on_commit_launcher_wakeup)
1097 244 : ApplyLauncherWakeup();
1098 : }
1099 :
1100 748464 : on_commit_launcher_wakeup = false;
1101 748464 : }
1102 :
1103 : /*
1104 : * Request wakeup of the launcher on commit of the transaction.
1105 : *
1106 : * This is used to send launcher signal to stop sleeping and process the
1107 : * subscriptions when current transaction commits. Should be used when new
1108 : * tuple was added to the pg_subscription catalog.
1109 : */
1110 : void
1111 244 : ApplyLauncherWakeupAtCommit(void)
1112 : {
1113 244 : if (!on_commit_launcher_wakeup)
1114 244 : on_commit_launcher_wakeup = true;
1115 244 : }
1116 :
1117 : static void
1118 1008 : ApplyLauncherWakeup(void)
1119 : {
1120 1008 : if (LogicalRepCtx->launcher_pid != 0)
1121 950 : kill(LogicalRepCtx->launcher_pid, SIGUSR1);
1122 1008 : }
1123 :
1124 : /*
1125 : * Main loop for the apply launcher process.
1126 : */
1127 : void
1128 740 : ApplyLauncherMain(Datum main_arg)
1129 : {
1130 740 : ereport(DEBUG1,
1131 : (errmsg_internal("logical replication launcher started")));
1132 :
1133 740 : before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
1134 :
1135 : Assert(LogicalRepCtx->launcher_pid == 0);
1136 740 : LogicalRepCtx->launcher_pid = MyProcPid;
1137 :
1138 : /* Establish signal handlers. */
1139 740 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
1140 740 : pqsignal(SIGTERM, die);
1141 740 : BackgroundWorkerUnblockSignals();
1142 :
1143 : /*
1144 : * Establish connection to nailed catalogs (we only ever access
1145 : * pg_subscription).
1146 : */
1147 740 : BackgroundWorkerInitializeConnection(NULL, NULL, 0);
1148 :
1149 : /* Enter main loop */
1150 : for (;;)
1151 3714 : {
1152 : int rc;
1153 : List *sublist;
1154 : ListCell *lc;
1155 : MemoryContext subctx;
1156 : MemoryContext oldctx;
1157 4454 : long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
1158 :
1159 4454 : CHECK_FOR_INTERRUPTS();
1160 :
1161 : /* Use temporary context to avoid leaking memory across cycles. */
1162 4422 : subctx = AllocSetContextCreate(TopMemoryContext,
1163 : "Logical Replication Launcher sublist",
1164 : ALLOCSET_DEFAULT_SIZES);
1165 4422 : oldctx = MemoryContextSwitchTo(subctx);
1166 :
1167 : /* Start any missing workers for enabled subscriptions. */
1168 4422 : sublist = get_subscription_list();
1169 5212 : foreach(lc, sublist)
1170 : {
1171 792 : Subscription *sub = (Subscription *) lfirst(lc);
1172 : LogicalRepWorker *w;
1173 : TimestampTz last_start;
1174 : TimestampTz now;
1175 : long elapsed;
1176 :
1177 792 : if (!sub->enabled)
1178 66 : continue;
1179 :
1180 726 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1181 726 : w = logicalrep_worker_find(sub->oid, InvalidOid, false);
1182 726 : LWLockRelease(LogicalRepWorkerLock);
1183 :
1184 726 : if (w != NULL)
1185 414 : continue; /* worker is running already */
1186 :
1187 : /*
1188 : * If the worker is eligible to start now, launch it. Otherwise,
1189 : * adjust wait_time so that we'll wake up as soon as it can be
1190 : * started.
1191 : *
1192 : * Each subscription's apply worker can only be restarted once per
1193 : * wal_retrieve_retry_interval, so that errors do not cause us to
1194 : * repeatedly restart the worker as fast as possible. In cases
1195 : * where a restart is expected (e.g., subscription parameter
1196 : * changes), another process should remove the last-start entry
1197 : * for the subscription so that the worker can be restarted
1198 : * without waiting for wal_retrieve_retry_interval to elapse.
1199 : */
1200 312 : last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
1201 312 : now = GetCurrentTimestamp();
1202 312 : if (last_start == 0 ||
1203 82 : (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
1204 : {
1205 268 : ApplyLauncherSetWorkerStartTime(sub->oid, now);
1206 268 : logicalrep_worker_launch(WORKERTYPE_APPLY,
1207 268 : sub->dbid, sub->oid, sub->name,
1208 : sub->owner, InvalidOid,
1209 : DSM_HANDLE_INVALID);
1210 : }
1211 : else
1212 : {
1213 44 : wait_time = Min(wait_time,
1214 : wal_retrieve_retry_interval - elapsed);
1215 : }
1216 : }
1217 :
1218 : /* Switch back to original memory context. */
1219 4420 : MemoryContextSwitchTo(oldctx);
1220 : /* Clean the temporary memory. */
1221 4420 : MemoryContextDelete(subctx);
1222 :
1223 : /* Wait for more work. */
1224 4420 : rc = WaitLatch(MyLatch,
1225 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1226 : wait_time,
1227 : WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
1228 :
1229 4414 : if (rc & WL_LATCH_SET)
1230 : {
1231 4400 : ResetLatch(MyLatch);
1232 4400 : CHECK_FOR_INTERRUPTS();
1233 : }
1234 :
1235 3714 : if (ConfigReloadPending)
1236 : {
1237 58 : ConfigReloadPending = false;
1238 58 : ProcessConfigFile(PGC_SIGHUP);
1239 : }
1240 : }
1241 :
1242 : /* Not reachable */
1243 : }
1244 :
1245 : /*
1246 : * Is current process the logical replication launcher?
1247 : */
1248 : bool
1249 776 : IsLogicalLauncher(void)
1250 : {
1251 776 : return LogicalRepCtx->launcher_pid == MyProcPid;
1252 : }
1253 :
1254 : /*
1255 : * Return the pid of the leader apply worker if the given pid is the pid of a
1256 : * parallel apply worker, otherwise, return InvalidPid.
1257 : */
1258 : pid_t
1259 1504 : GetLeaderApplyWorkerPid(pid_t pid)
1260 : {
1261 1504 : int leader_pid = InvalidPid;
1262 : int i;
1263 :
1264 1504 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1265 :
1266 7520 : for (i = 0; i < max_logical_replication_workers; i++)
1267 : {
1268 6016 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
1269 :
1270 6016 : if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
1271 : {
1272 0 : leader_pid = w->leader_pid;
1273 0 : break;
1274 : }
1275 : }
1276 :
1277 1504 : LWLockRelease(LogicalRepWorkerLock);
1278 :
1279 1504 : return leader_pid;
1280 : }
1281 :
1282 : /*
1283 : * Returns state of the subscriptions.
1284 : */
1285 : Datum
1286 2 : pg_stat_get_subscription(PG_FUNCTION_ARGS)
1287 : {
1288 : #define PG_STAT_GET_SUBSCRIPTION_COLS 10
1289 2 : Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
1290 : int i;
1291 2 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1292 :
1293 2 : InitMaterializedSRF(fcinfo, 0);
1294 :
1295 : /* Make sure we get consistent view of the workers. */
1296 2 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1297 :
1298 10 : for (i = 0; i < max_logical_replication_workers; i++)
1299 : {
1300 : /* for each row */
1301 8 : Datum values[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
1302 8 : bool nulls[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
1303 : int worker_pid;
1304 : LogicalRepWorker worker;
1305 :
1306 8 : memcpy(&worker, &LogicalRepCtx->workers[i],
1307 : sizeof(LogicalRepWorker));
1308 8 : if (!worker.proc || !IsBackendPid(worker.proc->pid))
1309 4 : continue;
1310 :
1311 4 : if (OidIsValid(subid) && worker.subid != subid)
1312 0 : continue;
1313 :
1314 4 : worker_pid = worker.proc->pid;
1315 :
1316 4 : values[0] = ObjectIdGetDatum(worker.subid);
1317 4 : if (isTablesyncWorker(&worker))
1318 0 : values[1] = ObjectIdGetDatum(worker.relid);
1319 : else
1320 4 : nulls[1] = true;
1321 4 : values[2] = Int32GetDatum(worker_pid);
1322 :
1323 4 : if (isParallelApplyWorker(&worker))
1324 0 : values[3] = Int32GetDatum(worker.leader_pid);
1325 : else
1326 4 : nulls[3] = true;
1327 :
1328 4 : if (XLogRecPtrIsInvalid(worker.last_lsn))
1329 2 : nulls[4] = true;
1330 : else
1331 2 : values[4] = LSNGetDatum(worker.last_lsn);
1332 4 : if (worker.last_send_time == 0)
1333 0 : nulls[5] = true;
1334 : else
1335 4 : values[5] = TimestampTzGetDatum(worker.last_send_time);
1336 4 : if (worker.last_recv_time == 0)
1337 0 : nulls[6] = true;
1338 : else
1339 4 : values[6] = TimestampTzGetDatum(worker.last_recv_time);
1340 4 : if (XLogRecPtrIsInvalid(worker.reply_lsn))
1341 2 : nulls[7] = true;
1342 : else
1343 2 : values[7] = LSNGetDatum(worker.reply_lsn);
1344 4 : if (worker.reply_time == 0)
1345 0 : nulls[8] = true;
1346 : else
1347 4 : values[8] = TimestampTzGetDatum(worker.reply_time);
1348 :
1349 4 : switch (worker.type)
1350 : {
1351 4 : case WORKERTYPE_APPLY:
1352 4 : values[9] = CStringGetTextDatum("apply");
1353 4 : break;
1354 0 : case WORKERTYPE_PARALLEL_APPLY:
1355 0 : values[9] = CStringGetTextDatum("parallel apply");
1356 0 : break;
1357 0 : case WORKERTYPE_TABLESYNC:
1358 0 : values[9] = CStringGetTextDatum("table synchronization");
1359 0 : break;
1360 0 : case WORKERTYPE_UNKNOWN:
1361 : /* Should never happen. */
1362 0 : elog(ERROR, "unknown worker type");
1363 : }
1364 :
1365 4 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1366 : values, nulls);
1367 :
1368 : /*
1369 : * If only a single subscription was requested, and we found it,
1370 : * break.
1371 : */
1372 4 : if (OidIsValid(subid))
1373 0 : break;
1374 : }
1375 :
1376 2 : LWLockRelease(LogicalRepWorkerLock);
1377 :
1378 2 : return (Datum) 0;
1379 : }
|